aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport/chttp2/transport/chttp2_transport.cc')
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc1088
1 files changed, 621 insertions, 467 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index f537fb09c2..63ac65ac78 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -95,77 +95,105 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
"chttp2_refcount");
/* forward declarations of various callbacks that we'll build closures around */
-static void write_action_begin_locked(void* t, grpc_error* error);
-static void write_action(void* t, grpc_error* error);
-static void write_action_end_locked(void* t, grpc_error* error);
+static void write_action_begin_locked(grpc_exec_ctx* exec_ctx, void* t,
+ grpc_error* error);
+static void write_action(grpc_exec_ctx* exec_ctx, void* t, grpc_error* error);
+static void write_action_end_locked(grpc_exec_ctx* exec_ctx, void* t,
+ grpc_error* error);
-static void read_action_locked(void* t, grpc_error* error);
+static void read_action_locked(grpc_exec_ctx* exec_ctx, void* t,
+ grpc_error* error);
-static void complete_fetch_locked(void* gs, grpc_error* error);
+static void complete_fetch_locked(grpc_exec_ctx* exec_ctx, void* gs,
+ grpc_error* error);
/** Set a transport level setting, and push it to our peer */
-static void queue_setting_update(grpc_chttp2_transport* t,
+static void queue_setting_update(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t,
grpc_chttp2_setting_id id, uint32_t value);
-static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
- grpc_error* error);
+static void close_from_api(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s, grpc_error* error);
/** Start new streams that have been created if we can */
-static void maybe_start_some_streams(grpc_chttp2_transport* t);
+static void maybe_start_some_streams(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t);
-static void connectivity_state_set(grpc_chttp2_transport* t,
+static void connectivity_state_set(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t,
grpc_connectivity_state state,
grpc_error* error, const char* reason);
-static void incoming_byte_stream_destroy_locked(void* byte_stream,
+static void incoming_byte_stream_destroy_locked(grpc_exec_ctx* exec_ctx,
+ void* byte_stream,
grpc_error* error_ignored);
static void incoming_byte_stream_publish_error(
- grpc_chttp2_incoming_byte_stream* bs, grpc_error* error);
-static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream* bs);
-
-static void benign_reclaimer_locked(void* t, grpc_error* error);
-static void destructive_reclaimer_locked(void* t, grpc_error* error);
-
-static void post_benign_reclaimer(grpc_chttp2_transport* t);
-static void post_destructive_reclaimer(grpc_chttp2_transport* t);
-
-static void close_transport_locked(grpc_chttp2_transport* t, grpc_error* error);
-static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error);
-
-static void schedule_bdp_ping_locked(grpc_chttp2_transport* t);
-static void start_bdp_ping_locked(void* tp, grpc_error* error);
-static void finish_bdp_ping_locked(void* tp, grpc_error* error);
-static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error);
-
-static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error);
-static void send_ping_locked(grpc_chttp2_transport* t,
+ grpc_exec_ctx* exec_ctx, grpc_chttp2_incoming_byte_stream* bs,
+ grpc_error* error);
+static void incoming_byte_stream_unref(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_incoming_byte_stream* bs);
+
+static void benign_reclaimer_locked(grpc_exec_ctx* exec_ctx, void* t,
+ grpc_error* error);
+static void destructive_reclaimer_locked(grpc_exec_ctx* exec_ctx, void* t,
+ grpc_error* error);
+
+static void post_benign_reclaimer(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t);
+static void post_destructive_reclaimer(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t);
+
+static void close_transport_locked(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t, grpc_error* error);
+static void end_all_the_calls(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
+ grpc_error* error);
+
+static void schedule_bdp_ping_locked(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t);
+static void start_bdp_ping_locked(grpc_exec_ctx* exec_ctx, void* tp,
+ grpc_error* error);
+static void finish_bdp_ping_locked(grpc_exec_ctx* exec_ctx, void* tp,
+ grpc_error* error);
+static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx* exec_ctx,
+ void* tp, grpc_error* error);
+
+static void cancel_pings(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
+ grpc_error* error);
+static void send_ping_locked(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
grpc_closure* on_initiate,
grpc_closure* on_complete);
-static void retry_initiate_ping_locked(void* tp, grpc_error* error);
+static void retry_initiate_ping_locked(grpc_exec_ctx* exec_ctx, void* tp,
+ grpc_error* error);
/** keepalive-relevant functions */
-static void init_keepalive_ping_locked(void* arg, grpc_error* error);
-static void start_keepalive_ping_locked(void* arg, grpc_error* error);
-static void finish_keepalive_ping_locked(void* arg, grpc_error* error);
-static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error);
-
-static void reset_byte_stream(void* arg, grpc_error* error);
+static void init_keepalive_ping_locked(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error);
+static void start_keepalive_ping_locked(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error);
+static void finish_keepalive_ping_locked(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error);
+static void keepalive_watchdog_fired_locked(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error);
+
+static void reset_byte_stream(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error);
/*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
-static void destruct_transport(grpc_chttp2_transport* t) {
+static void destruct_transport(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t) {
size_t i;
- grpc_endpoint_destroy(t->ep);
+ grpc_endpoint_destroy(exec_ctx, t->ep);
- grpc_slice_buffer_destroy_internal(&t->qbuf);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &t->qbuf);
- grpc_slice_buffer_destroy_internal(&t->outbuf);
- grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &t->outbuf);
+ grpc_chttp2_hpack_compressor_destroy(exec_ctx, &t->hpack_compressor);
- grpc_slice_buffer_destroy_internal(&t->read_buffer);
- grpc_chttp2_hpack_parser_destroy(&t->hpack_parser);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &t->read_buffer);
+ grpc_chttp2_hpack_parser_destroy(exec_ctx, &t->hpack_parser);
grpc_chttp2_goaway_parser_destroy(&t->goaway_parser);
for (i = 0; i < STREAM_LIST_COUNT; i++) {
@@ -178,11 +206,12 @@ static void destruct_transport(grpc_chttp2_transport* t) {
GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0);
grpc_chttp2_stream_map_destroy(&t->stream_map);
- grpc_connectivity_state_destroy(&t->channel_callback.state_tracker);
+ grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
- GRPC_COMBINER_UNREF(t->combiner, "chttp2_transport");
+ GRPC_COMBINER_UNREF(exec_ctx, t->combiner, "chttp2_transport");
- cancel_pings(t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"));
+ cancel_pings(exec_ctx, t,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"));
while (t->write_cb_pool) {
grpc_chttp2_write_cb* next = t->write_cb_pool->next;
@@ -199,7 +228,8 @@ static void destruct_transport(grpc_chttp2_transport* t) {
}
#ifndef NDEBUG
-void grpc_chttp2_unref_transport(grpc_chttp2_transport* t, const char* reason,
+void grpc_chttp2_unref_transport(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t, const char* reason,
const char* file, int line) {
if (grpc_trace_chttp2_refcount.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&t->refs.count);
@@ -207,7 +237,7 @@ void grpc_chttp2_unref_transport(grpc_chttp2_transport* t, const char* reason,
t, val, val - 1, reason, file, line);
}
if (!gpr_unref(&t->refs)) return;
- destruct_transport(t);
+ destruct_transport(exec_ctx, t);
}
void grpc_chttp2_ref_transport(grpc_chttp2_transport* t, const char* reason,
@@ -220,9 +250,10 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport* t, const char* reason,
gpr_ref(&t->refs);
}
#else
-void grpc_chttp2_unref_transport(grpc_chttp2_transport* t) {
+void grpc_chttp2_unref_transport(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t) {
if (!gpr_unref(&t->refs)) return;
- destruct_transport(t);
+ destruct_transport(exec_ctx, t);
}
void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { gpr_ref(&t->refs); }
@@ -230,7 +261,7 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { gpr_ref(&t->refs); }
static const grpc_transport_vtable* get_vtable(void);
-static void init_transport(grpc_chttp2_transport* t,
+static void init_transport(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
const grpc_channel_args* channel_args,
grpc_endpoint* ep, bool is_client) {
size_t i;
@@ -289,7 +320,7 @@ static void init_transport(grpc_chttp2_transport* t,
t->goaway_error = GRPC_ERROR_NONE;
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
- grpc_chttp2_hpack_parser_init(&t->hpack_parser);
+ grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser);
grpc_slice_buffer_init(&t->read_buffer);
@@ -320,13 +351,14 @@ static void init_transport(grpc_chttp2_transport* t,
/* configure http2 the way we like it */
if (is_client) {
- queue_setting_update(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
- queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
+ queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
+ queue_setting_update(exec_ctx, t,
+ GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
}
- queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
+ queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
DEFAULT_MAX_HEADER_LIST_SIZE);
- queue_setting_update(t, GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
- 1);
+ queue_setting_update(exec_ctx, t,
+ GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1);
t->ping_policy.max_pings_without_data = g_default_max_pings_without_data;
t->ping_policy.min_sent_ping_interval_without_data =
@@ -501,7 +533,7 @@ static void init_transport(grpc_chttp2_transport* t,
int value = grpc_channel_arg_get_integer(
&channel_args->args[i], settings_map[j].integer_options);
if (value >= 0) {
- queue_setting_update(t, settings_map[j].setting_id,
+ queue_setting_update(exec_ctx, t, settings_map[j].setting_id,
(uint32_t)value);
}
}
@@ -512,7 +544,7 @@ static void init_transport(grpc_chttp2_transport* t,
}
}
- t->flow_control.Init(t, enable_bdp);
+ t->flow_control.Init(exec_ctx, t, enable_bdp);
/* No pings allowed before receiving a header or data frame. */
t->ping_state.pings_before_data_required = 0;
@@ -526,8 +558,8 @@ static void init_transport(grpc_chttp2_transport* t,
if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
- grpc_timer_init(&t->keepalive_ping_timer,
- grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
+ grpc_timer_init(exec_ctx, &t->keepalive_ping_timer,
+ grpc_exec_ctx_now(exec_ctx) + t->keepalive_time,
&t->init_keepalive_ping_locked);
} else {
/* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
@@ -537,37 +569,42 @@ static void init_transport(grpc_chttp2_transport* t,
if (enable_bdp) {
GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
- schedule_bdp_ping_locked(t);
+ schedule_bdp_ping_locked(exec_ctx, t);
- grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t,
- nullptr);
+ grpc_chttp2_act_on_flowctl_action(
+ exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, nullptr);
}
- grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
- post_benign_reclaimer(t);
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
+ post_benign_reclaimer(exec_ctx, t);
}
-static void destroy_transport_locked(void* tp, grpc_error* error) {
+static void destroy_transport_locked(grpc_exec_ctx* exec_ctx, void* tp,
+ grpc_error* error) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp;
t->destroying = 1;
close_transport_locked(
- t, grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"),
- GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state));
- GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy");
+ exec_ctx, t,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"),
+ GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state));
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destroy");
}
-static void destroy_transport(grpc_transport* gt) {
+static void destroy_transport(grpc_exec_ctx* exec_ctx, grpc_transport* gt) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt;
- GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(destroy_transport_locked, t,
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ GRPC_CLOSURE_CREATE(destroy_transport_locked, t,
grpc_combiner_scheduler(t->combiner)),
GRPC_ERROR_NONE);
}
-static void close_transport_locked(grpc_chttp2_transport* t,
+static void close_transport_locked(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t,
grpc_error* error) {
- end_all_the_calls(t, GRPC_ERROR_REF(error));
- cancel_pings(t, GRPC_ERROR_REF(error));
+ end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error));
+ cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error));
if (t->closed_with_error == GRPC_ERROR_NONE) {
if (!grpc_error_has_clear_grpc_status(error)) {
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
@@ -585,21 +622,21 @@ static void close_transport_locked(grpc_chttp2_transport* t,
}
GPR_ASSERT(error != GRPC_ERROR_NONE);
t->closed_with_error = GRPC_ERROR_REF(error);
- connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
- "close_transport");
+ connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_REF(error), "close_transport");
if (t->ping_state.is_delayed_ping_timer_set) {
- grpc_timer_cancel(&t->ping_state.delayed_ping_timer);
+ grpc_timer_cancel(exec_ctx, &t->ping_state.delayed_ping_timer);
}
if (t->have_next_bdp_ping_timer) {
- grpc_timer_cancel(&t->next_bdp_ping_timer);
+ grpc_timer_cancel(exec_ctx, &t->next_bdp_ping_timer);
}
switch (t->keepalive_state) {
case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
- grpc_timer_cancel(&t->keepalive_ping_timer);
+ grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
break;
case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
- grpc_timer_cancel(&t->keepalive_ping_timer);
- grpc_timer_cancel(&t->keepalive_watchdog_timer);
+ grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
+ grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer);
break;
case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
@@ -610,13 +647,14 @@ static void close_transport_locked(grpc_chttp2_transport* t,
/* flush writable stream list to avoid dangling references */
grpc_chttp2_stream* s;
while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
- GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close");
}
GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
- grpc_endpoint_shutdown(t->ep, GRPC_ERROR_REF(error));
+ grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error));
}
if (t->notify_on_receive_settings != nullptr) {
- GRPC_CLOSURE_SCHED(t->notify_on_receive_settings, GRPC_ERROR_CANCELLED);
+ GRPC_CLOSURE_SCHED(exec_ctx, t->notify_on_receive_settings,
+ GRPC_ERROR_CANCELLED);
t->notify_on_receive_settings = nullptr;
}
GRPC_ERROR_UNREF(error);
@@ -626,21 +664,22 @@ static void close_transport_locked(grpc_chttp2_transport* t,
void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) {
grpc_stream_ref(s->refcount, reason);
}
-void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) {
- grpc_stream_unref(s->refcount, reason);
+void grpc_chttp2_stream_unref(grpc_exec_ctx* exec_ctx, grpc_chttp2_stream* s,
+ const char* reason) {
+ grpc_stream_unref(exec_ctx, s->refcount, reason);
}
#else
void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
grpc_stream_ref(s->refcount);
}
-void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
- grpc_stream_unref(s->refcount);
+void grpc_chttp2_stream_unref(grpc_exec_ctx* exec_ctx, grpc_chttp2_stream* s) {
+ grpc_stream_unref(exec_ctx, s->refcount);
}
#endif
-static int init_stream(grpc_transport* gt, grpc_stream* gs,
- grpc_stream_refcount* refcount, const void* server_data,
- gpr_arena* arena) {
+static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
+ grpc_stream* gs, grpc_stream_refcount* refcount,
+ const void* server_data, gpr_arena* arena) {
GPR_TIMER_BEGIN("init_stream", 0);
grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt;
grpc_chttp2_stream* s = (grpc_chttp2_stream*)gs;
@@ -674,7 +713,7 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs,
s->id = (uint32_t)(uintptr_t)server_data;
*t->accepting_stream = s;
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
- post_destructive_reclaimer(t);
+ post_destructive_reclaimer(exec_ctx, t);
}
s->flow_control.Init(t->flow_control.get(), s);
@@ -683,7 +722,8 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs,
return 0;
}
-static void destroy_stream_locked(void* sp, grpc_error* error) {
+static void destroy_stream_locked(grpc_exec_ctx* exec_ctx, void* sp,
+ grpc_error* error) {
grpc_chttp2_stream* s = (grpc_chttp2_stream*)sp;
grpc_chttp2_transport* t = s->t;
@@ -694,10 +734,11 @@ static void destroy_stream_locked(void* sp, grpc_error* error) {
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == nullptr);
}
- grpc_slice_buffer_destroy_internal(&s->unprocessed_incoming_frames_buffer);
- grpc_slice_buffer_destroy_internal(&s->frame_storage);
- grpc_slice_buffer_destroy_internal(&s->compressed_data_buffer);
- grpc_slice_buffer_destroy_internal(&s->decompressed_data_buffer);
+ grpc_slice_buffer_destroy_internal(exec_ctx,
+ &s->unprocessed_incoming_frames_buffer);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &s->compressed_data_buffer);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &s->decompressed_data_buffer);
grpc_chttp2_list_remove_stalled_by_transport(t, s);
grpc_chttp2_list_remove_stalled_by_stream(t, s);
@@ -716,24 +757,27 @@ static void destroy_stream_locked(void* sp, grpc_error* error) {
GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
GPR_ASSERT(s->recv_message_ready == nullptr);
GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
- grpc_chttp2_data_parser_destroy(&s->data_parser);
- grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[0]);
- grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[1]);
- grpc_slice_buffer_destroy_internal(&s->flow_controlled_buffer);
+ grpc_chttp2_data_parser_destroy(exec_ctx, &s->data_parser);
+ grpc_chttp2_incoming_metadata_buffer_destroy(exec_ctx,
+ &s->metadata_buffer[0]);
+ grpc_chttp2_incoming_metadata_buffer_destroy(exec_ctx,
+ &s->metadata_buffer[1]);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer);
GRPC_ERROR_UNREF(s->read_closed_error);
GRPC_ERROR_UNREF(s->write_closed_error);
GRPC_ERROR_UNREF(s->byte_stream_error);
s->flow_control.Destroy();
- GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream");
GPR_TIMER_END("destroy_stream", 0);
- GRPC_CLOSURE_SCHED(s->destroy_stream_arg, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, s->destroy_stream_arg, GRPC_ERROR_NONE);
}
-static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
+static void destroy_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
+ grpc_stream* gs,
grpc_closure* then_schedule_closure) {
GPR_TIMER_BEGIN("destroy_stream", 0);
grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt;
@@ -750,6 +794,7 @@ static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
s->destroy_stream_arg = then_schedule_closure;
GRPC_CLOSURE_SCHED(
+ exec_ctx,
GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s,
grpc_combiner_scheduler(t->combiner)),
GRPC_ERROR_NONE);
@@ -761,7 +806,8 @@ grpc_chttp2_stream* grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport* t,
return (grpc_chttp2_stream*)grpc_chttp2_stream_map_find(&t->stream_map, id);
}
-grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
+grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t,
uint32_t id) {
if (t->channel_callback.accept_stream == nullptr) {
return nullptr;
@@ -769,7 +815,8 @@ grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
grpc_chttp2_stream* accepting;
GPR_ASSERT(t->accepting_stream == nullptr);
t->accepting_stream = &accepting;
- t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data,
+ t->channel_callback.accept_stream(exec_ctx,
+ t->channel_callback.accept_stream_user_data,
&t->base, (void*)(uintptr_t)id);
t->accepting_stream = nullptr;
return accepting;
@@ -791,7 +838,7 @@ static const char* write_state_name(grpc_chttp2_write_state st) {
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
-static void set_write_state(grpc_chttp2_transport* t,
+static void set_write_state(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
grpc_chttp2_write_state st, const char* reason) {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "W:%p %s state %s -> %s [%s]", t,
t->is_client ? "CLIENT" : "SERVER",
@@ -799,100 +846,108 @@ static void set_write_state(grpc_chttp2_transport* t,
write_state_name(st), reason));
t->write_state = st;
if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
- GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &t->run_after_write);
if (t->close_transport_on_writes_finished != nullptr) {
grpc_error* err = t->close_transport_on_writes_finished;
t->close_transport_on_writes_finished = nullptr;
- close_transport_locked(t, err);
+ close_transport_locked(exec_ctx, t, err);
}
}
}
static void inc_initiate_write_reason(
- grpc_chttp2_initiate_write_reason reason) {
+ grpc_exec_ctx* exec_ctx, grpc_chttp2_initiate_write_reason reason) {
switch (reason) {
case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA(
+ exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA(
+ exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL(
+ exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING(
+ exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE(
+ exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED(
+ exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE(exec_ctx);
break;
case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
- GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM();
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM(exec_ctx);
break;
}
}
-void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
+void grpc_chttp2_initiate_write(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t,
grpc_chttp2_initiate_write_reason reason) {
GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0);
switch (t->write_state) {
case GRPC_CHTTP2_WRITE_STATE_IDLE:
- inc_initiate_write_reason(reason);
- set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
+ inc_initiate_write_reason(exec_ctx, reason);
+ set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
grpc_chttp2_initiate_write_reason_string(reason));
t->is_first_write_in_batch = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
GRPC_CLOSURE_SCHED(
+ exec_ctx,
GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
write_action_begin_locked, t,
grpc_combiner_finally_scheduler(t->combiner)),
GRPC_ERROR_NONE);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING:
- set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
+ set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
grpc_chttp2_initiate_write_reason_string(reason));
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
@@ -901,7 +956,8 @@ void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
GPR_TIMER_END("grpc_chttp2_initiate_write", 0);
}
-void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
+void grpc_chttp2_mark_stream_writable(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t,
grpc_chttp2_stream* s) {
if (t->closed_with_error == GRPC_ERROR_NONE &&
grpc_chttp2_list_add_writable_stream(t, s)) {
@@ -951,7 +1007,8 @@ static const char* begin_writing_desc(bool partial, bool inlined) {
GPR_UNREACHABLE_CODE(return "bad state tuple");
}
-static void write_action_begin_locked(void* gt, grpc_error* error_ignored) {
+static void write_action_begin_locked(grpc_exec_ctx* exec_ctx, void* gt,
+ grpc_error* error_ignored) {
GPR_TIMER_BEGIN("write_action_begin_locked", 0);
grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt;
GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
@@ -959,59 +1016,62 @@ static void write_action_begin_locked(void* gt, grpc_error* error_ignored) {
if (t->closed_with_error != GRPC_ERROR_NONE) {
r.writing = false;
} else {
- r = grpc_chttp2_begin_write(t);
+ r = grpc_chttp2_begin_write(exec_ctx, t);
}
if (r.writing) {
if (r.partial) {
- GRPC_STATS_INC_HTTP2_PARTIAL_WRITES();
+ GRPC_STATS_INC_HTTP2_PARTIAL_WRITES(exec_ctx);
}
if (!t->is_first_write_in_batch) {
- GRPC_STATS_INC_HTTP2_WRITES_CONTINUED();
+ GRPC_STATS_INC_HTTP2_WRITES_CONTINUED(exec_ctx);
}
grpc_closure_scheduler* scheduler =
write_scheduler(t, r.early_results_scheduled, r.partial);
if (scheduler != grpc_schedule_on_exec_ctx) {
- GRPC_STATS_INC_HTTP2_WRITES_OFFLOADED();
+ GRPC_STATS_INC_HTTP2_WRITES_OFFLOADED(exec_ctx);
}
set_write_state(
- t,
+ exec_ctx, t,
r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
: GRPC_CHTTP2_WRITE_STATE_WRITING,
begin_writing_desc(r.partial, scheduler == grpc_schedule_on_exec_ctx));
GRPC_CLOSURE_SCHED(
+ exec_ctx,
GRPC_CLOSURE_INIT(&t->write_action, write_action, t, scheduler),
GRPC_ERROR_NONE);
} else {
- GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN();
- set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing");
- GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
+ GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN(exec_ctx);
+ set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE,
+ "begin writing nothing");
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing");
}
GPR_TIMER_END("write_action_begin_locked", 0);
}
-static void write_action(void* gt, grpc_error* error) {
+static void write_action(grpc_exec_ctx* exec_ctx, void* gt, grpc_error* error) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt;
GPR_TIMER_BEGIN("write_action", 0);
grpc_endpoint_write(
- t->ep, &t->outbuf,
+ exec_ctx, t->ep, &t->outbuf,
GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t,
grpc_combiner_scheduler(t->combiner)));
GPR_TIMER_END("write_action", 0);
}
-static void write_action_end_locked(void* tp, grpc_error* error) {
+static void write_action_end_locked(grpc_exec_ctx* exec_ctx, void* tp,
+ grpc_error* error) {
GPR_TIMER_BEGIN("terminate_writing_with_lock", 0);
grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp;
if (error != GRPC_ERROR_NONE) {
- close_transport_locked(t, GRPC_ERROR_REF(error));
+ close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
}
if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
close_transport_locked(
- t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent"));
+ exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent"));
}
}
@@ -1020,14 +1080,17 @@ static void write_action_end_locked(void* tp, grpc_error* error) {
GPR_UNREACHABLE_CODE(break);
case GRPC_CHTTP2_WRITE_STATE_WRITING:
GPR_TIMER_MARK("state=writing", 0);
- set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
+ set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE,
+ "finish writing");
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
- set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
+ set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
+ "continue writing");
t->is_first_write_in_batch = false;
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
GRPC_CLOSURE_RUN(
+ exec_ctx,
GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
write_action_begin_locked, t,
grpc_combiner_finally_scheduler(t->combiner)),
@@ -1035,15 +1098,16 @@ static void write_action_end_locked(void* tp, grpc_error* error) {
break;
}
- grpc_chttp2_end_write(t, GRPC_ERROR_REF(error));
+ grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error));
- GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing");
GPR_TIMER_END("terminate_writing_with_lock", 0);
}
// Dirties an HTTP2 setting to be sent out next time a writing path occurs.
// If the change needs to occur immediately, manually initiate a write.
-static void queue_setting_update(grpc_chttp2_transport* t,
+static void queue_setting_update(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t,
grpc_chttp2_setting_id id, uint32_t value) {
const grpc_chttp2_setting_parameters* sp =
&grpc_chttp2_settings_parameters[id];
@@ -1058,7 +1122,8 @@ static void queue_setting_update(grpc_chttp2_transport* t,
}
}
-void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
+void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t,
uint32_t goaway_error,
grpc_slice goaway_text) {
// GRPC_CHTTP2_IF_TRACING(
@@ -1093,11 +1158,12 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
/* lie: use transient failure from the transport to indicate goaway has been
* received */
- connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(t->goaway_error), "got_goaway");
}
-static void maybe_start_some_streams(grpc_chttp2_transport* t) {
+static void maybe_start_some_streams(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t) {
grpc_chttp2_stream* s;
/* start streams where we have free grpc_chttp2_stream ids and free
* concurrency */
@@ -1117,21 +1183,22 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) {
if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
connectivity_state_set(
- t, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ exec_ctx, t, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"),
"no_more_stream_ids");
}
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
- post_destructive_reclaimer(t);
- grpc_chttp2_mark_stream_writable(t, s);
- grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
+ post_destructive_reclaimer(exec_ctx, t);
+ grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
}
/* cancel out streams that will never be started */
while (t->next_stream_id >= MAX_CLIENT_STREAM_ID &&
grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
grpc_chttp2_cancel_stream(
- t, s,
+ exec_ctx, t, s,
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
@@ -1153,13 +1220,15 @@ static grpc_closure* add_closure_barrier(grpc_closure* closure) {
return closure;
}
-static void null_then_run_closure(grpc_closure** closure, grpc_error* error) {
+static void null_then_run_closure(grpc_exec_ctx* exec_ctx,
+ grpc_closure** closure, grpc_error* error) {
grpc_closure* c = *closure;
*closure = nullptr;
- GRPC_CLOSURE_RUN(c, error);
+ GRPC_CLOSURE_RUN(exec_ctx, c, error);
}
-void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
+void grpc_chttp2_complete_closure_step(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t,
grpc_chttp2_stream* s,
grpc_closure** pclosure,
grpc_error* error, const char* desc) {
@@ -1199,7 +1268,7 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
}
if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
!(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
- GRPC_CLOSURE_RUN(closure, closure->error_data.error);
+ GRPC_CLOSURE_RUN(exec_ctx, closure, closure->error_data.error);
} else {
grpc_closure_list_append(&t->run_after_write, closure,
closure->error_data.error);
@@ -1215,24 +1284,28 @@ static bool contains_non_ok_status(grpc_metadata_batch* batch) {
return false;
}
-static void maybe_become_writable_due_to_send_msg(grpc_chttp2_transport* t,
+static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t,
grpc_chttp2_stream* s) {
if (s->id != 0 && (!s->write_buffering ||
s->flow_controlled_buffer.length > t->write_buffer_size)) {
- grpc_chttp2_mark_stream_writable(t, s);
- grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
+ grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
}
}
-static void add_fetched_slice_locked(grpc_chttp2_transport* t,
+static void add_fetched_slice_locked(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t,
grpc_chttp2_stream* s) {
s->fetched_send_message_length +=
(uint32_t)GRPC_SLICE_LENGTH(s->fetching_slice);
grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
- maybe_become_writable_due_to_send_msg(t, s);
+ maybe_become_writable_due_to_send_msg(exec_ctx, t, s);
}
-static void continue_fetching_send_locked(grpc_chttp2_transport* t,
+static void continue_fetching_send_locked(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t,
grpc_chttp2_stream* s) {
for (;;) {
if (s->fetching_send_message == nullptr) {
@@ -1241,11 +1314,11 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t,
return; /* early out */
}
if (s->fetched_send_message_length == s->fetching_send_message->length) {
- grpc_byte_stream_destroy(s->fetching_send_message);
+ grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message);
int64_t notify_offset = s->next_message_end_offset;
if (notify_offset <= s->flow_controlled_bytes_written) {
grpc_chttp2_complete_closure_step(
- t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
+ exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
"fetching_send_message_finished");
} else {
grpc_chttp2_write_cb* cb = t->write_cb_pool;
@@ -1266,37 +1339,39 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t,
}
s->fetching_send_message = nullptr;
return; /* early out */
- } else if (grpc_byte_stream_next(s->fetching_send_message, UINT32_MAX,
- &s->complete_fetch_locked)) {
- grpc_error* error =
- grpc_byte_stream_pull(s->fetching_send_message, &s->fetching_slice);
+ } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message,
+ UINT32_MAX, &s->complete_fetch_locked)) {
+ grpc_error* error = grpc_byte_stream_pull(
+ exec_ctx, s->fetching_send_message, &s->fetching_slice);
if (error != GRPC_ERROR_NONE) {
- grpc_byte_stream_destroy(s->fetching_send_message);
- grpc_chttp2_cancel_stream(t, s, error);
+ grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message);
+ grpc_chttp2_cancel_stream(exec_ctx, t, s, error);
} else {
- add_fetched_slice_locked(t, s);
+ add_fetched_slice_locked(exec_ctx, t, s);
}
}
}
}
-static void complete_fetch_locked(void* gs, grpc_error* error) {
+static void complete_fetch_locked(grpc_exec_ctx* exec_ctx, void* gs,
+ grpc_error* error) {
grpc_chttp2_stream* s = (grpc_chttp2_stream*)gs;
grpc_chttp2_transport* t = s->t;
if (error == GRPC_ERROR_NONE) {
- error = grpc_byte_stream_pull(s->fetching_send_message, &s->fetching_slice);
+ error = grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
+ &s->fetching_slice);
if (error == GRPC_ERROR_NONE) {
- add_fetched_slice_locked(t, s);
- continue_fetching_send_locked(t, s);
+ add_fetched_slice_locked(exec_ctx, t, s);
+ continue_fetching_send_locked(exec_ctx, t, s);
}
}
if (error != GRPC_ERROR_NONE) {
- grpc_byte_stream_destroy(s->fetching_send_message);
- grpc_chttp2_cancel_stream(t, s, error);
+ grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message);
+ grpc_chttp2_cancel_stream(exec_ctx, t, s, error);
}
}
-static void do_nothing(void* arg, grpc_error* error) {}
+static void do_nothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {}
static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
bool is_client, bool is_initial) {
@@ -1311,7 +1386,7 @@ static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
}
}
-static void perform_stream_op_locked(void* stream_op,
+static void perform_stream_op_locked(grpc_exec_ctx* exec_ctx, void* stream_op,
grpc_error* error_ignored) {
GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
@@ -1321,7 +1396,7 @@ static void perform_stream_op_locked(void* stream_op,
grpc_transport_stream_op_batch_payload* op_payload = op->payload;
grpc_chttp2_transport* t = s->t;
- GRPC_STATS_INC_HTTP2_OP_BATCHES();
+ GRPC_STATS_INC_HTTP2_OP_BATCHES(exec_ctx);
if (grpc_http_trace.enabled()) {
char* str = grpc_transport_stream_op_batch_string(op);
@@ -1356,12 +1431,13 @@ static void perform_stream_op_locked(void* stream_op,
}
if (op->cancel_stream) {
- GRPC_STATS_INC_HTTP2_OP_CANCEL();
- grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
+ GRPC_STATS_INC_HTTP2_OP_CANCEL(exec_ctx);
+ grpc_chttp2_cancel_stream(exec_ctx, t, s,
+ op_payload->cancel_stream.cancel_error);
}
if (op->send_initial_metadata) {
- GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA();
+ GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA(exec_ctx);
GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
@@ -1389,7 +1465,7 @@ static void perform_stream_op_locked(void* stream_op,
}
if (metadata_size > metadata_peer_limit) {
grpc_chttp2_cancel_stream(
- t, s,
+ exec_ctx, t, s,
grpc_error_set_int(
grpc_error_set_int(
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
@@ -1408,10 +1484,10 @@ static void perform_stream_op_locked(void* stream_op,
if (t->closed_with_error == GRPC_ERROR_NONE) {
GPR_ASSERT(s->id == 0);
grpc_chttp2_list_add_waiting_for_concurrency(t, s);
- maybe_start_some_streams(t);
+ maybe_start_some_streams(exec_ctx, t);
} else {
grpc_chttp2_cancel_stream(
- t, s,
+ exec_ctx, t, s,
grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Transport closed", &t->closed_with_error, 1),
@@ -1419,18 +1495,18 @@ static void perform_stream_op_locked(void* stream_op,
}
} else {
GPR_ASSERT(s->id != 0);
- grpc_chttp2_mark_stream_writable(t, s);
+ grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
if (!(op->send_message &&
(op->payload->send_message.send_message->flags &
GRPC_WRITE_BUFFER_HINT))) {
grpc_chttp2_initiate_write(
- t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
+ exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
}
}
} else {
s->send_initial_metadata = nullptr;
grpc_chttp2_complete_closure_step(
- t, s, &s->send_initial_metadata_finished,
+ exec_ctx, t, s, &s->send_initial_metadata_finished,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Attempt to send initial metadata after stream was closed",
&s->write_closed_error, 1),
@@ -1444,9 +1520,9 @@ static void perform_stream_op_locked(void* stream_op,
}
if (op->send_message) {
- GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE();
+ GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE(exec_ctx);
GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(
- op->payload->send_message.send_message->length);
+ exec_ctx, op->payload->send_message.send_message->length);
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
if (s->write_closed) {
@@ -1456,7 +1532,7 @@ static void perform_stream_op_locked(void* stream_op,
// recv_message failure, breaking out of its loop, and then
// starting recv_trailing_metadata.
grpc_chttp2_complete_closure_step(
- t, s, &s->fetching_send_message_finished,
+ exec_ctx, t, s, &s->fetching_send_message_finished,
t->is_client && s->received_trailing_metadata
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
@@ -1485,13 +1561,13 @@ static void perform_stream_op_locked(void* stream_op,
} else {
s->write_buffering = false;
}
- continue_fetching_send_locked(t, s);
- maybe_become_writable_due_to_send_msg(t, s);
+ continue_fetching_send_locked(exec_ctx, t, s);
+ maybe_become_writable_due_to_send_msg(exec_ctx, t, s);
}
}
if (op->send_trailing_metadata) {
- GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA();
+ GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA(exec_ctx);
GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
@@ -1505,7 +1581,7 @@ static void perform_stream_op_locked(void* stream_op,
[GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
if (metadata_size > metadata_peer_limit) {
grpc_chttp2_cancel_stream(
- t, s,
+ exec_ctx, t, s,
grpc_error_set_int(
grpc_error_set_int(
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
@@ -1522,7 +1598,7 @@ static void perform_stream_op_locked(void* stream_op,
if (s->write_closed) {
s->send_trailing_metadata = nullptr;
grpc_chttp2_complete_closure_step(
- t, s, &s->send_trailing_metadata_finished,
+ exec_ctx, t, s, &s->send_trailing_metadata_finished,
grpc_metadata_batch_is_empty(
op->payload->send_trailing_metadata.send_trailing_metadata)
? GRPC_ERROR_NONE
@@ -1533,15 +1609,15 @@ static void perform_stream_op_locked(void* stream_op,
} else if (s->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
- grpc_chttp2_mark_stream_writable(t, s);
+ grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
grpc_chttp2_initiate_write(
- t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
+ exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
}
}
}
if (op->recv_initial_metadata) {
- GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA();
+ GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA(exec_ctx);
GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
s->recv_initial_metadata_ready =
op_payload->recv_initial_metadata.recv_initial_metadata_ready;
@@ -1553,11 +1629,11 @@ static void perform_stream_op_locked(void* stream_op,
gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string,
(gpr_atm)gpr_strdup(t->peer_string));
}
- grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
+ grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
}
if (op->recv_message) {
- GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE();
+ GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE(exec_ctx);
size_t already_received;
GPR_ASSERT(s->recv_message_ready == nullptr);
GPR_ASSERT(!s->pending_byte_stream);
@@ -1568,30 +1644,32 @@ static void perform_stream_op_locked(void* stream_op,
already_received = s->frame_storage.length;
s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
already_received);
- grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
+ grpc_chttp2_act_on_flowctl_action(exec_ctx,
+ s->flow_control->MakeAction(), t, s);
}
}
- grpc_chttp2_maybe_complete_recv_message(t, s);
+ grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
if (op->recv_trailing_metadata) {
- GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
+ GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA(exec_ctx);
GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
s->recv_trailing_metadata_finished = add_closure_barrier(on_complete);
s->recv_trailing_metadata =
op_payload->recv_trailing_metadata.recv_trailing_metadata;
s->final_metadata_requested = true;
- grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
- grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE,
- "op->on_complete");
+ grpc_chttp2_complete_closure_step(exec_ctx, t, s, &on_complete,
+ GRPC_ERROR_NONE, "op->on_complete");
GPR_TIMER_END("perform_stream_op_locked", 0);
- GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "perform_stream_op");
}
-static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
+static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
+ grpc_stream* gs,
grpc_transport_stream_op_batch* op) {
GPR_TIMER_BEGIN("perform_stream_op", 0);
grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt;
@@ -1619,29 +1697,32 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
op->handler_private.extra_arg = gs;
GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
GRPC_CLOSURE_SCHED(
+ exec_ctx,
GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked,
op, grpc_combiner_scheduler(t->combiner)),
GRPC_ERROR_NONE);
GPR_TIMER_END("perform_stream_op", 0);
}
-static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
+static void cancel_pings(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
+ grpc_error* error) {
/* callback remaining pings: they're not allowed to call into the transpot,
and maybe they hold resources that need to be freed */
grpc_chttp2_ping_queue* pq = &t->ping_queue;
GPR_ASSERT(error != GRPC_ERROR_NONE);
for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
- GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[j]);
}
GRPC_ERROR_UNREF(error);
}
-static void send_ping_locked(grpc_chttp2_transport* t,
+static void send_ping_locked(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
grpc_closure* on_initiate, grpc_closure* on_ack) {
if (t->closed_with_error != GRPC_ERROR_NONE) {
- GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_REF(t->closed_with_error));
- GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_REF(t->closed_with_error));
+ GRPC_CLOSURE_SCHED(exec_ctx, on_initiate,
+ GRPC_ERROR_REF(t->closed_with_error));
+ GRPC_CLOSURE_SCHED(exec_ctx, on_ack, GRPC_ERROR_REF(t->closed_with_error));
return;
}
grpc_chttp2_ping_queue* pq = &t->ping_queue;
@@ -1651,15 +1732,18 @@ static void send_ping_locked(grpc_chttp2_transport* t,
GRPC_ERROR_NONE);
}
-static void retry_initiate_ping_locked(void* tp, grpc_error* error) {
+static void retry_initiate_ping_locked(grpc_exec_ctx* exec_ctx, void* tp,
+ grpc_error* error) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp;
t->ping_state.is_delayed_ping_timer_set = false;
if (error == GRPC_ERROR_NONE) {
- grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
}
}
-void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
+void grpc_chttp2_ack_ping(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
+ uint64_t id) {
grpc_chttp2_ping_queue* pq = &t->ping_queue;
if (pq->inflight_id != id) {
char* from = grpc_endpoint_get_peer(t->ep);
@@ -1667,48 +1751,54 @@ void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
gpr_free(from);
return;
}
- GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
- grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
}
}
-static void send_goaway(grpc_chttp2_transport* t, grpc_error* error) {
+static void send_goaway(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
+ grpc_error* error) {
t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED;
grpc_http2_error_code http_error;
grpc_slice slice;
- grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, nullptr, &slice,
- &http_error, nullptr);
+ grpc_error_get_status(exec_ctx, error, GRPC_MILLIS_INF_FUTURE, nullptr,
+ &slice, &http_error, nullptr);
grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)http_error,
grpc_slice_ref_internal(slice), &t->qbuf);
- grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
GRPC_ERROR_UNREF(error);
}
-void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) {
+void grpc_chttp2_add_ping_strike(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t) {
t->ping_recv_state.ping_strikes++;
if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes &&
t->ping_policy.max_ping_strikes != 0) {
- send_goaway(t,
+ send_goaway(exec_ctx, t,
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("too_many_pings"),
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
/*The transport will be closed after the write is done */
close_transport_locked(
- t, grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
+ exec_ctx, t,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
}
}
-static void perform_transport_op_locked(void* stream_op,
+static void perform_transport_op_locked(grpc_exec_ctx* exec_ctx,
+ void* stream_op,
grpc_error* error_ignored) {
grpc_transport_op* op = (grpc_transport_op*)stream_op;
grpc_chttp2_transport* t =
(grpc_chttp2_transport*)op->handler_private.extra_arg;
if (op->goaway_error) {
- send_goaway(t, op->goaway_error);
+ send_goaway(exec_ctx, t, op->goaway_error);
}
if (op->set_accept_stream) {
@@ -1718,40 +1808,43 @@ static void perform_transport_op_locked(void* stream_op,
}
if (op->bind_pollset) {
- grpc_endpoint_add_to_pollset(t->ep, op->bind_pollset);
+ grpc_endpoint_add_to_pollset(exec_ctx, t->ep, op->bind_pollset);
}
if (op->bind_pollset_set) {
- grpc_endpoint_add_to_pollset_set(t->ep, op->bind_pollset_set);
+ grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, op->bind_pollset_set);
}
if (op->send_ping) {
- send_ping_locked(t, nullptr, op->send_ping);
- grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
+ send_ping_locked(exec_ctx, t, nullptr, op->send_ping);
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
}
if (op->on_connectivity_state_change != nullptr) {
grpc_connectivity_state_notify_on_state_change(
- &t->channel_callback.state_tracker, op->connectivity_state,
+ exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
op->on_connectivity_state_change);
}
if (op->disconnect_with_error != GRPC_ERROR_NONE) {
- close_transport_locked(t, op->disconnect_with_error);
+ close_transport_locked(exec_ctx, t, op->disconnect_with_error);
}
- GRPC_CLOSURE_RUN(op->on_consumed, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_RUN(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
- GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op");
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "transport_op");
}
-static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
+static void perform_transport_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
+ grpc_transport_op* op) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt;
char* msg = grpc_transport_op_string(op);
gpr_free(msg);
op->handler_private.extra_arg = gt;
GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
- GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&op->handler_private.closure,
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ GRPC_CLOSURE_INIT(&op->handler_private.closure,
perform_transport_op_locked, op,
grpc_combiner_scheduler(t->combiner)),
GRPC_ERROR_NONE);
@@ -1761,33 +1854,36 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
* INPUT PROCESSING - GENERAL
*/
-void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
+void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t,
grpc_chttp2_stream* s) {
if (s->recv_initial_metadata_ready != nullptr &&
s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
if (s->seen_error) {
- grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
if (!s->pending_byte_stream) {
grpc_slice_buffer_reset_and_unref_internal(
- &s->unprocessed_incoming_frames_buffer);
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
}
}
- grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0],
- s->recv_initial_metadata);
- null_then_run_closure(&s->recv_initial_metadata_ready, GRPC_ERROR_NONE);
+ grpc_chttp2_incoming_metadata_buffer_publish(
+ exec_ctx, &s->metadata_buffer[0], s->recv_initial_metadata);
+ null_then_run_closure(exec_ctx, &s->recv_initial_metadata_ready,
+ GRPC_ERROR_NONE);
}
}
-void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
+void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t,
grpc_chttp2_stream* s) {
grpc_error* error = GRPC_ERROR_NONE;
if (s->recv_message_ready != nullptr) {
*s->recv_message = nullptr;
if (s->final_metadata_requested && s->seen_error) {
- grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
if (!s->pending_byte_stream) {
grpc_slice_buffer_reset_and_unref_internal(
- &s->unprocessed_incoming_frames_buffer);
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
}
}
if (!s->pending_byte_stream) {
@@ -1814,9 +1910,10 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
&s->decompressed_data_buffer, nullptr,
GRPC_HEADER_SIZE_IN_BYTES - s->decompressed_header_bytes,
&end_of_context)) {
- grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ &s->frame_storage);
grpc_slice_buffer_reset_and_unref_internal(
- &s->unprocessed_incoming_frames_buffer);
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Stream decompression error.");
} else {
@@ -1825,8 +1922,8 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
s->decompressed_header_bytes = 0;
}
error = grpc_deframe_unprocessed_incoming_frames(
- &s->data_parser, s, &s->decompressed_data_buffer, nullptr,
- s->recv_message);
+ exec_ctx, &s->data_parser, s, &s->decompressed_data_buffer,
+ nullptr, s->recv_message);
if (end_of_context) {
grpc_stream_compression_context_destroy(
s->stream_decompression_ctx);
@@ -1835,14 +1932,15 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
}
} else {
error = grpc_deframe_unprocessed_incoming_frames(
- &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
- nullptr, s->recv_message);
+ exec_ctx, &s->data_parser, s,
+ &s->unprocessed_incoming_frames_buffer, nullptr, s->recv_message);
}
if (error != GRPC_ERROR_NONE) {
s->seen_error = true;
- grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ &s->frame_storage);
grpc_slice_buffer_reset_and_unref_internal(
- &s->unprocessed_incoming_frames_buffer);
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
break;
} else if (*s->recv_message != nullptr) {
break;
@@ -1850,25 +1948,26 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
}
}
if (error == GRPC_ERROR_NONE && *s->recv_message != nullptr) {
- null_then_run_closure(&s->recv_message_ready, GRPC_ERROR_NONE);
+ null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
} else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
*s->recv_message = nullptr;
- null_then_run_closure(&s->recv_message_ready, GRPC_ERROR_NONE);
+ null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
}
GRPC_ERROR_UNREF(error);
}
}
-void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
+void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t,
grpc_chttp2_stream* s) {
- grpc_chttp2_maybe_complete_recv_message(t, s);
+ grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
s->write_closed) {
if (s->seen_error) {
- grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
if (!s->pending_byte_stream) {
grpc_slice_buffer_reset_and_unref_internal(
- &s->unprocessed_incoming_frames_buffer);
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
}
}
bool pending_data = s->pending_byte_stream ||
@@ -1886,9 +1985,9 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
s->stream_decompression_ctx, &s->frame_storage,
&s->unprocessed_incoming_frames_buffer, nullptr,
GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) {
- grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
grpc_slice_buffer_reset_and_unref_internal(
- &s->unprocessed_incoming_frames_buffer);
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
s->seen_error = true;
} else {
if (s->unprocessed_incoming_frames_buffer.length > 0) {
@@ -1903,23 +2002,23 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
}
if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
s->recv_trailing_metadata_finished != nullptr) {
- grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
- s->recv_trailing_metadata);
+ grpc_chttp2_incoming_metadata_buffer_publish(
+ exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata);
grpc_chttp2_complete_closure_step(
- t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE,
+ exec_ctx, t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE,
"recv_trailing_metadata_finished");
}
}
}
-static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
- grpc_error* error) {
+static void remove_stream(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
+ uint32_t id, grpc_error* error) {
grpc_chttp2_stream* s =
(grpc_chttp2_stream*)grpc_chttp2_stream_map_delete(&t->stream_map, id);
GPR_ASSERT(s);
if (t->incoming_stream == s) {
t->incoming_stream = nullptr;
- grpc_chttp2_parsing_become_skip_parser(t);
+ grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
}
if (s->pending_byte_stream) {
if (s->on_next != nullptr) {
@@ -1927,8 +2026,8 @@ static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
}
- incoming_byte_stream_publish_error(bs, error);
- incoming_byte_stream_unref(bs);
+ incoming_byte_stream_publish_error(exec_ctx, bs, error);
+ incoming_byte_stream_unref(exec_ctx, bs);
s->data_parser.parsing_frame = nullptr;
} else {
GRPC_ERROR_UNREF(s->byte_stream_error);
@@ -1937,52 +2036,56 @@ static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
}
if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
- post_benign_reclaimer(t);
+ post_benign_reclaimer(exec_ctx, t);
if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) {
close_transport_locked(
- t, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Last stream closed after sending GOAWAY", &error, 1));
+ exec_ctx, t,
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Last stream closed after sending GOAWAY", &error, 1));
}
}
if (grpc_chttp2_list_remove_writable_stream(t, s)) {
- GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream");
}
GRPC_ERROR_UNREF(error);
- maybe_start_some_streams(t);
+ maybe_start_some_streams(exec_ctx, t);
}
-void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
+void grpc_chttp2_cancel_stream(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error* due_to_error) {
if (!t->is_client && !s->sent_trailing_metadata &&
grpc_error_has_clear_grpc_status(due_to_error)) {
- close_from_api(t, s, due_to_error);
+ close_from_api(exec_ctx, t, s, due_to_error);
return;
}
if (!s->read_closed || !s->write_closed) {
if (s->id != 0) {
grpc_http2_error_code http_error;
- grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
- &http_error, nullptr);
+ grpc_error_get_status(exec_ctx, due_to_error, s->deadline, nullptr,
+ nullptr, &http_error, nullptr);
grpc_slice_buffer_add(
&t->qbuf, grpc_chttp2_rst_stream_create(s->id, (uint32_t)http_error,
&s->stats.outgoing));
- grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
}
}
if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) {
s->seen_error = true;
}
- grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error);
+ grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, due_to_error);
}
-void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
- grpc_error* error) {
+void grpc_chttp2_fake_status(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s, grpc_error* error) {
grpc_status_code status;
grpc_slice slice;
- grpc_error_get_status(error, s->deadline, &status, &slice, nullptr, nullptr);
+ grpc_error_get_status(exec_ctx, error, s->deadline, &status, &slice, nullptr,
+ nullptr);
if (status != GRPC_STATUS_OK) {
s->seen_error = true;
}
@@ -1998,20 +2101,20 @@ void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
gpr_ltoa(status, status_string);
GRPC_LOG_IF_ERROR("add_status",
grpc_chttp2_incoming_metadata_buffer_replace_or_add(
- &s->metadata_buffer[1],
+ exec_ctx, &s->metadata_buffer[1],
grpc_mdelem_from_slices(
- GRPC_MDSTR_GRPC_STATUS,
+ exec_ctx, GRPC_MDSTR_GRPC_STATUS,
grpc_slice_from_copied_string(status_string))));
if (!GRPC_SLICE_IS_EMPTY(slice)) {
GRPC_LOG_IF_ERROR(
"add_status_message",
grpc_chttp2_incoming_metadata_buffer_replace_or_add(
- &s->metadata_buffer[1],
- grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_MESSAGE,
+ exec_ctx, &s->metadata_buffer[1],
+ grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
grpc_slice_ref_internal(slice))));
}
s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
- grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
GRPC_ERROR_UNREF(error);
@@ -2044,12 +2147,14 @@ static grpc_error* removal_error(grpc_error* extra_error, grpc_chttp2_stream* s,
return error;
}
-static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
- grpc_chttp2_write_cb** list, grpc_error* error) {
+static void flush_write_list(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s, grpc_chttp2_write_cb** list,
+ grpc_error* error) {
while (*list) {
grpc_chttp2_write_cb* cb = *list;
*list = cb->next;
- grpc_chttp2_complete_closure_step(t, s, &cb->closure, GRPC_ERROR_REF(error),
+ grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure,
+ GRPC_ERROR_REF(error),
"on_write_finished_cb");
cb->next = t->write_cb_pool;
t->write_cb_pool = cb;
@@ -2057,34 +2162,37 @@ static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
GRPC_ERROR_UNREF(error);
}
-void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
+void grpc_chttp2_fail_pending_writes(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t,
grpc_chttp2_stream* s, grpc_error* error) {
error =
removal_error(error, s, "Pending writes failed due to stream closure");
s->send_initial_metadata = nullptr;
- grpc_chttp2_complete_closure_step(t, s, &s->send_initial_metadata_finished,
- GRPC_ERROR_REF(error),
- "send_initial_metadata_finished");
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_REF(error),
+ "send_initial_metadata_finished");
s->send_trailing_metadata = nullptr;
- grpc_chttp2_complete_closure_step(t, s, &s->send_trailing_metadata_finished,
- GRPC_ERROR_REF(error),
- "send_trailing_metadata_finished");
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, t, s, &s->send_trailing_metadata_finished,
+ GRPC_ERROR_REF(error), "send_trailing_metadata_finished");
s->fetching_send_message = nullptr;
- grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished,
- GRPC_ERROR_REF(error),
- "fetching_send_message_finished");
- flush_write_list(t, s, &s->on_write_finished_cbs, GRPC_ERROR_REF(error));
- flush_write_list(t, s, &s->on_flow_controlled_cbs, error);
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error),
+ "fetching_send_message_finished");
+ flush_write_list(exec_ctx, t, s, &s->on_write_finished_cbs,
+ GRPC_ERROR_REF(error));
+ flush_write_list(exec_ctx, t, s, &s->on_flow_controlled_cbs, error);
}
-void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
+void grpc_chttp2_mark_stream_closed(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t,
grpc_chttp2_stream* s, int close_reads,
int close_writes, grpc_error* error) {
if (s->read_closed && s->write_closed) {
/* already closed */
- grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
GRPC_ERROR_UNREF(error);
return;
}
@@ -2098,20 +2206,20 @@ void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
if (close_writes && !s->write_closed) {
s->write_closed_error = GRPC_ERROR_REF(error);
s->write_closed = true;
- grpc_chttp2_fail_pending_writes(t, s, GRPC_ERROR_REF(error));
+ grpc_chttp2_fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error));
}
if (s->read_closed && s->write_closed) {
became_closed = true;
grpc_error* overall_error =
removal_error(GRPC_ERROR_REF(error), s, "Stream removed");
if (s->id != 0) {
- remove_stream(t, s->id, GRPC_ERROR_REF(overall_error));
+ remove_stream(exec_ctx, t, s->id, GRPC_ERROR_REF(overall_error));
} else {
/* Purge streams waiting on concurrency still waiting for id assignment */
grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
}
if (overall_error != GRPC_ERROR_NONE) {
- grpc_chttp2_fake_status(t, s, overall_error);
+ grpc_chttp2_fake_status(exec_ctx, t, s, overall_error);
}
}
if (closed_read) {
@@ -2120,18 +2228,18 @@ void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE;
}
}
- grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
- grpc_chttp2_maybe_complete_recv_message(t, s);
+ grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
+ grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
if (became_closed) {
- grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
- GRPC_CHTTP2_STREAM_UNREF(s, "chttp2");
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2");
}
GRPC_ERROR_UNREF(error);
}
-static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
- grpc_error* error) {
+static void close_from_api(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
+ grpc_chttp2_stream* s, grpc_error* error) {
grpc_slice hdr;
grpc_slice status_hdr;
grpc_slice http_status_hdr;
@@ -2141,8 +2249,8 @@ static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
uint32_t len = 0;
grpc_status_code grpc_status;
grpc_slice slice;
- grpc_error_get_status(error, s->deadline, &grpc_status, &slice, nullptr,
- nullptr);
+ grpc_error_get_status(exec_ctx, error, s->deadline, &grpc_status, &slice,
+ nullptr, nullptr);
GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
@@ -2284,11 +2392,13 @@ static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
&t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR,
&s->stats.outgoing));
- grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
- grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
+ grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, error);
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
}
typedef struct {
+ grpc_exec_ctx* exec_ctx;
grpc_error* error;
grpc_chttp2_transport* t;
} cancel_stream_cb_args;
@@ -2296,11 +2406,13 @@ typedef struct {
static void cancel_stream_cb(void* user_data, uint32_t key, void* stream) {
cancel_stream_cb_args* args = (cancel_stream_cb_args*)user_data;
grpc_chttp2_stream* s = (grpc_chttp2_stream*)stream;
- grpc_chttp2_cancel_stream(args->t, s, GRPC_ERROR_REF(args->error));
+ grpc_chttp2_cancel_stream(args->exec_ctx, args->t, s,
+ GRPC_ERROR_REF(args->error));
}
-static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error) {
- cancel_stream_cb_args args = {error, t};
+static void end_all_the_calls(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
+ grpc_error* error) {
+ cancel_stream_cb_args args = {exec_ctx, error, t};
grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args);
GRPC_ERROR_UNREF(error);
}
@@ -2310,14 +2422,14 @@ static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error) {
*/
template <class F>
-static void WithUrgency(grpc_chttp2_transport* t,
+static void WithUrgency(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
grpc_core::chttp2::FlowControlAction::Urgency urgency,
grpc_chttp2_initiate_write_reason reason, F action) {
switch (urgency) {
case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
break;
case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
- grpc_chttp2_initiate_write(t, reason);
+ grpc_chttp2_initiate_write(exec_ctx, t, reason);
// fallthrough
case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
action();
@@ -2326,27 +2438,31 @@ static void WithUrgency(grpc_chttp2_transport* t,
}
void grpc_chttp2_act_on_flowctl_action(
- const grpc_core::chttp2::FlowControlAction& action,
+ grpc_exec_ctx* exec_ctx, const grpc_core::chttp2::FlowControlAction& action,
grpc_chttp2_transport* t, grpc_chttp2_stream* s) {
- WithUrgency(t, action.send_stream_update(),
- GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
- [t, s]() { grpc_chttp2_mark_stream_writable(t, s); });
- WithUrgency(t, action.send_transport_update(),
+ WithUrgency(
+ exec_ctx, t, action.send_stream_update(),
+ GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
+ [exec_ctx, t, s]() { grpc_chttp2_mark_stream_writable(exec_ctx, t, s); });
+ WithUrgency(exec_ctx, t, action.send_transport_update(),
GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
- WithUrgency(t, action.send_initial_window_update(),
- GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
- queue_setting_update(t,
+ WithUrgency(exec_ctx, t, action.send_initial_window_update(),
+ GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS,
+ [exec_ctx, t, &action]() {
+ queue_setting_update(exec_ctx, t,
GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
action.initial_window_size());
});
- WithUrgency(t, action.send_max_frame_size_update(),
- GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
- queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
- action.max_frame_size());
- });
+ WithUrgency(
+ exec_ctx, t, action.send_max_frame_size_update(),
+ GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [exec_ctx, t, &action]() {
+ queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
+ action.max_frame_size());
+ });
}
-static grpc_error* try_http_parsing(grpc_chttp2_transport* t) {
+static grpc_error* try_http_parsing(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t) {
grpc_http_parser parser;
size_t i = 0;
grpc_error* error = GRPC_ERROR_NONE;
@@ -2375,7 +2491,8 @@ static grpc_error* try_http_parsing(grpc_chttp2_transport* t) {
return error;
}
-static void read_action_locked(void* tp, grpc_error* error) {
+static void read_action_locked(grpc_exec_ctx* exec_ctx, void* tp,
+ grpc_error* error) {
GPR_TIMER_BEGIN("reading_action_locked", 0);
grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp;
@@ -2399,10 +2516,11 @@ static void read_action_locked(void* tp, grpc_error* error) {
for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
t->flow_control->bdp_estimator()->AddIncomingBytes(
(int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
- errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]);
+ errors[1] =
+ grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]);
}
if (errors[1] != GRPC_ERROR_NONE) {
- errors[2] = try_http_parsing(t);
+ errors[2] = try_http_parsing(exec_ctx, t);
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Failed parsing HTTP/2", errors, GPR_ARRAY_SIZE(errors));
@@ -2417,9 +2535,10 @@ static void read_action_locked(void* tp, grpc_error* error) {
if (t->initial_window_update > 0) {
grpc_chttp2_stream* s;
while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
- grpc_chttp2_mark_stream_writable(t, s);
+ grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
grpc_chttp2_initiate_write(
- t, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
+ exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
}
}
t->initial_window_update = 0;
@@ -2440,21 +2559,22 @@ static void read_action_locked(void* tp, grpc_error* error) {
error = grpc_error_add_child(error, GRPC_ERROR_REF(t->goaway_error));
}
- close_transport_locked(t, GRPC_ERROR_REF(error));
+ close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
t->endpoint_reading = 0;
} else if (t->closed_with_error == GRPC_ERROR_NONE) {
keep_reading = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading");
}
- grpc_slice_buffer_reset_and_unref_internal(&t->read_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &t->read_buffer);
if (keep_reading) {
- grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked);
- grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t,
- nullptr);
- GRPC_CHTTP2_UNREF_TRANSPORT(t, "keep_reading");
+ grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer,
+ &t->read_action_locked);
+ grpc_chttp2_act_on_flowctl_action(exec_ctx, t->flow_control->MakeAction(),
+ t, nullptr);
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else {
- GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action");
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action");
}
GPR_TIMER_END("post_reading_action_locked", 0);
@@ -2466,12 +2586,15 @@ static void read_action_locked(void* tp, grpc_error* error) {
// t is reffed prior to calling the first time, and once the callback chain
// that kicks off finishes, it's unreffed
-static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) {
+static void schedule_bdp_ping_locked(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t) {
t->flow_control->bdp_estimator()->SchedulePing();
- send_ping_locked(t, &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked);
+ send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked,
+ &t->finish_bdp_ping_locked);
}
-static void start_bdp_ping_locked(void* tp, grpc_error* error) {
+static void start_bdp_ping_locked(grpc_exec_ctx* exec_ctx, void* tp,
+ grpc_error* error) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp;
if (grpc_http_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s: Start BDP ping err=%s", t->peer_string,
@@ -2479,39 +2602,42 @@ static void start_bdp_ping_locked(void* tp, grpc_error* error) {
}
/* Reset the keepalive ping timer */
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
- grpc_timer_cancel(&t->keepalive_ping_timer);
+ grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
}
t->flow_control->bdp_estimator()->StartPing();
}
-static void finish_bdp_ping_locked(void* tp, grpc_error* error) {
+static void finish_bdp_ping_locked(grpc_exec_ctx* exec_ctx, void* tp,
+ grpc_error* error) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp;
if (grpc_http_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s: Complete BDP ping err=%s", t->peer_string,
grpc_error_string(error));
}
if (error != GRPC_ERROR_NONE) {
- GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
return;
}
- grpc_millis next_ping = t->flow_control->bdp_estimator()->CompletePing();
- grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t,
- nullptr);
+ grpc_millis next_ping =
+ t->flow_control->bdp_estimator()->CompletePing(exec_ctx);
+ grpc_chttp2_act_on_flowctl_action(
+ exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, nullptr);
GPR_ASSERT(!t->have_next_bdp_ping_timer);
t->have_next_bdp_ping_timer = true;
- grpc_timer_init(&t->next_bdp_ping_timer, next_ping,
+ grpc_timer_init(exec_ctx, &t->next_bdp_ping_timer, next_ping,
&t->next_bdp_ping_timer_expired_locked);
}
-static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error) {
+static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx* exec_ctx,
+ void* tp, grpc_error* error) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)tp;
GPR_ASSERT(t->have_next_bdp_ping_timer);
t->have_next_bdp_ping_timer = false;
if (error != GRPC_ERROR_NONE) {
- GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
return;
}
- schedule_bdp_ping_locked(t);
+ schedule_bdp_ping_locked(exec_ctx, t);
}
void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
@@ -2572,7 +2698,8 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
}
}
-static void init_keepalive_ping_locked(void* arg, grpc_error* error) {
+static void init_keepalive_ping_locked(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg;
GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) {
@@ -2582,55 +2709,59 @@ static void init_keepalive_ping_locked(void* arg, grpc_error* error) {
grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
- send_ping_locked(t, &t->start_keepalive_ping_locked,
+ send_ping_locked(exec_ctx, t, &t->start_keepalive_ping_locked,
&t->finish_keepalive_ping_locked);
- grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
} else {
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
- grpc_timer_init(&t->keepalive_ping_timer,
- grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
+ grpc_timer_init(exec_ctx, &t->keepalive_ping_timer,
+ grpc_exec_ctx_now(exec_ctx) + t->keepalive_time,
&t->init_keepalive_ping_locked);
}
} else if (error == GRPC_ERROR_CANCELLED) {
/* The keepalive ping timer may be cancelled by bdp */
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
- grpc_timer_init(&t->keepalive_ping_timer,
- grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
+ grpc_timer_init(exec_ctx, &t->keepalive_ping_timer,
+ grpc_exec_ctx_now(exec_ctx) + t->keepalive_time,
&t->init_keepalive_ping_locked);
}
- GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "init keepalive ping");
}
-static void start_keepalive_ping_locked(void* arg, grpc_error* error) {
+static void start_keepalive_ping_locked(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg;
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
- grpc_timer_init(&t->keepalive_watchdog_timer,
- grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
+ grpc_timer_init(exec_ctx, &t->keepalive_watchdog_timer,
+ grpc_exec_ctx_now(exec_ctx) + t->keepalive_time,
&t->keepalive_watchdog_fired_locked);
}
-static void finish_keepalive_ping_locked(void* arg, grpc_error* error) {
+static void finish_keepalive_ping_locked(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg;
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
if (error == GRPC_ERROR_NONE) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
- grpc_timer_cancel(&t->keepalive_watchdog_timer);
+ grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer);
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
- grpc_timer_init(&t->keepalive_ping_timer,
- grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
+ grpc_timer_init(exec_ctx, &t->keepalive_ping_timer,
+ grpc_exec_ctx_now(exec_ctx) + t->keepalive_time,
&t->init_keepalive_ping_locked);
}
}
- GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end");
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping end");
}
-static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) {
+static void keepalive_watchdog_fired_locked(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg;
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
if (error == GRPC_ERROR_NONE) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
close_transport_locked(
- t,
+ exec_ctx, t,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"keepalive watchdog timeout"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL));
@@ -2643,67 +2774,71 @@ static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) {
t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
}
}
- GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive watchdog");
}
/*******************************************************************************
* CALLBACK LOOP
*/
-static void connectivity_state_set(grpc_chttp2_transport* t,
+static void connectivity_state_set(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t,
grpc_connectivity_state state,
grpc_error* error, const char* reason) {
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
- grpc_connectivity_state_set(&t->channel_callback.state_tracker, state, error,
- reason);
+ grpc_connectivity_state_set(exec_ctx, &t->channel_callback.state_tracker,
+ state, error, reason);
}
/*******************************************************************************
* POLLSET STUFF
*/
-static void set_pollset(grpc_transport* gt, grpc_stream* gs,
- grpc_pollset* pollset) {
+static void set_pollset(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
+ grpc_stream* gs, grpc_pollset* pollset) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt;
- grpc_endpoint_add_to_pollset(t->ep, pollset);
+ grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset);
}
-static void set_pollset_set(grpc_transport* gt, grpc_stream* gs,
- grpc_pollset_set* pollset_set) {
+static void set_pollset_set(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
+ grpc_stream* gs, grpc_pollset_set* pollset_set) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)gt;
- grpc_endpoint_add_to_pollset_set(t->ep, pollset_set);
+ grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set);
}
/*******************************************************************************
* BYTE STREAM
*/
-static void reset_byte_stream(void* arg, grpc_error* error) {
+static void reset_byte_stream(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
grpc_chttp2_stream* s = (grpc_chttp2_stream*)arg;
s->pending_byte_stream = false;
if (error == GRPC_ERROR_NONE) {
- grpc_chttp2_maybe_complete_recv_message(s->t, s);
- grpc_chttp2_maybe_complete_recv_trailing_metadata(s->t, s);
+ grpc_chttp2_maybe_complete_recv_message(exec_ctx, s->t, s);
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, s->t, s);
} else {
GPR_ASSERT(error != GRPC_ERROR_NONE);
- GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
s->on_next = nullptr;
GRPC_ERROR_UNREF(s->byte_stream_error);
s->byte_stream_error = GRPC_ERROR_NONE;
- grpc_chttp2_cancel_stream(s->t, s, GRPC_ERROR_REF(error));
+ grpc_chttp2_cancel_stream(exec_ctx, s->t, s, GRPC_ERROR_REF(error));
s->byte_stream_error = GRPC_ERROR_REF(error);
}
}
-static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream* bs) {
+static void incoming_byte_stream_unref(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_incoming_byte_stream* bs) {
if (gpr_unref(&bs->refs)) {
gpr_free(bs);
}
}
-static void incoming_byte_stream_next_locked(void* argp,
+static void incoming_byte_stream_next_locked(grpc_exec_ctx* exec_ctx,
+ void* argp,
grpc_error* error_ignored) {
grpc_chttp2_incoming_byte_stream* bs =
(grpc_chttp2_incoming_byte_stream*)argp;
@@ -2714,29 +2849,30 @@ static void incoming_byte_stream_next_locked(void* argp,
if (!s->read_closed) {
s->flow_control->IncomingByteStreamUpdate(bs->next_action.max_size_hint,
cur_length);
- grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
+ grpc_chttp2_act_on_flowctl_action(exec_ctx, s->flow_control->MakeAction(),
+ t, s);
}
GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
if (s->frame_storage.length > 0) {
grpc_slice_buffer_swap(&s->frame_storage,
&s->unprocessed_incoming_frames_buffer);
s->unprocessed_incoming_frames_decompressed = false;
- GRPC_CLOSURE_SCHED(bs->next_action.on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
} else if (s->byte_stream_error != GRPC_ERROR_NONE) {
- GRPC_CLOSURE_SCHED(bs->next_action.on_complete,
+ GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
if (s->data_parser.parsing_frame != nullptr) {
- incoming_byte_stream_unref(s->data_parser.parsing_frame);
+ incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
s->data_parser.parsing_frame = nullptr;
}
} else if (s->read_closed) {
if (bs->remaining_bytes != 0) {
s->byte_stream_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
- GRPC_CLOSURE_SCHED(bs->next_action.on_complete,
+ GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
if (s->data_parser.parsing_frame != nullptr) {
- incoming_byte_stream_unref(s->data_parser.parsing_frame);
+ incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
s->data_parser.parsing_frame = nullptr;
}
} else {
@@ -2746,10 +2882,11 @@ static void incoming_byte_stream_next_locked(void* argp,
} else {
s->on_next = bs->next_action.on_complete;
}
- incoming_byte_stream_unref(bs);
+ incoming_byte_stream_unref(exec_ctx, bs);
}
-static bool incoming_byte_stream_next(grpc_byte_stream* byte_stream,
+static bool incoming_byte_stream_next(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream,
size_t max_size_hint,
grpc_closure* on_complete) {
GPR_TIMER_BEGIN("incoming_byte_stream_next", 0);
@@ -2764,6 +2901,7 @@ static bool incoming_byte_stream_next(grpc_byte_stream* byte_stream,
bs->next_action.max_size_hint = max_size_hint;
bs->next_action.on_complete = on_complete;
GRPC_CLOSURE_SCHED(
+ exec_ctx,
GRPC_CLOSURE_INIT(&bs->next_action.closure,
incoming_byte_stream_next_locked, bs,
grpc_combiner_scheduler(bs->transport->combiner)),
@@ -2773,7 +2911,8 @@ static bool incoming_byte_stream_next(grpc_byte_stream* byte_stream,
}
}
-static grpc_error* incoming_byte_stream_pull(grpc_byte_stream* byte_stream,
+static grpc_error* incoming_byte_stream_pull(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream,
grpc_slice* slice) {
GPR_TIMER_BEGIN("incoming_byte_stream_pull", 0);
grpc_chttp2_incoming_byte_stream* bs =
@@ -2809,28 +2948,31 @@ static grpc_error* incoming_byte_stream_pull(grpc_byte_stream* byte_stream,
}
}
error = grpc_deframe_unprocessed_incoming_frames(
- &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, slice,
- nullptr);
+ exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
+ slice, nullptr);
if (error != GRPC_ERROR_NONE) {
return error;
}
} else {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
- GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
return error;
}
GPR_TIMER_END("incoming_byte_stream_pull", 0);
return GRPC_ERROR_NONE;
}
-static void incoming_byte_stream_destroy_locked(void* byte_stream,
+static void incoming_byte_stream_destroy_locked(grpc_exec_ctx* exec_ctx,
+ void* byte_stream,
grpc_error* error_ignored);
-static void incoming_byte_stream_destroy(grpc_byte_stream* byte_stream) {
+static void incoming_byte_stream_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream) {
GPR_TIMER_BEGIN("incoming_byte_stream_destroy", 0);
grpc_chttp2_incoming_byte_stream* bs =
(grpc_chttp2_incoming_byte_stream*)byte_stream;
GRPC_CLOSURE_SCHED(
+ exec_ctx,
GRPC_CLOSURE_INIT(&bs->destroy_action,
incoming_byte_stream_destroy_locked, bs,
grpc_combiner_scheduler(bs->transport->combiner)),
@@ -2839,28 +2981,30 @@ static void incoming_byte_stream_destroy(grpc_byte_stream* byte_stream) {
}
static void incoming_byte_stream_publish_error(
- grpc_chttp2_incoming_byte_stream* bs, grpc_error* error) {
+ grpc_exec_ctx* exec_ctx, grpc_chttp2_incoming_byte_stream* bs,
+ grpc_error* error) {
grpc_chttp2_stream* s = bs->stream;
GPR_ASSERT(error != GRPC_ERROR_NONE);
- GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
s->on_next = nullptr;
GRPC_ERROR_UNREF(s->byte_stream_error);
s->byte_stream_error = GRPC_ERROR_REF(error);
- grpc_chttp2_cancel_stream(bs->transport, bs->stream, GRPC_ERROR_REF(error));
+ grpc_chttp2_cancel_stream(exec_ctx, bs->transport, bs->stream,
+ GRPC_ERROR_REF(error));
}
grpc_error* grpc_chttp2_incoming_byte_stream_push(
- grpc_chttp2_incoming_byte_stream* bs, grpc_slice slice,
- grpc_slice* slice_out) {
+ grpc_exec_ctx* exec_ctx, grpc_chttp2_incoming_byte_stream* bs,
+ grpc_slice slice, grpc_slice* slice_out) {
grpc_chttp2_stream* s = bs->stream;
if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) {
grpc_error* error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
- GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error));
- grpc_slice_unref_internal(slice);
+ GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
+ grpc_slice_unref_internal(exec_ctx, slice);
return error;
} else {
bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice);
@@ -2872,8 +3016,8 @@ grpc_error* grpc_chttp2_incoming_byte_stream_push(
}
grpc_error* grpc_chttp2_incoming_byte_stream_finished(
- grpc_chttp2_incoming_byte_stream* bs, grpc_error* error,
- bool reset_on_error) {
+ grpc_exec_ctx* exec_ctx, grpc_chttp2_incoming_byte_stream* bs,
+ grpc_error* error, bool reset_on_error) {
grpc_chttp2_stream* s = bs->stream;
if (error == GRPC_ERROR_NONE) {
@@ -2882,25 +3026,27 @@ grpc_error* grpc_chttp2_incoming_byte_stream_finished(
}
}
if (error != GRPC_ERROR_NONE && reset_on_error) {
- GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
}
- incoming_byte_stream_unref(bs);
+ incoming_byte_stream_unref(exec_ctx, bs);
return error;
}
-static void incoming_byte_stream_shutdown(grpc_byte_stream* byte_stream,
+static void incoming_byte_stream_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream,
grpc_error* error) {
grpc_chttp2_incoming_byte_stream* bs =
(grpc_chttp2_incoming_byte_stream*)byte_stream;
GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
- bs, error, true /* reset_on_error */));
+ exec_ctx, bs, error, true /* reset_on_error */));
}
static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = {
incoming_byte_stream_next, incoming_byte_stream_pull,
incoming_byte_stream_shutdown, incoming_byte_stream_destroy};
-static void incoming_byte_stream_destroy_locked(void* byte_stream,
+static void incoming_byte_stream_destroy_locked(grpc_exec_ctx* exec_ctx,
+ void* byte_stream,
grpc_error* error_ignored) {
grpc_chttp2_incoming_byte_stream* bs =
(grpc_chttp2_incoming_byte_stream*)byte_stream;
@@ -2908,15 +3054,15 @@ static void incoming_byte_stream_destroy_locked(void* byte_stream,
grpc_chttp2_transport* t = s->t;
GPR_ASSERT(bs->base.vtable == &grpc_chttp2_incoming_byte_stream_vtable);
- incoming_byte_stream_unref(bs);
+ incoming_byte_stream_unref(exec_ctx, bs);
s->pending_byte_stream = false;
- grpc_chttp2_maybe_complete_recv_message(t, s);
- grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
+ grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
grpc_chttp2_incoming_byte_stream* grpc_chttp2_incoming_byte_stream_create(
- grpc_chttp2_transport* t, grpc_chttp2_stream* s, uint32_t frame_size,
- uint32_t flags) {
+ grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, grpc_chttp2_stream* s,
+ uint32_t frame_size, uint32_t flags) {
grpc_chttp2_incoming_byte_stream* incoming_byte_stream =
(grpc_chttp2_incoming_byte_stream*)gpr_malloc(
sizeof(*incoming_byte_stream));
@@ -2936,25 +3082,30 @@ grpc_chttp2_incoming_byte_stream* grpc_chttp2_incoming_byte_stream_create(
* RESOURCE QUOTAS
*/
-static void post_benign_reclaimer(grpc_chttp2_transport* t) {
+static void post_benign_reclaimer(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t) {
if (!t->benign_reclaimer_registered) {
t->benign_reclaimer_registered = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
- grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
+ grpc_resource_user_post_reclaimer(exec_ctx,
+ grpc_endpoint_get_resource_user(t->ep),
false, &t->benign_reclaimer_locked);
}
}
-static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
+static void post_destructive_reclaimer(grpc_exec_ctx* exec_ctx,
+ grpc_chttp2_transport* t) {
if (!t->destructive_reclaimer_registered) {
t->destructive_reclaimer_registered = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
- grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
+ grpc_resource_user_post_reclaimer(exec_ctx,
+ grpc_endpoint_get_resource_user(t->ep),
true, &t->destructive_reclaimer_locked);
}
}
-static void benign_reclaimer_locked(void* arg, grpc_error* error) {
+static void benign_reclaimer_locked(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg;
if (error == GRPC_ERROR_NONE &&
grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
@@ -2964,7 +3115,7 @@ static void benign_reclaimer_locked(void* arg, grpc_error* error) {
gpr_log(GPR_DEBUG, "HTTP2: %s - send goaway to free memory",
t->peer_string);
}
- send_goaway(t,
+ send_goaway(exec_ctx, t,
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
@@ -2977,12 +3128,13 @@ static void benign_reclaimer_locked(void* arg, grpc_error* error) {
t->benign_reclaimer_registered = false;
if (error != GRPC_ERROR_CANCELLED) {
grpc_resource_user_finish_reclamation(
- grpc_endpoint_get_resource_user(t->ep));
+ exec_ctx, grpc_endpoint_get_resource_user(t->ep));
}
- GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "benign_reclaimer");
}
-static void destructive_reclaimer_locked(void* arg, grpc_error* error) {
+static void destructive_reclaimer_locked(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)arg;
size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
t->destructive_reclaimer_registered = false;
@@ -2994,7 +3146,7 @@ static void destructive_reclaimer_locked(void* arg, grpc_error* error) {
s->id);
}
grpc_chttp2_cancel_stream(
- t, s,
+ exec_ctx, t, s,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
GRPC_ERROR_INT_HTTP2_ERROR,
GRPC_HTTP2_ENHANCE_YOUR_CALM));
@@ -3003,14 +3155,14 @@ static void destructive_reclaimer_locked(void* arg, grpc_error* error) {
there are more streams left, we can immediately post a new
reclaimer in case the resource quota needs to free more
memory */
- post_destructive_reclaimer(t);
+ post_destructive_reclaimer(exec_ctx, t);
}
}
if (error != GRPC_ERROR_CANCELLED) {
grpc_resource_user_finish_reclamation(
- grpc_endpoint_get_resource_user(t->ep));
+ exec_ctx, grpc_endpoint_get_resource_user(t->ep));
}
- GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destructive_reclaimer");
}
/*******************************************************************************
@@ -3064,7 +3216,8 @@ const char* grpc_chttp2_initiate_write_reason_string(
GPR_UNREACHABLE_CODE(return "unknown");
}
-static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) {
+static grpc_endpoint* chttp2_get_endpoint(grpc_exec_ctx* exec_ctx,
+ grpc_transport* t) {
return ((grpc_chttp2_transport*)t)->ep;
}
@@ -3082,16 +3235,17 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
grpc_transport* grpc_create_chttp2_transport(
- const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client) {
+ grpc_exec_ctx* exec_ctx, const grpc_channel_args* channel_args,
+ grpc_endpoint* ep, bool is_client) {
grpc_chttp2_transport* t =
(grpc_chttp2_transport*)gpr_zalloc(sizeof(grpc_chttp2_transport));
- init_transport(t, channel_args, ep, is_client);
+ init_transport(exec_ctx, t, channel_args, ep, is_client);
return &t->base;
}
void grpc_chttp2_transport_start_reading(
- grpc_transport* transport, grpc_slice_buffer* read_buffer,
- grpc_closure* notify_on_receive_settings) {
+ grpc_exec_ctx* exec_ctx, grpc_transport* transport,
+ grpc_slice_buffer* read_buffer, grpc_closure* notify_on_receive_settings) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)transport;
GRPC_CHTTP2_REF_TRANSPORT(
t, "reading_action"); /* matches unref inside reading_action */
@@ -3100,5 +3254,5 @@ void grpc_chttp2_transport_start_reading(
gpr_free(read_buffer);
}
t->notify_on_receive_settings = notify_on_receive_settings;
- GRPC_CLOSURE_SCHED(&t->read_action_locked, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &t->read_action_locked, GRPC_ERROR_NONE);
}