aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/chttp2/transport/writing.cc
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2017-12-06 09:47:54 -0800
committerGravatar GitHub <noreply@github.com>2017-12-06 09:47:54 -0800
commit8cf1470a51ea276ca84825e7495d4ee24743540d (patch)
tree72385cc865094115bc08cb813201d48cb09840bb /src/core/ext/transport/chttp2/transport/writing.cc
parent1d4e99508409be052bd129ba507bae1fbe7eb7fa (diff)
Revert "Revert "All instances of exec_ctx being passed around in src/core removed""
Diffstat (limited to 'src/core/ext/transport/chttp2/transport/writing.cc')
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.cc121
1 files changed, 58 insertions, 63 deletions
diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc
index 204b5a7708..3310f35f5f 100644
--- a/src/core/ext/transport/chttp2/transport/writing.cc
+++ b/src/core/ext/transport/chttp2/transport/writing.cc
@@ -33,17 +33,15 @@ static void add_to_write_list(grpc_chttp2_write_cb** list,
*list = cb;
}
-static void finish_write_cb(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
- grpc_chttp2_stream* s, grpc_chttp2_write_cb* cb,
- grpc_error* error) {
- grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure, error,
+static void finish_write_cb(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
+ grpc_chttp2_write_cb* cb, grpc_error* error) {
+ grpc_chttp2_complete_closure_step(t, s, &cb->closure, error,
"finish_write_cb");
cb->next = t->write_cb_pool;
t->write_cb_pool = cb;
}
-static void maybe_initiate_ping(grpc_exec_ctx* exec_ctx,
- grpc_chttp2_transport* t) {
+static void maybe_initiate_ping(grpc_chttp2_transport* t) {
grpc_chttp2_ping_queue* pq = &t->ping_queue;
if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
/* no ping needed: wait */
@@ -68,7 +66,7 @@ static void maybe_initiate_ping(grpc_exec_ctx* exec_ctx,
}
return;
}
- grpc_millis now = grpc_exec_ctx_now(exec_ctx);
+ grpc_millis now = grpc_core::ExecCtx::Get()->Now();
grpc_millis next_allowed_ping =
t->ping_state.last_ping_sent_time +
t->ping_policy.min_sent_ping_interval_without_data;
@@ -89,20 +87,20 @@ static void maybe_initiate_ping(grpc_exec_ctx* exec_ctx,
}
if (!t->ping_state.is_delayed_ping_timer_set) {
t->ping_state.is_delayed_ping_timer_set = true;
- grpc_timer_init(exec_ctx, &t->ping_state.delayed_ping_timer,
- next_allowed_ping, &t->retry_initiate_ping_locked);
+ grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping,
+ &t->retry_initiate_ping_locked);
}
return;
}
pq->inflight_id = t->ping_ctr;
t->ping_ctr++;
- GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
+ GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
grpc_slice_buffer_add(&t->outbuf,
grpc_chttp2_ping_create(false, pq->inflight_id));
- GRPC_STATS_INC_HTTP2_PINGS_SENT(exec_ctx);
+ GRPC_STATS_INC_HTTP2_PINGS_SENT();
t->ping_state.last_ping_sent_time = now;
if (grpc_http_trace.enabled() || grpc_bdp_estimator_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s: Ping sent [%p]: %d/%d",
@@ -114,10 +112,9 @@ static void maybe_initiate_ping(grpc_exec_ctx* exec_ctx,
(t->ping_state.pings_before_data_required != 0);
}
-static bool update_list(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
- grpc_chttp2_stream* s, int64_t send_bytes,
- grpc_chttp2_write_cb** list, int64_t* ctr,
- grpc_error* error) {
+static bool update_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
+ int64_t send_bytes, grpc_chttp2_write_cb** list,
+ int64_t* ctr, grpc_error* error) {
bool sched_any = false;
grpc_chttp2_write_cb* cb = *list;
*list = nullptr;
@@ -126,7 +123,7 @@ static bool update_list(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
grpc_chttp2_write_cb* next = cb->next;
if (cb->call_at_byte <= *ctr) {
sched_any = true;
- finish_write_cb(exec_ctx, t, s, cb, GRPC_ERROR_REF(error));
+ finish_write_cb(t, s, cb, GRPC_ERROR_REF(error));
} else {
add_to_write_list(list, cb);
}
@@ -179,22 +176,22 @@ class StreamWriteContext;
class WriteContext {
public:
- WriteContext(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t) : t_(t) {
- GRPC_STATS_INC_HTTP2_WRITES_BEGUN(exec_ctx);
+ WriteContext(grpc_chttp2_transport* t) : t_(t) {
+ GRPC_STATS_INC_HTTP2_WRITES_BEGUN();
GPR_TIMER_BEGIN("grpc_chttp2_begin_write", 0);
}
// TODO(ctiller): make this the destructor
- void FlushStats(grpc_exec_ctx* exec_ctx) {
+ void FlushStats() {
GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE(
- exec_ctx, initial_metadata_writes_);
- GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(exec_ctx, message_writes_);
+ initial_metadata_writes_);
+ GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(message_writes_);
GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE(
- exec_ctx, trailing_metadata_writes_);
- GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(exec_ctx, flow_control_writes_);
+ trailing_metadata_writes_);
+ GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(flow_control_writes_);
}
- void FlushSettings(grpc_exec_ctx* exec_ctx) {
+ void FlushSettings() {
if (t_->dirtied_local_settings && !t_->sent_local_settings) {
grpc_slice_buffer_add(
&t_->outbuf, grpc_chttp2_settings_create(
@@ -204,17 +201,17 @@ class WriteContext {
t_->force_send_settings = false;
t_->dirtied_local_settings = false;
t_->sent_local_settings = true;
- GRPC_STATS_INC_HTTP2_SETTINGS_WRITES(exec_ctx);
+ GRPC_STATS_INC_HTTP2_SETTINGS_WRITES();
}
}
- void FlushQueuedBuffers(grpc_exec_ctx* exec_ctx) {
+ void FlushQueuedBuffers() {
/* simple writes are queued to qbuf, and flushed here */
grpc_slice_buffer_move_into(&t_->qbuf, &t_->outbuf);
GPR_ASSERT(t_->qbuf.count == 0);
}
- void FlushWindowUpdates(grpc_exec_ctx* exec_ctx) {
+ void FlushWindowUpdates() {
uint32_t transport_announce =
t_->flow_control->MaybeSendUpdate(t_->outbuf.count > 0);
if (transport_announce) {
@@ -234,7 +231,7 @@ class WriteContext {
t_->ping_ack_count = 0;
}
- void EnactHpackSettings(grpc_exec_ctx* exec_ctx) {
+ void EnactHpackSettings() {
grpc_chttp2_hpack_compressor_set_max_table_size(
&t_->hpack_compressor,
t_->settings[GRPC_PEER_SETTINGS]
@@ -374,8 +371,8 @@ class DataSendContext {
bool is_last_frame() const { return is_last_frame_; }
- void CallCallbacks(grpc_exec_ctx* exec_ctx) {
- if (update_list(exec_ctx, t_, s_,
+ void CallCallbacks() {
+ if (update_list(t_, s_,
(int64_t)(s_->sending_bytes - sending_bytes_before_),
&s_->on_flow_controlled_cbs,
&s_->flow_controlled_bytes_flowed, GRPC_ERROR_NONE)) {
@@ -403,7 +400,7 @@ class StreamWriteContext {
s->flow_control->announced_window_delta())));
}
- void FlushInitialMetadata(grpc_exec_ctx* exec_ctx) {
+ void FlushInitialMetadata() {
/* send initial metadata if it's available */
if (s_->sent_initial_metadata) return;
if (s_->send_initial_metadata == nullptr) return;
@@ -430,7 +427,7 @@ class StreamWriteContext {
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], // max_frame_size
&s_->stats.outgoing // stats
};
- grpc_chttp2_encode_header(exec_ctx, &t_->hpack_compressor, nullptr, 0,
+ grpc_chttp2_encode_header(&t_->hpack_compressor, nullptr, 0,
s_->send_initial_metadata, &hopt, &t_->outbuf);
write_context_->ResetPingRecvClock();
write_context_->IncInitialMetadataWrites();
@@ -440,11 +437,11 @@ class StreamWriteContext {
s_->sent_initial_metadata = true;
write_context_->NoteScheduledResults();
grpc_chttp2_complete_closure_step(
- exec_ctx, t_, s_, &s_->send_initial_metadata_finished, GRPC_ERROR_NONE,
+ t_, s_, &s_->send_initial_metadata_finished, GRPC_ERROR_NONE,
"send_initial_metadata_finished");
}
- void FlushWindowUpdates(grpc_exec_ctx* exec_ctx) {
+ void FlushWindowUpdates() {
/* send any window updates */
const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate();
if (stream_announce == 0) return;
@@ -456,7 +453,7 @@ class StreamWriteContext {
write_context_->IncWindowUpdateWrites();
}
- void FlushData(grpc_exec_ctx* exec_ctx) {
+ void FlushData() {
if (!s_->sent_initial_metadata) return;
if (s_->flow_controlled_buffer.length == 0 &&
@@ -488,9 +485,9 @@ class StreamWriteContext {
}
write_context_->ResetPingRecvClock();
if (data_send_context.is_last_frame()) {
- SentLastFrame(exec_ctx);
+ SentLastFrame();
}
- data_send_context.CallCallbacks(exec_ctx);
+ data_send_context.CallCallbacks();
stream_became_writable_ = true;
if (s_->flow_controlled_buffer.length > 0 ||
s_->compressed_data_buffer.length > 0) {
@@ -500,7 +497,7 @@ class StreamWriteContext {
write_context_->IncMessageWrites();
}
- void FlushTrailingMetadata(grpc_exec_ctx* exec_ctx) {
+ void FlushTrailingMetadata() {
if (!s_->sent_initial_metadata) return;
if (s_->send_trailing_metadata == nullptr) return;
@@ -521,18 +518,18 @@ class StreamWriteContext {
t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
&s_->stats.outgoing};
- grpc_chttp2_encode_header(exec_ctx, &t_->hpack_compressor,
+ grpc_chttp2_encode_header(&t_->hpack_compressor,
extra_headers_for_trailing_metadata_,
num_extra_headers_for_trailing_metadata_,
s_->send_trailing_metadata, &hopt, &t_->outbuf);
}
write_context_->IncTrailingMetadataWrites();
write_context_->ResetPingRecvClock();
- SentLastFrame(exec_ctx);
+ SentLastFrame();
write_context_->NoteScheduledResults();
grpc_chttp2_complete_closure_step(
- exec_ctx, t_, s_, &s_->send_trailing_metadata_finished, GRPC_ERROR_NONE,
+ t_, s_, &s_->send_trailing_metadata_finished, GRPC_ERROR_NONE,
"send_trailing_metadata_finished");
}
@@ -556,7 +553,7 @@ class StreamWriteContext {
}
}
- void SentLastFrame(grpc_exec_ctx* exec_ctx) {
+ void SentLastFrame() {
s_->send_trailing_metadata = nullptr;
s_->sent_trailing_metadata = true;
@@ -565,7 +562,7 @@ class StreamWriteContext {
&t_->outbuf, grpc_chttp2_rst_stream_create(
s_->id, GRPC_HTTP2_NO_ERROR, &s_->stats.outgoing));
}
- grpc_chttp2_mark_stream_closed(exec_ctx, t_, s_, !t_->is_client, true,
+ grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true,
GRPC_ERROR_NONE);
}
@@ -579,12 +576,12 @@ class StreamWriteContext {
} // namespace
grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
- grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t) {
- WriteContext ctx(exec_ctx, t);
- ctx.FlushSettings(exec_ctx);
+ grpc_chttp2_transport* t) {
+ WriteContext ctx(t);
+ ctx.FlushSettings();
ctx.FlushPingAcks();
- ctx.FlushQueuedBuffers(exec_ctx);
- ctx.EnactHpackSettings(exec_ctx);
+ ctx.FlushQueuedBuffers();
+ ctx.EnactHpackSettings();
if (t->flow_control->remote_window() > 0) {
ctx.UpdateStreamsNoLongerStalled();
@@ -594,47 +591,45 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
(according to available window sizes) and add to the output buffer */
while (grpc_chttp2_stream* s = ctx.NextStream()) {
StreamWriteContext stream_ctx(&ctx, s);
- stream_ctx.FlushInitialMetadata(exec_ctx);
- stream_ctx.FlushWindowUpdates(exec_ctx);
- stream_ctx.FlushData(exec_ctx);
- stream_ctx.FlushTrailingMetadata(exec_ctx);
+ stream_ctx.FlushInitialMetadata();
+ stream_ctx.FlushWindowUpdates();
+ stream_ctx.FlushData();
+ stream_ctx.FlushTrailingMetadata();
if (stream_ctx.stream_became_writable()) {
if (!grpc_chttp2_list_add_writing_stream(t, s)) {
/* already in writing list: drop ref */
- GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:already_writing");
+ GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:already_writing");
} else {
/* ref will be dropped at end of write */
}
} else {
- GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:no_write");
+ GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:no_write");
}
}
- ctx.FlushWindowUpdates(exec_ctx);
+ ctx.FlushWindowUpdates();
- maybe_initiate_ping(exec_ctx, t);
+ maybe_initiate_ping(t);
GPR_TIMER_END("grpc_chttp2_begin_write", 0);
return ctx.Result();
}
-void grpc_chttp2_end_write(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t,
- grpc_error* error) {
+void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error* error) {
GPR_TIMER_BEGIN("grpc_chttp2_end_write", 0);
grpc_chttp2_stream* s;
while (grpc_chttp2_list_pop_writing_stream(t, &s)) {
if (s->sending_bytes != 0) {
- update_list(exec_ctx, t, s, (int64_t)s->sending_bytes,
- &s->on_write_finished_cbs, &s->flow_controlled_bytes_written,
- GRPC_ERROR_REF(error));
+ update_list(t, s, (int64_t)s->sending_bytes, &s->on_write_finished_cbs,
+ &s->flow_controlled_bytes_written, GRPC_ERROR_REF(error));
s->sending_bytes = 0;
}
- GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:end");
+ GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:end");
}
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &t->outbuf);
+ grpc_slice_buffer_reset_and_unref_internal(&t->outbuf);
GRPC_ERROR_UNREF(error);
GPR_TIMER_END("grpc_chttp2_end_write", 0);
}