diff options
Diffstat (limited to 'src/core/ext/transport/chttp2/transport/chttp2_transport.cc')
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.cc | 1088 |
1 files changed, 621 insertions, 467 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index f537fb09c2..63ac65ac78 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -95,77 +95,105 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false, "chttp2_refcount"); /* forward declarations of various callbacks that we'll build closures around */ -static void write_action_begin_locked(void* t, grpc_error* error); -static void write_action(void* t, grpc_error* error); -static void write_action_end_locked(void* t, grpc_error* error); +static void write_action_begin_locked(grpc_exec_ctx* exec_ctx, void* t, + grpc_error* error); +static void write_action(grpc_exec_ctx* exec_ctx, void* t, grpc_error* error); +static void write_action_end_locked(grpc_exec_ctx* exec_ctx, void* t, + grpc_error* error); -static void read_action_locked(void* t, grpc_error* error); +static void read_action_locked(grpc_exec_ctx* exec_ctx, void* t, + grpc_error* error); -static void complete_fetch_locked(void* gs, grpc_error* error); +static void complete_fetch_locked(grpc_exec_ctx* exec_ctx, void* gs, + grpc_error* error); /** Set a transport level setting, and push it to our peer */ -static void queue_setting_update(grpc_chttp2_transport* t, +static void queue_setting_update(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_chttp2_setting_id id, uint32_t value); -static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s, - grpc_error* error); +static void close_from_api(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_chttp2_stream* s, grpc_error* error); /** Start new streams that have been created if we can */ -static void maybe_start_some_streams(grpc_chttp2_transport* t); +static void maybe_start_some_streams(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t); -static void connectivity_state_set(grpc_chttp2_transport* t, +static void connectivity_state_set(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_connectivity_state state, grpc_error* error, const char* reason); -static void incoming_byte_stream_destroy_locked(void* byte_stream, +static void incoming_byte_stream_destroy_locked(grpc_exec_ctx* exec_ctx, + void* byte_stream, grpc_error* error_ignored); static void incoming_byte_stream_publish_error( - grpc_chttp2_incoming_byte_stream* bs, grpc_error* error); -static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream* bs); - -static void benign_reclaimer_locked(void* t, grpc_error* error); -static void destructive_reclaimer_locked(void* t, grpc_error* error); - -static void post_benign_reclaimer(grpc_chttp2_transport* t); -static void post_destructive_reclaimer(grpc_chttp2_transport* t); - -static void close_transport_locked(grpc_chttp2_transport* t, grpc_error* error); -static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error); - -static void schedule_bdp_ping_locked(grpc_chttp2_transport* t); -static void start_bdp_ping_locked(void* tp, grpc_error* error); -static void finish_bdp_ping_locked(void* tp, grpc_error* error); -static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error); - -static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error); -static void send_ping_locked(grpc_chttp2_transport* t, + grpc_exec_ctx* exec_ctx, grpc_chttp2_incoming_byte_stream* bs, + grpc_error* error); +static void incoming_byte_stream_unref(grpc_exec_ctx* exec_ctx, + grpc_chttp2_incoming_byte_stream* bs); + +static void benign_reclaimer_locked(grpc_exec_ctx* exec_ctx, void* t, + grpc_error* error); +static void destructive_reclaimer_locked(grpc_exec_ctx* exec_ctx, void* t, + grpc_error* error); + +static void post_benign_reclaimer(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t); +static void post_destructive_reclaimer(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t); + +static void close_transport_locked(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_error* error); +static void end_all_the_calls(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_error* error); + +static void schedule_bdp_ping_locked(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t); +static void start_bdp_ping_locked(grpc_exec_ctx* exec_ctx, void* tp, + grpc_error* error); +static void finish_bdp_ping_locked(grpc_exec_ctx* exec_ctx, void* tp, + grpc_error* error); +static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx* exec_ctx, + void* tp, grpc_error* error); + +static void cancel_pings(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_error* error); +static void send_ping_locked(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, grpc_closure* on_initiate, grpc_closure* on_complete); -static void retry_initiate_ping_locked(void* tp, grpc_error* error); +static void retry_initiate_ping_locked(grpc_exec_ctx* exec_ctx, void* tp, + grpc_error* error); /** keepalive-relevant functions */ -static void init_keepalive_ping_locked(void* arg, grpc_error* error); -static void start_keepalive_ping_locked(void* arg, grpc_error* error); -static void finish_keepalive_ping_locked(void* arg, grpc_error* error); -static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error); - -static void reset_byte_stream(void* arg, grpc_error* error); +static void init_keepalive_ping_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error); +static void start_keepalive_ping_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error); +static void finish_keepalive_ping_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error); +static void keepalive_watchdog_fired_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error); + +static void reset_byte_stream(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error); /******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ -static void destruct_transport(grpc_chttp2_transport* t) { +static void destruct_transport(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { size_t i; - grpc_endpoint_destroy(t->ep); + grpc_endpoint_destroy(exec_ctx, t->ep); - grpc_slice_buffer_destroy_internal(&t->qbuf); + grpc_slice_buffer_destroy_internal(exec_ctx, &t->qbuf); - grpc_slice_buffer_destroy_internal(&t->outbuf); - grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor); + grpc_slice_buffer_destroy_internal(exec_ctx, &t->outbuf); + grpc_chttp2_hpack_compressor_destroy(exec_ctx, &t->hpack_compressor); - grpc_slice_buffer_destroy_internal(&t->read_buffer); - grpc_chttp2_hpack_parser_destroy(&t->hpack_parser); + grpc_slice_buffer_destroy_internal(exec_ctx, &t->read_buffer); + grpc_chttp2_hpack_parser_destroy(exec_ctx, &t->hpack_parser); grpc_chttp2_goaway_parser_destroy(&t->goaway_parser); for (i = 0; i < STREAM_LIST_COUNT; i++) { @@ -178,11 +206,12 @@ static void destruct_transport(grpc_chttp2_transport* t) { GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0); grpc_chttp2_stream_map_destroy(&t->stream_map); - grpc_connectivity_state_destroy(&t->channel_callback.state_tracker); + grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker); - GRPC_COMBINER_UNREF(t->combiner, "chttp2_transport"); + GRPC_COMBINER_UNREF(exec_ctx, t->combiner, "chttp2_transport"); - cancel_pings(t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed")); + cancel_pings(exec_ctx, t, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed")); while (t->write_cb_pool) { grpc_chttp2_write_cb* next = t->write_cb_pool->next; @@ -199,7 +228,8 @@ static void destruct_transport(grpc_chttp2_transport* t) { } #ifndef NDEBUG -void grpc_chttp2_unref_transport(grpc_chttp2_transport* t, const char* reason, +void grpc_chttp2_unref_transport(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, const char* reason, const char* file, int line) { if (grpc_trace_chttp2_refcount.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&t->refs.count); @@ -207,7 +237,7 @@ void grpc_chttp2_unref_transport(grpc_chttp2_transport* t, const char* reason, t, val, val - 1, reason, file, line); } if (!gpr_unref(&t->refs)) return; - destruct_transport(t); + destruct_transport(exec_ctx, t); } void grpc_chttp2_ref_transport(grpc_chttp2_transport* t, const char* reason, @@ -220,9 +250,10 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport* t, const char* reason, gpr_ref(&t->refs); } #else -void grpc_chttp2_unref_transport(grpc_chttp2_transport* t) { +void grpc_chttp2_unref_transport(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { if (!gpr_unref(&t->refs)) return; - destruct_transport(t); + destruct_transport(exec_ctx, t); } void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { gpr_ref(&t->refs); } @@ -230,7 +261,7 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { gpr_ref(&t->refs); } static const grpc_transport_vtable* get_vtable(void); -static void init_transport(grpc_chttp2_transport* t, +static void init_transport(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client) { size_t i; @@ -289,7 +320,7 @@ static void init_transport(grpc_chttp2_transport* t, t->goaway_error = GRPC_ERROR_NONE; grpc_chttp2_goaway_parser_init(&t->goaway_parser); - grpc_chttp2_hpack_parser_init(&t->hpack_parser); + grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser); grpc_slice_buffer_init(&t->read_buffer); @@ -320,13 +351,14 @@ static void init_transport(grpc_chttp2_transport* t, /* configure http2 the way we like it */ if (is_client) { - queue_setting_update(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); - queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); + queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); + queue_setting_update(exec_ctx, t, + GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); } - queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, DEFAULT_MAX_HEADER_LIST_SIZE); - queue_setting_update(t, GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, - 1); + queue_setting_update(exec_ctx, t, + GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1); t->ping_policy.max_pings_without_data = g_default_max_pings_without_data; t->ping_policy.min_sent_ping_interval_without_data = @@ -501,7 +533,7 @@ static void init_transport(grpc_chttp2_transport* t, int value = grpc_channel_arg_get_integer( &channel_args->args[i], settings_map[j].integer_options); if (value >= 0) { - queue_setting_update(t, settings_map[j].setting_id, + queue_setting_update(exec_ctx, t, settings_map[j].setting_id, (uint32_t)value); } } @@ -512,7 +544,7 @@ static void init_transport(grpc_chttp2_transport* t, } } - t->flow_control.Init(t, enable_bdp); + t->flow_control.Init(exec_ctx, t, enable_bdp); /* No pings allowed before receiving a header or data frame. */ t->ping_state.pings_before_data_required = 0; @@ -526,8 +558,8 @@ static void init_transport(grpc_chttp2_transport* t, if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + grpc_timer_init(exec_ctx, &t->keepalive_ping_timer, + grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, &t->init_keepalive_ping_locked); } else { /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no @@ -537,37 +569,42 @@ static void init_transport(grpc_chttp2_transport* t, if (enable_bdp) { GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); - schedule_bdp_ping_locked(t); + schedule_bdp_ping_locked(exec_ctx, t); - grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t, - nullptr); + grpc_chttp2_act_on_flowctl_action( + exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, nullptr); } - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE); - post_benign_reclaimer(t); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE); + post_benign_reclaimer(exec_ctx, t); } -static void destroy_transport_locked(void* tp, grpc_error* error) { +static void destroy_transport_locked(grpc_exec_ctx* exec_ctx, void* tp, + grpc_error* error) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp; t->destroying = 1; close_transport_locked( - t, grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"), - GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state)); - GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy"); + exec_ctx, t, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"), + GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state)); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destroy"); } -static void destroy_transport(grpc_transport* gt) { +static void destroy_transport(grpc_exec_ctx* exec_ctx, grpc_transport* gt) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt; - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(destroy_transport_locked, t, + GRPC_CLOSURE_SCHED(exec_ctx, + GRPC_CLOSURE_CREATE(destroy_transport_locked, t, grpc_combiner_scheduler(t->combiner)), GRPC_ERROR_NONE); } -static void close_transport_locked(grpc_chttp2_transport* t, +static void close_transport_locked(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_error* error) { - end_all_the_calls(t, GRPC_ERROR_REF(error)); - cancel_pings(t, GRPC_ERROR_REF(error)); + end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error)); + cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error)); if (t->closed_with_error == GRPC_ERROR_NONE) { if (!grpc_error_has_clear_grpc_status(error)) { error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, @@ -585,21 +622,21 @@ static void close_transport_locked(grpc_chttp2_transport* t, } GPR_ASSERT(error != GRPC_ERROR_NONE); t->closed_with_error = GRPC_ERROR_REF(error); - connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), - "close_transport"); + connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN, + GRPC_ERROR_REF(error), "close_transport"); if (t->ping_state.is_delayed_ping_timer_set) { - grpc_timer_cancel(&t->ping_state.delayed_ping_timer); + grpc_timer_cancel(exec_ctx, &t->ping_state.delayed_ping_timer); } if (t->have_next_bdp_ping_timer) { - grpc_timer_cancel(&t->next_bdp_ping_timer); + grpc_timer_cancel(exec_ctx, &t->next_bdp_ping_timer); } switch (t->keepalive_state) { case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING: - grpc_timer_cancel(&t->keepalive_ping_timer); + grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); break; case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING: - grpc_timer_cancel(&t->keepalive_ping_timer); - grpc_timer_cancel(&t->keepalive_watchdog_timer); + grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); + grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer); break; case GRPC_CHTTP2_KEEPALIVE_STATE_DYING: case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED: @@ -610,13 +647,14 @@ static void close_transport_locked(grpc_chttp2_transport* t, /* flush writable stream list to avoid dangling references */ grpc_chttp2_stream* s; while (grpc_chttp2_list_pop_writable_stream(t, &s)) { - GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close"); + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close"); } GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE); - grpc_endpoint_shutdown(t->ep, GRPC_ERROR_REF(error)); + grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error)); } if (t->notify_on_receive_settings != nullptr) { - GRPC_CLOSURE_SCHED(t->notify_on_receive_settings, GRPC_ERROR_CANCELLED); + GRPC_CLOSURE_SCHED(exec_ctx, t->notify_on_receive_settings, + GRPC_ERROR_CANCELLED); t->notify_on_receive_settings = nullptr; } GRPC_ERROR_UNREF(error); @@ -626,21 +664,22 @@ static void close_transport_locked(grpc_chttp2_transport* t, void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) { grpc_stream_ref(s->refcount, reason); } -void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) { - grpc_stream_unref(s->refcount, reason); +void grpc_chttp2_stream_unref(grpc_exec_ctx* exec_ctx, grpc_chttp2_stream* s, + const char* reason) { + grpc_stream_unref(exec_ctx, s->refcount, reason); } #else void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) { grpc_stream_ref(s->refcount); } -void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) { - grpc_stream_unref(s->refcount); +void grpc_chttp2_stream_unref(grpc_exec_ctx* exec_ctx, grpc_chttp2_stream* s) { + grpc_stream_unref(exec_ctx, s->refcount); } #endif -static int init_stream(grpc_transport* gt, grpc_stream* gs, - grpc_stream_refcount* refcount, const void* server_data, - gpr_arena* arena) { +static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, grpc_stream_refcount* refcount, + const void* server_data, gpr_arena* arena) { GPR_TIMER_BEGIN("init_stream", 0); grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt; grpc_chttp2_stream* s = (grpc_chttp2_stream*)gs; @@ -674,7 +713,7 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs, s->id = (uint32_t)(uintptr_t)server_data; *t->accepting_stream = s; grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); - post_destructive_reclaimer(t); + post_destructive_reclaimer(exec_ctx, t); } s->flow_control.Init(t->flow_control.get(), s); @@ -683,7 +722,8 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs, return 0; } -static void destroy_stream_locked(void* sp, grpc_error* error) { +static void destroy_stream_locked(grpc_exec_ctx* exec_ctx, void* sp, + grpc_error* error) { grpc_chttp2_stream* s = (grpc_chttp2_stream*)sp; grpc_chttp2_transport* t = s->t; @@ -694,10 +734,11 @@ static void destroy_stream_locked(void* sp, grpc_error* error) { GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == nullptr); } - grpc_slice_buffer_destroy_internal(&s->unprocessed_incoming_frames_buffer); - grpc_slice_buffer_destroy_internal(&s->frame_storage); - grpc_slice_buffer_destroy_internal(&s->compressed_data_buffer); - grpc_slice_buffer_destroy_internal(&s->decompressed_data_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, + &s->unprocessed_incoming_frames_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage); + grpc_slice_buffer_destroy_internal(exec_ctx, &s->compressed_data_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, &s->decompressed_data_buffer); grpc_chttp2_list_remove_stalled_by_transport(t, s); grpc_chttp2_list_remove_stalled_by_stream(t, s); @@ -716,24 +757,27 @@ static void destroy_stream_locked(void* sp, grpc_error* error) { GPR_ASSERT(s->recv_initial_metadata_ready == nullptr); GPR_ASSERT(s->recv_message_ready == nullptr); GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr); - grpc_chttp2_data_parser_destroy(&s->data_parser); - grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[0]); - grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[1]); - grpc_slice_buffer_destroy_internal(&s->flow_controlled_buffer); + grpc_chttp2_data_parser_destroy(exec_ctx, &s->data_parser); + grpc_chttp2_incoming_metadata_buffer_destroy(exec_ctx, + &s->metadata_buffer[0]); + grpc_chttp2_incoming_metadata_buffer_destroy(exec_ctx, + &s->metadata_buffer[1]); + grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer); GRPC_ERROR_UNREF(s->read_closed_error); GRPC_ERROR_UNREF(s->write_closed_error); GRPC_ERROR_UNREF(s->byte_stream_error); s->flow_control.Destroy(); - GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream"); GPR_TIMER_END("destroy_stream", 0); - GRPC_CLOSURE_SCHED(s->destroy_stream_arg, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, s->destroy_stream_arg, GRPC_ERROR_NONE); } -static void destroy_stream(grpc_transport* gt, grpc_stream* gs, +static void destroy_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, grpc_closure* then_schedule_closure) { GPR_TIMER_BEGIN("destroy_stream", 0); grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt; @@ -750,6 +794,7 @@ static void destroy_stream(grpc_transport* gt, grpc_stream* gs, s->destroy_stream_arg = then_schedule_closure; GRPC_CLOSURE_SCHED( + exec_ctx, GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, grpc_combiner_scheduler(t->combiner)), GRPC_ERROR_NONE); @@ -761,7 +806,8 @@ grpc_chttp2_stream* grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport* t, return (grpc_chttp2_stream*)grpc_chttp2_stream_map_find(&t->stream_map, id); } -grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t, +grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, uint32_t id) { if (t->channel_callback.accept_stream == nullptr) { return nullptr; @@ -769,7 +815,8 @@ grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* accepting; GPR_ASSERT(t->accepting_stream == nullptr); t->accepting_stream = &accepting; - t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data, + t->channel_callback.accept_stream(exec_ctx, + t->channel_callback.accept_stream_user_data, &t->base, (void*)(uintptr_t)id); t->accepting_stream = nullptr; return accepting; @@ -791,7 +838,7 @@ static const char* write_state_name(grpc_chttp2_write_state st) { GPR_UNREACHABLE_CODE(return "UNKNOWN"); } -static void set_write_state(grpc_chttp2_transport* t, +static void set_write_state(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, grpc_chttp2_write_state st, const char* reason) { GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "W:%p %s state %s -> %s [%s]", t, t->is_client ? "CLIENT" : "SERVER", @@ -799,100 +846,108 @@ static void set_write_state(grpc_chttp2_transport* t, write_state_name(st), reason)); t->write_state = st; if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) { - GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &t->run_after_write); if (t->close_transport_on_writes_finished != nullptr) { grpc_error* err = t->close_transport_on_writes_finished; t->close_transport_on_writes_finished = nullptr; - close_transport_locked(t, err); + close_transport_locked(exec_ctx, t, err); } } } static void inc_initiate_write_reason( - grpc_chttp2_initiate_write_reason reason) { + grpc_exec_ctx* exec_ctx, grpc_chttp2_initiate_write_reason reason) { switch (reason) { case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE(exec_ctx); break; case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM(exec_ctx); break; case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE(exec_ctx); break; case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA( + exec_ctx); break; case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA( + exec_ctx); break; case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING(exec_ctx); break; case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS(exec_ctx); break; case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT(exec_ctx); break; case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM(exec_ctx); break; case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API(exec_ctx); break; case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL(exec_ctx); break; case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL( + exec_ctx); break; case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS(exec_ctx); break; case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING( + exec_ctx); break; case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE( + exec_ctx); break; case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING(exec_ctx); break; case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING(exec_ctx); break; case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED( + exec_ctx); break; case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE(exec_ctx); break; case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM(); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM(exec_ctx); break; } } -void grpc_chttp2_initiate_write(grpc_chttp2_transport* t, +void grpc_chttp2_initiate_write(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_chttp2_initiate_write_reason reason) { GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0); switch (t->write_state) { case GRPC_CHTTP2_WRITE_STATE_IDLE: - inc_initiate_write_reason(reason); - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, + inc_initiate_write_reason(exec_ctx, reason); + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, grpc_chttp2_initiate_write_reason_string(reason)); t->is_first_write_in_batch = true; GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); GRPC_CLOSURE_SCHED( + exec_ctx, GRPC_CLOSURE_INIT(&t->write_action_begin_locked, write_action_begin_locked, t, grpc_combiner_finally_scheduler(t->combiner)), GRPC_ERROR_NONE); break; case GRPC_CHTTP2_WRITE_STATE_WRITING: - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, grpc_chttp2_initiate_write_reason_string(reason)); break; case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: @@ -901,7 +956,8 @@ void grpc_chttp2_initiate_write(grpc_chttp2_transport* t, GPR_TIMER_END("grpc_chttp2_initiate_write", 0); } -void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t, +void grpc_chttp2_mark_stream_writable(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_chttp2_stream* s) { if (t->closed_with_error == GRPC_ERROR_NONE && grpc_chttp2_list_add_writable_stream(t, s)) { @@ -951,7 +1007,8 @@ static const char* begin_writing_desc(bool partial, bool inlined) { GPR_UNREACHABLE_CODE(return "bad state tuple"); } -static void write_action_begin_locked(void* gt, grpc_error* error_ignored) { +static void write_action_begin_locked(grpc_exec_ctx* exec_ctx, void* gt, + grpc_error* error_ignored) { GPR_TIMER_BEGIN("write_action_begin_locked", 0); grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt; GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); @@ -959,59 +1016,62 @@ static void write_action_begin_locked(void* gt, grpc_error* error_ignored) { if (t->closed_with_error != GRPC_ERROR_NONE) { r.writing = false; } else { - r = grpc_chttp2_begin_write(t); + r = grpc_chttp2_begin_write(exec_ctx, t); } if (r.writing) { if (r.partial) { - GRPC_STATS_INC_HTTP2_PARTIAL_WRITES(); + GRPC_STATS_INC_HTTP2_PARTIAL_WRITES(exec_ctx); } if (!t->is_first_write_in_batch) { - GRPC_STATS_INC_HTTP2_WRITES_CONTINUED(); + GRPC_STATS_INC_HTTP2_WRITES_CONTINUED(exec_ctx); } grpc_closure_scheduler* scheduler = write_scheduler(t, r.early_results_scheduled, r.partial); if (scheduler != grpc_schedule_on_exec_ctx) { - GRPC_STATS_INC_HTTP2_WRITES_OFFLOADED(); + GRPC_STATS_INC_HTTP2_WRITES_OFFLOADED(exec_ctx); } set_write_state( - t, + exec_ctx, t, r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE : GRPC_CHTTP2_WRITE_STATE_WRITING, begin_writing_desc(r.partial, scheduler == grpc_schedule_on_exec_ctx)); GRPC_CLOSURE_SCHED( + exec_ctx, GRPC_CLOSURE_INIT(&t->write_action, write_action, t, scheduler), GRPC_ERROR_NONE); } else { - GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN(); - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing"); - GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); + GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN(exec_ctx); + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, + "begin writing nothing"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); } GPR_TIMER_END("write_action_begin_locked", 0); } -static void write_action(void* gt, grpc_error* error) { +static void write_action(grpc_exec_ctx* exec_ctx, void* gt, grpc_error* error) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt; GPR_TIMER_BEGIN("write_action", 0); grpc_endpoint_write( - t->ep, &t->outbuf, + exec_ctx, t->ep, &t->outbuf, GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t, grpc_combiner_scheduler(t->combiner))); GPR_TIMER_END("write_action", 0); } -static void write_action_end_locked(void* tp, grpc_error* error) { +static void write_action_end_locked(grpc_exec_ctx* exec_ctx, void* tp, + grpc_error* error) { GPR_TIMER_BEGIN("terminate_writing_with_lock", 0); grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp; if (error != GRPC_ERROR_NONE) { - close_transport_locked(t, GRPC_ERROR_REF(error)); + close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); } if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) { t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT; if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { close_transport_locked( - t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent")); + exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent")); } } @@ -1020,14 +1080,17 @@ static void write_action_end_locked(void* tp, grpc_error* error) { GPR_UNREACHABLE_CODE(break); case GRPC_CHTTP2_WRITE_STATE_WRITING: GPR_TIMER_MARK("state=writing", 0); - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing"); + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, + "finish writing"); break; case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: GPR_TIMER_MARK("state=writing_stale_no_poller", 0); - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing"); + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, + "continue writing"); t->is_first_write_in_batch = false; GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); GRPC_CLOSURE_RUN( + exec_ctx, GRPC_CLOSURE_INIT(&t->write_action_begin_locked, write_action_begin_locked, t, grpc_combiner_finally_scheduler(t->combiner)), @@ -1035,15 +1098,16 @@ static void write_action_end_locked(void* tp, grpc_error* error) { break; } - grpc_chttp2_end_write(t, GRPC_ERROR_REF(error)); + grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error)); - GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); GPR_TIMER_END("terminate_writing_with_lock", 0); } // Dirties an HTTP2 setting to be sent out next time a writing path occurs. // If the change needs to occur immediately, manually initiate a write. -static void queue_setting_update(grpc_chttp2_transport* t, +static void queue_setting_update(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_chttp2_setting_id id, uint32_t value) { const grpc_chttp2_setting_parameters* sp = &grpc_chttp2_settings_parameters[id]; @@ -1058,7 +1122,8 @@ static void queue_setting_update(grpc_chttp2_transport* t, } } -void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t, +void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, uint32_t goaway_error, grpc_slice goaway_text) { // GRPC_CHTTP2_IF_TRACING( @@ -1093,11 +1158,12 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t, /* lie: use transient failure from the transport to indicate goaway has been * received */ - connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, + connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(t->goaway_error), "got_goaway"); } -static void maybe_start_some_streams(grpc_chttp2_transport* t) { +static void maybe_start_some_streams(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { grpc_chttp2_stream* s; /* start streams where we have free grpc_chttp2_stream ids and free * concurrency */ @@ -1117,21 +1183,22 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) { if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) { connectivity_state_set( - t, GRPC_CHANNEL_TRANSIENT_FAILURE, + exec_ctx, t, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"), "no_more_stream_ids"); } grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); - post_destructive_reclaimer(t); - grpc_chttp2_mark_stream_writable(t, s); - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM); + post_destructive_reclaimer(exec_ctx, t); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM); } /* cancel out streams that will never be started */ while (t->next_stream_id >= MAX_CLIENT_STREAM_ID && grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) { grpc_chttp2_cancel_stream( - t, s, + exec_ctx, t, s, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); @@ -1153,13 +1220,15 @@ static grpc_closure* add_closure_barrier(grpc_closure* closure) { return closure; } -static void null_then_run_closure(grpc_closure** closure, grpc_error* error) { +static void null_then_run_closure(grpc_exec_ctx* exec_ctx, + grpc_closure** closure, grpc_error* error) { grpc_closure* c = *closure; *closure = nullptr; - GRPC_CLOSURE_RUN(c, error); + GRPC_CLOSURE_RUN(exec_ctx, c, error); } -void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t, +void grpc_chttp2_complete_closure_step(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_chttp2_stream* s, grpc_closure** pclosure, grpc_error* error, const char* desc) { @@ -1199,7 +1268,7 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t, } if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) || !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) { - GRPC_CLOSURE_RUN(closure, closure->error_data.error); + GRPC_CLOSURE_RUN(exec_ctx, closure, closure->error_data.error); } else { grpc_closure_list_append(&t->run_after_write, closure, closure->error_data.error); @@ -1215,24 +1284,28 @@ static bool contains_non_ok_status(grpc_metadata_batch* batch) { return false; } -static void maybe_become_writable_due_to_send_msg(grpc_chttp2_transport* t, +static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_chttp2_stream* s) { if (s->id != 0 && (!s->write_buffering || s->flow_controlled_buffer.length > t->write_buffer_size)) { - grpc_chttp2_mark_stream_writable(t, s); - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE); } } -static void add_fetched_slice_locked(grpc_chttp2_transport* t, +static void add_fetched_slice_locked(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_chttp2_stream* s) { s->fetched_send_message_length += (uint32_t)GRPC_SLICE_LENGTH(s->fetching_slice); grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice); - maybe_become_writable_due_to_send_msg(t, s); + maybe_become_writable_due_to_send_msg(exec_ctx, t, s); } -static void continue_fetching_send_locked(grpc_chttp2_transport* t, +static void continue_fetching_send_locked(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_chttp2_stream* s) { for (;;) { if (s->fetching_send_message == nullptr) { @@ -1241,11 +1314,11 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, return; /* early out */ } if (s->fetched_send_message_length == s->fetching_send_message->length) { - grpc_byte_stream_destroy(s->fetching_send_message); + grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message); int64_t notify_offset = s->next_message_end_offset; if (notify_offset <= s->flow_controlled_bytes_written) { grpc_chttp2_complete_closure_step( - t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE, + exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE, "fetching_send_message_finished"); } else { grpc_chttp2_write_cb* cb = t->write_cb_pool; @@ -1266,37 +1339,39 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, } s->fetching_send_message = nullptr; return; /* early out */ - } else if (grpc_byte_stream_next(s->fetching_send_message, UINT32_MAX, - &s->complete_fetch_locked)) { - grpc_error* error = - grpc_byte_stream_pull(s->fetching_send_message, &s->fetching_slice); + } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, + UINT32_MAX, &s->complete_fetch_locked)) { + grpc_error* error = grpc_byte_stream_pull( + exec_ctx, s->fetching_send_message, &s->fetching_slice); if (error != GRPC_ERROR_NONE) { - grpc_byte_stream_destroy(s->fetching_send_message); - grpc_chttp2_cancel_stream(t, s, error); + grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message); + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); } else { - add_fetched_slice_locked(t, s); + add_fetched_slice_locked(exec_ctx, t, s); } } } } -static void complete_fetch_locked(void* gs, grpc_error* error) { +static void complete_fetch_locked(grpc_exec_ctx* exec_ctx, void* gs, + grpc_error* error) { grpc_chttp2_stream* s = (grpc_chttp2_stream*)gs; grpc_chttp2_transport* t = s->t; if (error == GRPC_ERROR_NONE) { - error = grpc_byte_stream_pull(s->fetching_send_message, &s->fetching_slice); + error = grpc_byte_stream_pull(exec_ctx, s->fetching_send_message, + &s->fetching_slice); if (error == GRPC_ERROR_NONE) { - add_fetched_slice_locked(t, s); - continue_fetching_send_locked(t, s); + add_fetched_slice_locked(exec_ctx, t, s); + continue_fetching_send_locked(exec_ctx, t, s); } } if (error != GRPC_ERROR_NONE) { - grpc_byte_stream_destroy(s->fetching_send_message); - grpc_chttp2_cancel_stream(t, s, error); + grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message); + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); } } -static void do_nothing(void* arg, grpc_error* error) {} +static void do_nothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {} static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id, bool is_client, bool is_initial) { @@ -1311,7 +1386,7 @@ static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id, } } -static void perform_stream_op_locked(void* stream_op, +static void perform_stream_op_locked(grpc_exec_ctx* exec_ctx, void* stream_op, grpc_error* error_ignored) { GPR_TIMER_BEGIN("perform_stream_op_locked", 0); @@ -1321,7 +1396,7 @@ static void perform_stream_op_locked(void* stream_op, grpc_transport_stream_op_batch_payload* op_payload = op->payload; grpc_chttp2_transport* t = s->t; - GRPC_STATS_INC_HTTP2_OP_BATCHES(); + GRPC_STATS_INC_HTTP2_OP_BATCHES(exec_ctx); if (grpc_http_trace.enabled()) { char* str = grpc_transport_stream_op_batch_string(op); @@ -1356,12 +1431,13 @@ static void perform_stream_op_locked(void* stream_op, } if (op->cancel_stream) { - GRPC_STATS_INC_HTTP2_OP_CANCEL(); - grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error); + GRPC_STATS_INC_HTTP2_OP_CANCEL(exec_ctx); + grpc_chttp2_cancel_stream(exec_ctx, t, s, + op_payload->cancel_stream.cancel_error); } if (op->send_initial_metadata) { - GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA(); + GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA(exec_ctx); GPR_ASSERT(s->send_initial_metadata_finished == nullptr); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; @@ -1389,7 +1465,7 @@ static void perform_stream_op_locked(void* stream_op, } if (metadata_size > metadata_peer_limit) { grpc_chttp2_cancel_stream( - t, s, + exec_ctx, t, s, grpc_error_set_int( grpc_error_set_int( grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( @@ -1408,10 +1484,10 @@ static void perform_stream_op_locked(void* stream_op, if (t->closed_with_error == GRPC_ERROR_NONE) { GPR_ASSERT(s->id == 0); grpc_chttp2_list_add_waiting_for_concurrency(t, s); - maybe_start_some_streams(t); + maybe_start_some_streams(exec_ctx, t); } else { grpc_chttp2_cancel_stream( - t, s, + exec_ctx, t, s, grpc_error_set_int( GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Transport closed", &t->closed_with_error, 1), @@ -1419,18 +1495,18 @@ static void perform_stream_op_locked(void* stream_op, } } else { GPR_ASSERT(s->id != 0); - grpc_chttp2_mark_stream_writable(t, s); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); if (!(op->send_message && (op->payload->send_message.send_message->flags & GRPC_WRITE_BUFFER_HINT))) { grpc_chttp2_initiate_write( - t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA); + exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA); } } } else { s->send_initial_metadata = nullptr; grpc_chttp2_complete_closure_step( - t, s, &s->send_initial_metadata_finished, + exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Attempt to send initial metadata after stream was closed", &s->write_closed_error, 1), @@ -1444,9 +1520,9 @@ static void perform_stream_op_locked(void* stream_op, } if (op->send_message) { - GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE(); + GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE(exec_ctx); GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE( - op->payload->send_message.send_message->length); + exec_ctx, op->payload->send_message.send_message->length); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->fetching_send_message_finished = add_closure_barrier(op->on_complete); if (s->write_closed) { @@ -1456,7 +1532,7 @@ static void perform_stream_op_locked(void* stream_op, // recv_message failure, breaking out of its loop, and then // starting recv_trailing_metadata. grpc_chttp2_complete_closure_step( - t, s, &s->fetching_send_message_finished, + exec_ctx, t, s, &s->fetching_send_message_finished, t->is_client && s->received_trailing_metadata ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( @@ -1485,13 +1561,13 @@ static void perform_stream_op_locked(void* stream_op, } else { s->write_buffering = false; } - continue_fetching_send_locked(t, s); - maybe_become_writable_due_to_send_msg(t, s); + continue_fetching_send_locked(exec_ctx, t, s); + maybe_become_writable_due_to_send_msg(exec_ctx, t, s); } } if (op->send_trailing_metadata) { - GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA(); + GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA(exec_ctx); GPR_ASSERT(s->send_trailing_metadata_finished == nullptr); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->send_trailing_metadata_finished = add_closure_barrier(on_complete); @@ -1505,7 +1581,7 @@ static void perform_stream_op_locked(void* stream_op, [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE]; if (metadata_size > metadata_peer_limit) { grpc_chttp2_cancel_stream( - t, s, + exec_ctx, t, s, grpc_error_set_int( grpc_error_set_int( grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( @@ -1522,7 +1598,7 @@ static void perform_stream_op_locked(void* stream_op, if (s->write_closed) { s->send_trailing_metadata = nullptr; grpc_chttp2_complete_closure_step( - t, s, &s->send_trailing_metadata_finished, + exec_ctx, t, s, &s->send_trailing_metadata_finished, grpc_metadata_batch_is_empty( op->payload->send_trailing_metadata.send_trailing_metadata) ? GRPC_ERROR_NONE @@ -1533,15 +1609,15 @@ static void perform_stream_op_locked(void* stream_op, } else if (s->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ - grpc_chttp2_mark_stream_writable(t, s); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); grpc_chttp2_initiate_write( - t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA); + exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA); } } } if (op->recv_initial_metadata) { - GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA(); + GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA(exec_ctx); GPR_ASSERT(s->recv_initial_metadata_ready == nullptr); s->recv_initial_metadata_ready = op_payload->recv_initial_metadata.recv_initial_metadata_ready; @@ -1553,11 +1629,11 @@ static void perform_stream_op_locked(void* stream_op, gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string, (gpr_atm)gpr_strdup(t->peer_string)); } - grpc_chttp2_maybe_complete_recv_initial_metadata(t, s); + grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s); } if (op->recv_message) { - GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE(); + GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE(exec_ctx); size_t already_received; GPR_ASSERT(s->recv_message_ready == nullptr); GPR_ASSERT(!s->pending_byte_stream); @@ -1568,30 +1644,32 @@ static void perform_stream_op_locked(void* stream_op, already_received = s->frame_storage.length; s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES, already_received); - grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s); + grpc_chttp2_act_on_flowctl_action(exec_ctx, + s->flow_control->MakeAction(), t, s); } } - grpc_chttp2_maybe_complete_recv_message(t, s); + grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); } if (op->recv_trailing_metadata) { - GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA(); + GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA(exec_ctx); GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr); s->recv_trailing_metadata_finished = add_closure_barrier(on_complete); s->recv_trailing_metadata = op_payload->recv_trailing_metadata.recv_trailing_metadata; s->final_metadata_requested = true; - grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); } - grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE, - "op->on_complete"); + grpc_chttp2_complete_closure_step(exec_ctx, t, s, &on_complete, + GRPC_ERROR_NONE, "op->on_complete"); GPR_TIMER_END("perform_stream_op_locked", 0); - GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op"); + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "perform_stream_op"); } -static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, +static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, grpc_transport_stream_op_batch* op) { GPR_TIMER_BEGIN("perform_stream_op", 0); grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt; @@ -1619,29 +1697,32 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, op->handler_private.extra_arg = gs; GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); GRPC_CLOSURE_SCHED( + exec_ctx, GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked, op, grpc_combiner_scheduler(t->combiner)), GRPC_ERROR_NONE); GPR_TIMER_END("perform_stream_op", 0); } -static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { +static void cancel_pings(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_error* error) { /* callback remaining pings: they're not allowed to call into the transpot, and maybe they hold resources that need to be freed */ grpc_chttp2_ping_queue* pq = &t->ping_queue; GPR_ASSERT(error != GRPC_ERROR_NONE); for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error)); - GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[j]); } GRPC_ERROR_UNREF(error); } -static void send_ping_locked(grpc_chttp2_transport* t, +static void send_ping_locked(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, grpc_closure* on_initiate, grpc_closure* on_ack) { if (t->closed_with_error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_REF(t->closed_with_error)); - GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_SCHED(exec_ctx, on_initiate, + GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_SCHED(exec_ctx, on_ack, GRPC_ERROR_REF(t->closed_with_error)); return; } grpc_chttp2_ping_queue* pq = &t->ping_queue; @@ -1651,15 +1732,18 @@ static void send_ping_locked(grpc_chttp2_transport* t, GRPC_ERROR_NONE); } -static void retry_initiate_ping_locked(void* tp, grpc_error* error) { +static void retry_initiate_ping_locked(grpc_exec_ctx* exec_ctx, void* tp, + grpc_error* error) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp; t->ping_state.is_delayed_ping_timer_set = false; if (error == GRPC_ERROR_NONE) { - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); } } -void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) { +void grpc_chttp2_ack_ping(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + uint64_t id) { grpc_chttp2_ping_queue* pq = &t->ping_queue; if (pq->inflight_id != id) { char* from = grpc_endpoint_get_peer(t->ep); @@ -1667,48 +1751,54 @@ void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) { gpr_free(from); return; } - GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS); } } -static void send_goaway(grpc_chttp2_transport* t, grpc_error* error) { +static void send_goaway(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_error* error) { t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED; grpc_http2_error_code http_error; grpc_slice slice; - grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, nullptr, &slice, - &http_error, nullptr); + grpc_error_get_status(exec_ctx, error, GRPC_MILLIS_INF_FUTURE, nullptr, + &slice, &http_error, nullptr); grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)http_error, grpc_slice_ref_internal(slice), &t->qbuf); - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT); GRPC_ERROR_UNREF(error); } -void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) { +void grpc_chttp2_add_ping_strike(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { t->ping_recv_state.ping_strikes++; if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes && t->ping_policy.max_ping_strikes != 0) { - send_goaway(t, + send_goaway(exec_ctx, t, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("too_many_pings"), GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); /*The transport will be closed after the write is done */ close_transport_locked( - t, grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + exec_ctx, t, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); } } -static void perform_transport_op_locked(void* stream_op, +static void perform_transport_op_locked(grpc_exec_ctx* exec_ctx, + void* stream_op, grpc_error* error_ignored) { grpc_transport_op* op = (grpc_transport_op*)stream_op; grpc_chttp2_transport* t = (grpc_chttp2_transport*)op->handler_private.extra_arg; if (op->goaway_error) { - send_goaway(t, op->goaway_error); + send_goaway(exec_ctx, t, op->goaway_error); } if (op->set_accept_stream) { @@ -1718,40 +1808,43 @@ static void perform_transport_op_locked(void* stream_op, } if (op->bind_pollset) { - grpc_endpoint_add_to_pollset(t->ep, op->bind_pollset); + grpc_endpoint_add_to_pollset(exec_ctx, t->ep, op->bind_pollset); } if (op->bind_pollset_set) { - grpc_endpoint_add_to_pollset_set(t->ep, op->bind_pollset_set); + grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, op->bind_pollset_set); } if (op->send_ping) { - send_ping_locked(t, nullptr, op->send_ping); - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING); + send_ping_locked(exec_ctx, t, nullptr, op->send_ping); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING); } if (op->on_connectivity_state_change != nullptr) { grpc_connectivity_state_notify_on_state_change( - &t->channel_callback.state_tracker, op->connectivity_state, + exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, op->on_connectivity_state_change); } if (op->disconnect_with_error != GRPC_ERROR_NONE) { - close_transport_locked(t, op->disconnect_with_error); + close_transport_locked(exec_ctx, t, op->disconnect_with_error); } - GRPC_CLOSURE_RUN(op->on_consumed, GRPC_ERROR_NONE); + GRPC_CLOSURE_RUN(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); - GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "transport_op"); } -static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { +static void perform_transport_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_transport_op* op) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt; char* msg = grpc_transport_op_string(op); gpr_free(msg); op->handler_private.extra_arg = gt; GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op"); - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&op->handler_private.closure, + GRPC_CLOSURE_SCHED(exec_ctx, + GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_transport_op_locked, op, grpc_combiner_scheduler(t->combiner)), GRPC_ERROR_NONE); @@ -1761,33 +1854,36 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { * INPUT PROCESSING - GENERAL */ -void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t, +void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_chttp2_stream* s) { if (s->recv_initial_metadata_ready != nullptr && s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) { if (s->seen_error) { - grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); if (!s->pending_byte_stream) { grpc_slice_buffer_reset_and_unref_internal( - &s->unprocessed_incoming_frames_buffer); + exec_ctx, &s->unprocessed_incoming_frames_buffer); } } - grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0], - s->recv_initial_metadata); - null_then_run_closure(&s->recv_initial_metadata_ready, GRPC_ERROR_NONE); + grpc_chttp2_incoming_metadata_buffer_publish( + exec_ctx, &s->metadata_buffer[0], s->recv_initial_metadata); + null_then_run_closure(exec_ctx, &s->recv_initial_metadata_ready, + GRPC_ERROR_NONE); } } -void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t, +void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_chttp2_stream* s) { grpc_error* error = GRPC_ERROR_NONE; if (s->recv_message_ready != nullptr) { *s->recv_message = nullptr; if (s->final_metadata_requested && s->seen_error) { - grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); if (!s->pending_byte_stream) { grpc_slice_buffer_reset_and_unref_internal( - &s->unprocessed_incoming_frames_buffer); + exec_ctx, &s->unprocessed_incoming_frames_buffer); } } if (!s->pending_byte_stream) { @@ -1814,9 +1910,10 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t, &s->decompressed_data_buffer, nullptr, GRPC_HEADER_SIZE_IN_BYTES - s->decompressed_header_bytes, &end_of_context)) { - grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + &s->frame_storage); grpc_slice_buffer_reset_and_unref_internal( - &s->unprocessed_incoming_frames_buffer); + exec_ctx, &s->unprocessed_incoming_frames_buffer); error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Stream decompression error."); } else { @@ -1825,8 +1922,8 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t, s->decompressed_header_bytes = 0; } error = grpc_deframe_unprocessed_incoming_frames( - &s->data_parser, s, &s->decompressed_data_buffer, nullptr, - s->recv_message); + exec_ctx, &s->data_parser, s, &s->decompressed_data_buffer, + nullptr, s->recv_message); if (end_of_context) { grpc_stream_compression_context_destroy( s->stream_decompression_ctx); @@ -1835,14 +1932,15 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t, } } else { error = grpc_deframe_unprocessed_incoming_frames( - &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, - nullptr, s->recv_message); + exec_ctx, &s->data_parser, s, + &s->unprocessed_incoming_frames_buffer, nullptr, s->recv_message); } if (error != GRPC_ERROR_NONE) { s->seen_error = true; - grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + &s->frame_storage); grpc_slice_buffer_reset_and_unref_internal( - &s->unprocessed_incoming_frames_buffer); + exec_ctx, &s->unprocessed_incoming_frames_buffer); break; } else if (*s->recv_message != nullptr) { break; @@ -1850,25 +1948,26 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t, } } if (error == GRPC_ERROR_NONE && *s->recv_message != nullptr) { - null_then_run_closure(&s->recv_message_ready, GRPC_ERROR_NONE); + null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE); } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) { *s->recv_message = nullptr; - null_then_run_closure(&s->recv_message_ready, GRPC_ERROR_NONE); + null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE); } GRPC_ERROR_UNREF(error); } } -void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t, +void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_chttp2_stream* s) { - grpc_chttp2_maybe_complete_recv_message(t, s); + grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); if (s->recv_trailing_metadata_finished != nullptr && s->read_closed && s->write_closed) { if (s->seen_error) { - grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); if (!s->pending_byte_stream) { grpc_slice_buffer_reset_and_unref_internal( - &s->unprocessed_incoming_frames_buffer); + exec_ctx, &s->unprocessed_incoming_frames_buffer); } } bool pending_data = s->pending_byte_stream || @@ -1886,9 +1985,9 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t, s->stream_decompression_ctx, &s->frame_storage, &s->unprocessed_incoming_frames_buffer, nullptr, GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) { - grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); grpc_slice_buffer_reset_and_unref_internal( - &s->unprocessed_incoming_frames_buffer); + exec_ctx, &s->unprocessed_incoming_frames_buffer); s->seen_error = true; } else { if (s->unprocessed_incoming_frames_buffer.length > 0) { @@ -1903,23 +2002,23 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t, } if (s->read_closed && s->frame_storage.length == 0 && !pending_data && s->recv_trailing_metadata_finished != nullptr) { - grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1], - s->recv_trailing_metadata); + grpc_chttp2_incoming_metadata_buffer_publish( + exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata); grpc_chttp2_complete_closure_step( - t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE, + exec_ctx, t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE, "recv_trailing_metadata_finished"); } } } -static void remove_stream(grpc_chttp2_transport* t, uint32_t id, - grpc_error* error) { +static void remove_stream(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + uint32_t id, grpc_error* error) { grpc_chttp2_stream* s = (grpc_chttp2_stream*)grpc_chttp2_stream_map_delete(&t->stream_map, id); GPR_ASSERT(s); if (t->incoming_stream == s) { t->incoming_stream = nullptr; - grpc_chttp2_parsing_become_skip_parser(t); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); } if (s->pending_byte_stream) { if (s->on_next != nullptr) { @@ -1927,8 +2026,8 @@ static void remove_stream(grpc_chttp2_transport* t, uint32_t id, if (error == GRPC_ERROR_NONE) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); } - incoming_byte_stream_publish_error(bs, error); - incoming_byte_stream_unref(bs); + incoming_byte_stream_publish_error(exec_ctx, bs, error); + incoming_byte_stream_unref(exec_ctx, bs); s->data_parser.parsing_frame = nullptr; } else { GRPC_ERROR_UNREF(s->byte_stream_error); @@ -1937,52 +2036,56 @@ static void remove_stream(grpc_chttp2_transport* t, uint32_t id, } if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { - post_benign_reclaimer(t); + post_benign_reclaimer(exec_ctx, t); if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) { close_transport_locked( - t, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Last stream closed after sending GOAWAY", &error, 1)); + exec_ctx, t, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Last stream closed after sending GOAWAY", &error, 1)); } } if (grpc_chttp2_list_remove_writable_stream(t, s)) { - GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream"); + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream"); } GRPC_ERROR_UNREF(error); - maybe_start_some_streams(t); + maybe_start_some_streams(exec_ctx, t); } -void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s, +void grpc_chttp2_cancel_stream(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_chttp2_stream* s, grpc_error* due_to_error) { if (!t->is_client && !s->sent_trailing_metadata && grpc_error_has_clear_grpc_status(due_to_error)) { - close_from_api(t, s, due_to_error); + close_from_api(exec_ctx, t, s, due_to_error); return; } if (!s->read_closed || !s->write_closed) { if (s->id != 0) { grpc_http2_error_code http_error; - grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr, - &http_error, nullptr); + grpc_error_get_status(exec_ctx, due_to_error, s->deadline, nullptr, + nullptr, &http_error, nullptr); grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(s->id, (uint32_t)http_error, &s->stats.outgoing)); - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM); } } if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) { s->seen_error = true; } - grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error); + grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, due_to_error); } -void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s, - grpc_error* error) { +void grpc_chttp2_fake_status(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_chttp2_stream* s, grpc_error* error) { grpc_status_code status; grpc_slice slice; - grpc_error_get_status(error, s->deadline, &status, &slice, nullptr, nullptr); + grpc_error_get_status(exec_ctx, error, s->deadline, &status, &slice, nullptr, + nullptr); if (status != GRPC_STATUS_OK) { s->seen_error = true; } @@ -1998,20 +2101,20 @@ void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s, gpr_ltoa(status, status_string); GRPC_LOG_IF_ERROR("add_status", grpc_chttp2_incoming_metadata_buffer_replace_or_add( - &s->metadata_buffer[1], + exec_ctx, &s->metadata_buffer[1], grpc_mdelem_from_slices( - GRPC_MDSTR_GRPC_STATUS, + exec_ctx, GRPC_MDSTR_GRPC_STATUS, grpc_slice_from_copied_string(status_string)))); if (!GRPC_SLICE_IS_EMPTY(slice)) { GRPC_LOG_IF_ERROR( "add_status_message", grpc_chttp2_incoming_metadata_buffer_replace_or_add( - &s->metadata_buffer[1], - grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_MESSAGE, + exec_ctx, &s->metadata_buffer[1], + grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, grpc_slice_ref_internal(slice)))); } s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE; - grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); } GRPC_ERROR_UNREF(error); @@ -2044,12 +2147,14 @@ static grpc_error* removal_error(grpc_error* extra_error, grpc_chttp2_stream* s, return error; } -static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s, - grpc_chttp2_write_cb** list, grpc_error* error) { +static void flush_write_list(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_chttp2_stream* s, grpc_chttp2_write_cb** list, + grpc_error* error) { while (*list) { grpc_chttp2_write_cb* cb = *list; *list = cb->next; - grpc_chttp2_complete_closure_step(t, s, &cb->closure, GRPC_ERROR_REF(error), + grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure, + GRPC_ERROR_REF(error), "on_write_finished_cb"); cb->next = t->write_cb_pool; t->write_cb_pool = cb; @@ -2057,34 +2162,37 @@ static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s, GRPC_ERROR_UNREF(error); } -void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t, +void grpc_chttp2_fail_pending_writes(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_chttp2_stream* s, grpc_error* error) { error = removal_error(error, s, "Pending writes failed due to stream closure"); s->send_initial_metadata = nullptr; - grpc_chttp2_complete_closure_step(t, s, &s->send_initial_metadata_finished, - GRPC_ERROR_REF(error), - "send_initial_metadata_finished"); + grpc_chttp2_complete_closure_step( + exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_REF(error), + "send_initial_metadata_finished"); s->send_trailing_metadata = nullptr; - grpc_chttp2_complete_closure_step(t, s, &s->send_trailing_metadata_finished, - GRPC_ERROR_REF(error), - "send_trailing_metadata_finished"); + grpc_chttp2_complete_closure_step( + exec_ctx, t, s, &s->send_trailing_metadata_finished, + GRPC_ERROR_REF(error), "send_trailing_metadata_finished"); s->fetching_send_message = nullptr; - grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished, - GRPC_ERROR_REF(error), - "fetching_send_message_finished"); - flush_write_list(t, s, &s->on_write_finished_cbs, GRPC_ERROR_REF(error)); - flush_write_list(t, s, &s->on_flow_controlled_cbs, error); + grpc_chttp2_complete_closure_step( + exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error), + "fetching_send_message_finished"); + flush_write_list(exec_ctx, t, s, &s->on_write_finished_cbs, + GRPC_ERROR_REF(error)); + flush_write_list(exec_ctx, t, s, &s->on_flow_controlled_cbs, error); } -void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t, +void grpc_chttp2_mark_stream_closed(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_chttp2_stream* s, int close_reads, int close_writes, grpc_error* error) { if (s->read_closed && s->write_closed) { /* already closed */ - grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); GRPC_ERROR_UNREF(error); return; } @@ -2098,20 +2206,20 @@ void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t, if (close_writes && !s->write_closed) { s->write_closed_error = GRPC_ERROR_REF(error); s->write_closed = true; - grpc_chttp2_fail_pending_writes(t, s, GRPC_ERROR_REF(error)); + grpc_chttp2_fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error)); } if (s->read_closed && s->write_closed) { became_closed = true; grpc_error* overall_error = removal_error(GRPC_ERROR_REF(error), s, "Stream removed"); if (s->id != 0) { - remove_stream(t, s->id, GRPC_ERROR_REF(overall_error)); + remove_stream(exec_ctx, t, s->id, GRPC_ERROR_REF(overall_error)); } else { /* Purge streams waiting on concurrency still waiting for id assignment */ grpc_chttp2_list_remove_waiting_for_concurrency(t, s); } if (overall_error != GRPC_ERROR_NONE) { - grpc_chttp2_fake_status(t, s, overall_error); + grpc_chttp2_fake_status(exec_ctx, t, s, overall_error); } } if (closed_read) { @@ -2120,18 +2228,18 @@ void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t, s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE; } } - grpc_chttp2_maybe_complete_recv_initial_metadata(t, s); - grpc_chttp2_maybe_complete_recv_message(t, s); + grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); } if (became_closed) { - grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); - GRPC_CHTTP2_STREAM_UNREF(s, "chttp2"); + grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2"); } GRPC_ERROR_UNREF(error); } -static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s, - grpc_error* error) { +static void close_from_api(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_chttp2_stream* s, grpc_error* error) { grpc_slice hdr; grpc_slice status_hdr; grpc_slice http_status_hdr; @@ -2141,8 +2249,8 @@ static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s, uint32_t len = 0; grpc_status_code grpc_status; grpc_slice slice; - grpc_error_get_status(error, s->deadline, &grpc_status, &slice, nullptr, - nullptr); + grpc_error_get_status(exec_ctx, error, s->deadline, &grpc_status, &slice, + nullptr, nullptr); GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100); @@ -2284,11 +2392,13 @@ static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s, &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing)); - grpc_chttp2_mark_stream_closed(t, s, 1, 1, error); - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API); + grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, error); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API); } typedef struct { + grpc_exec_ctx* exec_ctx; grpc_error* error; grpc_chttp2_transport* t; } cancel_stream_cb_args; @@ -2296,11 +2406,13 @@ typedef struct { static void cancel_stream_cb(void* user_data, uint32_t key, void* stream) { cancel_stream_cb_args* args = (cancel_stream_cb_args*)user_data; grpc_chttp2_stream* s = (grpc_chttp2_stream*)stream; - grpc_chttp2_cancel_stream(args->t, s, GRPC_ERROR_REF(args->error)); + grpc_chttp2_cancel_stream(args->exec_ctx, args->t, s, + GRPC_ERROR_REF(args->error)); } -static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error) { - cancel_stream_cb_args args = {error, t}; +static void end_all_the_calls(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, + grpc_error* error) { + cancel_stream_cb_args args = {exec_ctx, error, t}; grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args); GRPC_ERROR_UNREF(error); } @@ -2310,14 +2422,14 @@ static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error) { */ template <class F> -static void WithUrgency(grpc_chttp2_transport* t, +static void WithUrgency(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, grpc_core::chttp2::FlowControlAction::Urgency urgency, grpc_chttp2_initiate_write_reason reason, F action) { switch (urgency) { case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED: break; case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY: - grpc_chttp2_initiate_write(t, reason); + grpc_chttp2_initiate_write(exec_ctx, t, reason); // fallthrough case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE: action(); @@ -2326,27 +2438,31 @@ static void WithUrgency(grpc_chttp2_transport* t, } void grpc_chttp2_act_on_flowctl_action( - const grpc_core::chttp2::FlowControlAction& action, + grpc_exec_ctx* exec_ctx, const grpc_core::chttp2::FlowControlAction& action, grpc_chttp2_transport* t, grpc_chttp2_stream* s) { - WithUrgency(t, action.send_stream_update(), - GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, - [t, s]() { grpc_chttp2_mark_stream_writable(t, s); }); - WithUrgency(t, action.send_transport_update(), + WithUrgency( + exec_ctx, t, action.send_stream_update(), + GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, + [exec_ctx, t, s]() { grpc_chttp2_mark_stream_writable(exec_ctx, t, s); }); + WithUrgency(exec_ctx, t, action.send_transport_update(), GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {}); - WithUrgency(t, action.send_initial_window_update(), - GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() { - queue_setting_update(t, + WithUrgency(exec_ctx, t, action.send_initial_window_update(), + GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, + [exec_ctx, t, &action]() { + queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, action.initial_window_size()); }); - WithUrgency(t, action.send_max_frame_size_update(), - GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() { - queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, - action.max_frame_size()); - }); + WithUrgency( + exec_ctx, t, action.send_max_frame_size_update(), + GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [exec_ctx, t, &action]() { + queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, + action.max_frame_size()); + }); } -static grpc_error* try_http_parsing(grpc_chttp2_transport* t) { +static grpc_error* try_http_parsing(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { grpc_http_parser parser; size_t i = 0; grpc_error* error = GRPC_ERROR_NONE; @@ -2375,7 +2491,8 @@ static grpc_error* try_http_parsing(grpc_chttp2_transport* t) { return error; } -static void read_action_locked(void* tp, grpc_error* error) { +static void read_action_locked(grpc_exec_ctx* exec_ctx, void* tp, + grpc_error* error) { GPR_TIMER_BEGIN("reading_action_locked", 0); grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp; @@ -2399,10 +2516,11 @@ static void read_action_locked(void* tp, grpc_error* error) { for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) { t->flow_control->bdp_estimator()->AddIncomingBytes( (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i])); - errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]); + errors[1] = + grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]); } if (errors[1] != GRPC_ERROR_NONE) { - errors[2] = try_http_parsing(t); + errors[2] = try_http_parsing(exec_ctx, t); GRPC_ERROR_UNREF(error); error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed parsing HTTP/2", errors, GPR_ARRAY_SIZE(errors)); @@ -2417,9 +2535,10 @@ static void read_action_locked(void* tp, grpc_error* error) { if (t->initial_window_update > 0) { grpc_chttp2_stream* s; while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { - grpc_chttp2_mark_stream_writable(t, s); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); grpc_chttp2_initiate_write( - t, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING); + exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING); } } t->initial_window_update = 0; @@ -2440,21 +2559,22 @@ static void read_action_locked(void* tp, grpc_error* error) { error = grpc_error_add_child(error, GRPC_ERROR_REF(t->goaway_error)); } - close_transport_locked(t, GRPC_ERROR_REF(error)); + close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); t->endpoint_reading = 0; } else if (t->closed_with_error == GRPC_ERROR_NONE) { keep_reading = true; GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading"); } - grpc_slice_buffer_reset_and_unref_internal(&t->read_buffer); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &t->read_buffer); if (keep_reading) { - grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked); - grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, - nullptr); - GRPC_CHTTP2_UNREF_TRANSPORT(t, "keep_reading"); + grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, + &t->read_action_locked); + grpc_chttp2_act_on_flowctl_action(exec_ctx, t->flow_control->MakeAction(), + t, nullptr); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); } else { - GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action"); } GPR_TIMER_END("post_reading_action_locked", 0); @@ -2466,12 +2586,15 @@ static void read_action_locked(void* tp, grpc_error* error) { // t is reffed prior to calling the first time, and once the callback chain // that kicks off finishes, it's unreffed -static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) { +static void schedule_bdp_ping_locked(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { t->flow_control->bdp_estimator()->SchedulePing(); - send_ping_locked(t, &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked); + send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked, + &t->finish_bdp_ping_locked); } -static void start_bdp_ping_locked(void* tp, grpc_error* error) { +static void start_bdp_ping_locked(grpc_exec_ctx* exec_ctx, void* tp, + grpc_error* error) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp; if (grpc_http_trace.enabled()) { gpr_log(GPR_DEBUG, "%s: Start BDP ping err=%s", t->peer_string, @@ -2479,39 +2602,42 @@ static void start_bdp_ping_locked(void* tp, grpc_error* error) { } /* Reset the keepalive ping timer */ if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) { - grpc_timer_cancel(&t->keepalive_ping_timer); + grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); } t->flow_control->bdp_estimator()->StartPing(); } -static void finish_bdp_ping_locked(void* tp, grpc_error* error) { +static void finish_bdp_ping_locked(grpc_exec_ctx* exec_ctx, void* tp, + grpc_error* error) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp; if (grpc_http_trace.enabled()) { gpr_log(GPR_DEBUG, "%s: Complete BDP ping err=%s", t->peer_string, grpc_error_string(error)); } if (error != GRPC_ERROR_NONE) { - GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); return; } - grpc_millis next_ping = t->flow_control->bdp_estimator()->CompletePing(); - grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t, - nullptr); + grpc_millis next_ping = + t->flow_control->bdp_estimator()->CompletePing(exec_ctx); + grpc_chttp2_act_on_flowctl_action( + exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, nullptr); GPR_ASSERT(!t->have_next_bdp_ping_timer); t->have_next_bdp_ping_timer = true; - grpc_timer_init(&t->next_bdp_ping_timer, next_ping, + grpc_timer_init(exec_ctx, &t->next_bdp_ping_timer, next_ping, &t->next_bdp_ping_timer_expired_locked); } -static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error) { +static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx* exec_ctx, + void* tp, grpc_error* error) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp; GPR_ASSERT(t->have_next_bdp_ping_timer); t->have_next_bdp_ping_timer = false; if (error != GRPC_ERROR_NONE) { - GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); return; } - schedule_bdp_ping_locked(t); + schedule_bdp_ping_locked(exec_ctx, t); } void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, @@ -2572,7 +2698,8 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, } } -static void init_keepalive_ping_locked(void* arg, grpc_error* error) { +static void init_keepalive_ping_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg; GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) { @@ -2582,55 +2709,59 @@ static void init_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_chttp2_stream_map_size(&t->stream_map) > 0) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); - send_ping_locked(t, &t->start_keepalive_ping_locked, + send_ping_locked(exec_ctx, t, &t->start_keepalive_ping_locked, &t->finish_keepalive_ping_locked); - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); } else { GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + grpc_timer_init(exec_ctx, &t->keepalive_ping_timer, + grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, &t->init_keepalive_ping_locked); } } else if (error == GRPC_ERROR_CANCELLED) { /* The keepalive ping timer may be cancelled by bdp */ GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + grpc_timer_init(exec_ctx, &t->keepalive_ping_timer, + grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, &t->init_keepalive_ping_locked); } - GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "init keepalive ping"); } -static void start_keepalive_ping_locked(void* arg, grpc_error* error) { +static void start_keepalive_ping_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg; GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); - grpc_timer_init(&t->keepalive_watchdog_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + grpc_timer_init(exec_ctx, &t->keepalive_watchdog_timer, + grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, &t->keepalive_watchdog_fired_locked); } -static void finish_keepalive_ping_locked(void* arg, grpc_error* error) { +static void finish_keepalive_ping_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg; if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { if (error == GRPC_ERROR_NONE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; - grpc_timer_cancel(&t->keepalive_watchdog_timer); + grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer); GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + grpc_timer_init(exec_ctx, &t->keepalive_ping_timer, + grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, &t->init_keepalive_ping_locked); } } - GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping end"); } -static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) { +static void keepalive_watchdog_fired_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg; if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { if (error == GRPC_ERROR_NONE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; close_transport_locked( - t, + exec_ctx, t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "keepalive watchdog timeout"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL)); @@ -2643,67 +2774,71 @@ static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) { t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); } } - GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive watchdog"); } /******************************************************************************* * CALLBACK LOOP */ -static void connectivity_state_set(grpc_chttp2_transport* t, +static void connectivity_state_set(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t, grpc_connectivity_state state, grpc_error* error, const char* reason) { GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_DEBUG, "set connectivity_state=%d", state)); - grpc_connectivity_state_set(&t->channel_callback.state_tracker, state, error, - reason); + grpc_connectivity_state_set(exec_ctx, &t->channel_callback.state_tracker, + state, error, reason); } /******************************************************************************* * POLLSET STUFF */ -static void set_pollset(grpc_transport* gt, grpc_stream* gs, - grpc_pollset* pollset) { +static void set_pollset(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, grpc_pollset* pollset) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt; - grpc_endpoint_add_to_pollset(t->ep, pollset); + grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset); } -static void set_pollset_set(grpc_transport* gt, grpc_stream* gs, - grpc_pollset_set* pollset_set) { +static void set_pollset_set(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, grpc_pollset_set* pollset_set) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt; - grpc_endpoint_add_to_pollset_set(t->ep, pollset_set); + grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set); } /******************************************************************************* * BYTE STREAM */ -static void reset_byte_stream(void* arg, grpc_error* error) { +static void reset_byte_stream(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_chttp2_stream* s = (grpc_chttp2_stream*)arg; s->pending_byte_stream = false; if (error == GRPC_ERROR_NONE) { - grpc_chttp2_maybe_complete_recv_message(s->t, s); - grpc_chttp2_maybe_complete_recv_trailing_metadata(s->t, s); + grpc_chttp2_maybe_complete_recv_message(exec_ctx, s->t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, s->t, s); } else { GPR_ASSERT(error != GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, s->on_next, GRPC_ERROR_REF(error)); s->on_next = nullptr; GRPC_ERROR_UNREF(s->byte_stream_error); s->byte_stream_error = GRPC_ERROR_NONE; - grpc_chttp2_cancel_stream(s->t, s, GRPC_ERROR_REF(error)); + grpc_chttp2_cancel_stream(exec_ctx, s->t, s, GRPC_ERROR_REF(error)); s->byte_stream_error = GRPC_ERROR_REF(error); } } -static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream* bs) { +static void incoming_byte_stream_unref(grpc_exec_ctx* exec_ctx, + grpc_chttp2_incoming_byte_stream* bs) { if (gpr_unref(&bs->refs)) { gpr_free(bs); } } -static void incoming_byte_stream_next_locked(void* argp, +static void incoming_byte_stream_next_locked(grpc_exec_ctx* exec_ctx, + void* argp, grpc_error* error_ignored) { grpc_chttp2_incoming_byte_stream* bs = (grpc_chttp2_incoming_byte_stream*)argp; @@ -2714,29 +2849,30 @@ static void incoming_byte_stream_next_locked(void* argp, if (!s->read_closed) { s->flow_control->IncomingByteStreamUpdate(bs->next_action.max_size_hint, cur_length); - grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s); + grpc_chttp2_act_on_flowctl_action(exec_ctx, s->flow_control->MakeAction(), + t, s); } GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); if (s->frame_storage.length > 0) { grpc_slice_buffer_swap(&s->frame_storage, &s->unprocessed_incoming_frames_buffer); s->unprocessed_incoming_frames_decompressed = false; - GRPC_CLOSURE_SCHED(bs->next_action.on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE); } else if (s->byte_stream_error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(bs->next_action.on_complete, + GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_REF(s->byte_stream_error)); if (s->data_parser.parsing_frame != nullptr) { - incoming_byte_stream_unref(s->data_parser.parsing_frame); + incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame); s->data_parser.parsing_frame = nullptr; } } else if (s->read_closed) { if (bs->remaining_bytes != 0) { s->byte_stream_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); - GRPC_CLOSURE_SCHED(bs->next_action.on_complete, + GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_REF(s->byte_stream_error)); if (s->data_parser.parsing_frame != nullptr) { - incoming_byte_stream_unref(s->data_parser.parsing_frame); + incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame); s->data_parser.parsing_frame = nullptr; } } else { @@ -2746,10 +2882,11 @@ static void incoming_byte_stream_next_locked(void* argp, } else { s->on_next = bs->next_action.on_complete; } - incoming_byte_stream_unref(bs); + incoming_byte_stream_unref(exec_ctx, bs); } -static bool incoming_byte_stream_next(grpc_byte_stream* byte_stream, +static bool incoming_byte_stream_next(grpc_exec_ctx* exec_ctx, + grpc_byte_stream* byte_stream, size_t max_size_hint, grpc_closure* on_complete) { GPR_TIMER_BEGIN("incoming_byte_stream_next", 0); @@ -2764,6 +2901,7 @@ static bool incoming_byte_stream_next(grpc_byte_stream* byte_stream, bs->next_action.max_size_hint = max_size_hint; bs->next_action.on_complete = on_complete; GRPC_CLOSURE_SCHED( + exec_ctx, GRPC_CLOSURE_INIT(&bs->next_action.closure, incoming_byte_stream_next_locked, bs, grpc_combiner_scheduler(bs->transport->combiner)), @@ -2773,7 +2911,8 @@ static bool incoming_byte_stream_next(grpc_byte_stream* byte_stream, } } -static grpc_error* incoming_byte_stream_pull(grpc_byte_stream* byte_stream, +static grpc_error* incoming_byte_stream_pull(grpc_exec_ctx* exec_ctx, + grpc_byte_stream* byte_stream, grpc_slice* slice) { GPR_TIMER_BEGIN("incoming_byte_stream_pull", 0); grpc_chttp2_incoming_byte_stream* bs = @@ -2809,28 +2948,31 @@ static grpc_error* incoming_byte_stream_pull(grpc_byte_stream* byte_stream, } } error = grpc_deframe_unprocessed_incoming_frames( - &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, slice, - nullptr); + exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, + slice, nullptr); if (error != GRPC_ERROR_NONE) { return error; } } else { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); - GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error)); return error; } GPR_TIMER_END("incoming_byte_stream_pull", 0); return GRPC_ERROR_NONE; } -static void incoming_byte_stream_destroy_locked(void* byte_stream, +static void incoming_byte_stream_destroy_locked(grpc_exec_ctx* exec_ctx, + void* byte_stream, grpc_error* error_ignored); -static void incoming_byte_stream_destroy(grpc_byte_stream* byte_stream) { +static void incoming_byte_stream_destroy(grpc_exec_ctx* exec_ctx, + grpc_byte_stream* byte_stream) { GPR_TIMER_BEGIN("incoming_byte_stream_destroy", 0); grpc_chttp2_incoming_byte_stream* bs = (grpc_chttp2_incoming_byte_stream*)byte_stream; GRPC_CLOSURE_SCHED( + exec_ctx, GRPC_CLOSURE_INIT(&bs->destroy_action, incoming_byte_stream_destroy_locked, bs, grpc_combiner_scheduler(bs->transport->combiner)), @@ -2839,28 +2981,30 @@ static void incoming_byte_stream_destroy(grpc_byte_stream* byte_stream) { } static void incoming_byte_stream_publish_error( - grpc_chttp2_incoming_byte_stream* bs, grpc_error* error) { + grpc_exec_ctx* exec_ctx, grpc_chttp2_incoming_byte_stream* bs, + grpc_error* error) { grpc_chttp2_stream* s = bs->stream; GPR_ASSERT(error != GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, s->on_next, GRPC_ERROR_REF(error)); s->on_next = nullptr; GRPC_ERROR_UNREF(s->byte_stream_error); s->byte_stream_error = GRPC_ERROR_REF(error); - grpc_chttp2_cancel_stream(bs->transport, bs->stream, GRPC_ERROR_REF(error)); + grpc_chttp2_cancel_stream(exec_ctx, bs->transport, bs->stream, + GRPC_ERROR_REF(error)); } grpc_error* grpc_chttp2_incoming_byte_stream_push( - grpc_chttp2_incoming_byte_stream* bs, grpc_slice slice, - grpc_slice* slice_out) { + grpc_exec_ctx* exec_ctx, grpc_chttp2_incoming_byte_stream* bs, + grpc_slice slice, grpc_slice* slice_out) { grpc_chttp2_stream* s = bs->stream; if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"); - GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error)); - grpc_slice_unref_internal(slice); + GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error)); + grpc_slice_unref_internal(exec_ctx, slice); return error; } else { bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice); @@ -2872,8 +3016,8 @@ grpc_error* grpc_chttp2_incoming_byte_stream_push( } grpc_error* grpc_chttp2_incoming_byte_stream_finished( - grpc_chttp2_incoming_byte_stream* bs, grpc_error* error, - bool reset_on_error) { + grpc_exec_ctx* exec_ctx, grpc_chttp2_incoming_byte_stream* bs, + grpc_error* error, bool reset_on_error) { grpc_chttp2_stream* s = bs->stream; if (error == GRPC_ERROR_NONE) { @@ -2882,25 +3026,27 @@ grpc_error* grpc_chttp2_incoming_byte_stream_finished( } } if (error != GRPC_ERROR_NONE && reset_on_error) { - GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error)); } - incoming_byte_stream_unref(bs); + incoming_byte_stream_unref(exec_ctx, bs); return error; } -static void incoming_byte_stream_shutdown(grpc_byte_stream* byte_stream, +static void incoming_byte_stream_shutdown(grpc_exec_ctx* exec_ctx, + grpc_byte_stream* byte_stream, grpc_error* error) { grpc_chttp2_incoming_byte_stream* bs = (grpc_chttp2_incoming_byte_stream*)byte_stream; GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished( - bs, error, true /* reset_on_error */)); + exec_ctx, bs, error, true /* reset_on_error */)); } static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = { incoming_byte_stream_next, incoming_byte_stream_pull, incoming_byte_stream_shutdown, incoming_byte_stream_destroy}; -static void incoming_byte_stream_destroy_locked(void* byte_stream, +static void incoming_byte_stream_destroy_locked(grpc_exec_ctx* exec_ctx, + void* byte_stream, grpc_error* error_ignored) { grpc_chttp2_incoming_byte_stream* bs = (grpc_chttp2_incoming_byte_stream*)byte_stream; @@ -2908,15 +3054,15 @@ static void incoming_byte_stream_destroy_locked(void* byte_stream, grpc_chttp2_transport* t = s->t; GPR_ASSERT(bs->base.vtable == &grpc_chttp2_incoming_byte_stream_vtable); - incoming_byte_stream_unref(bs); + incoming_byte_stream_unref(exec_ctx, bs); s->pending_byte_stream = false; - grpc_chttp2_maybe_complete_recv_message(t, s); - grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); + grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); } grpc_chttp2_incoming_byte_stream* grpc_chttp2_incoming_byte_stream_create( - grpc_chttp2_transport* t, grpc_chttp2_stream* s, uint32_t frame_size, - uint32_t flags) { + grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, grpc_chttp2_stream* s, + uint32_t frame_size, uint32_t flags) { grpc_chttp2_incoming_byte_stream* incoming_byte_stream = (grpc_chttp2_incoming_byte_stream*)gpr_malloc( sizeof(*incoming_byte_stream)); @@ -2936,25 +3082,30 @@ grpc_chttp2_incoming_byte_stream* grpc_chttp2_incoming_byte_stream_create( * RESOURCE QUOTAS */ -static void post_benign_reclaimer(grpc_chttp2_transport* t) { +static void post_benign_reclaimer(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { if (!t->benign_reclaimer_registered) { t->benign_reclaimer_registered = true; GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer"); - grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep), + grpc_resource_user_post_reclaimer(exec_ctx, + grpc_endpoint_get_resource_user(t->ep), false, &t->benign_reclaimer_locked); } } -static void post_destructive_reclaimer(grpc_chttp2_transport* t) { +static void post_destructive_reclaimer(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport* t) { if (!t->destructive_reclaimer_registered) { t->destructive_reclaimer_registered = true; GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer"); - grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep), + grpc_resource_user_post_reclaimer(exec_ctx, + grpc_endpoint_get_resource_user(t->ep), true, &t->destructive_reclaimer_locked); } } -static void benign_reclaimer_locked(void* arg, grpc_error* error) { +static void benign_reclaimer_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg; if (error == GRPC_ERROR_NONE && grpc_chttp2_stream_map_size(&t->stream_map) == 0) { @@ -2964,7 +3115,7 @@ static void benign_reclaimer_locked(void* arg, grpc_error* error) { gpr_log(GPR_DEBUG, "HTTP2: %s - send goaway to free memory", t->peer_string); } - send_goaway(t, + send_goaway(exec_ctx, t, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); @@ -2977,12 +3128,13 @@ static void benign_reclaimer_locked(void* arg, grpc_error* error) { t->benign_reclaimer_registered = false; if (error != GRPC_ERROR_CANCELLED) { grpc_resource_user_finish_reclamation( - grpc_endpoint_get_resource_user(t->ep)); + exec_ctx, grpc_endpoint_get_resource_user(t->ep)); } - GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "benign_reclaimer"); } -static void destructive_reclaimer_locked(void* arg, grpc_error* error) { +static void destructive_reclaimer_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg; size_t n = grpc_chttp2_stream_map_size(&t->stream_map); t->destructive_reclaimer_registered = false; @@ -2994,7 +3146,7 @@ static void destructive_reclaimer_locked(void* arg, grpc_error* error) { s->id); } grpc_chttp2_cancel_stream( - t, s, + exec_ctx, t, s, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); @@ -3003,14 +3155,14 @@ static void destructive_reclaimer_locked(void* arg, grpc_error* error) { there are more streams left, we can immediately post a new reclaimer in case the resource quota needs to free more memory */ - post_destructive_reclaimer(t); + post_destructive_reclaimer(exec_ctx, t); } } if (error != GRPC_ERROR_CANCELLED) { grpc_resource_user_finish_reclamation( - grpc_endpoint_get_resource_user(t->ep)); + exec_ctx, grpc_endpoint_get_resource_user(t->ep)); } - GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destructive_reclaimer"); } /******************************************************************************* @@ -3064,7 +3216,8 @@ const char* grpc_chttp2_initiate_write_reason_string( GPR_UNREACHABLE_CODE(return "unknown"); } -static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) { +static grpc_endpoint* chttp2_get_endpoint(grpc_exec_ctx* exec_ctx, + grpc_transport* t) { return ((grpc_chttp2_transport*)t)->ep; } @@ -3082,16 +3235,17 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), static const grpc_transport_vtable* get_vtable(void) { return &vtable; } grpc_transport* grpc_create_chttp2_transport( - const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client) { + grpc_exec_ctx* exec_ctx, const grpc_channel_args* channel_args, + grpc_endpoint* ep, bool is_client) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)gpr_zalloc(sizeof(grpc_chttp2_transport)); - init_transport(t, channel_args, ep, is_client); + init_transport(exec_ctx, t, channel_args, ep, is_client); return &t->base; } void grpc_chttp2_transport_start_reading( - grpc_transport* transport, grpc_slice_buffer* read_buffer, - grpc_closure* notify_on_receive_settings) { + grpc_exec_ctx* exec_ctx, grpc_transport* transport, + grpc_slice_buffer* read_buffer, grpc_closure* notify_on_receive_settings) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)transport; GRPC_CHTTP2_REF_TRANSPORT( t, "reading_action"); /* matches unref inside reading_action */ @@ -3100,5 +3254,5 @@ void grpc_chttp2_transport_start_reading( gpr_free(read_buffer); } t->notify_on_receive_settings = notify_on_receive_settings; - GRPC_CLOSURE_SCHED(&t->read_action_locked, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &t->read_action_locked, GRPC_ERROR_NONE); } |