diff options
author | 2017-12-06 09:47:54 -0800 | |
---|---|---|
committer | 2017-12-06 09:47:54 -0800 | |
commit | 8cf1470a51ea276ca84825e7495d4ee24743540d (patch) | |
tree | 72385cc865094115bc08cb813201d48cb09840bb /src/core/ext/transport/chttp2/transport/writing.cc | |
parent | 1d4e99508409be052bd129ba507bae1fbe7eb7fa (diff) |
Revert "Revert "All instances of exec_ctx being passed around in src/core removed""
Diffstat (limited to 'src/core/ext/transport/chttp2/transport/writing.cc')
-rw-r--r-- | src/core/ext/transport/chttp2/transport/writing.cc | 121 |
1 files changed, 58 insertions, 63 deletions
diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 204b5a7708..3310f35f5f 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -33,17 +33,15 @@ static void add_to_write_list(grpc_chttp2_write_cb** list, *list = cb; } -static void finish_write_cb(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, - grpc_chttp2_stream* s, grpc_chttp2_write_cb* cb, - grpc_error* error) { - grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure, error, +static void finish_write_cb(grpc_chttp2_transport* t, grpc_chttp2_stream* s, + grpc_chttp2_write_cb* cb, grpc_error* error) { + grpc_chttp2_complete_closure_step(t, s, &cb->closure, error, "finish_write_cb"); cb->next = t->write_cb_pool; t->write_cb_pool = cb; } -static void maybe_initiate_ping(grpc_exec_ctx* exec_ctx, - grpc_chttp2_transport* t) { +static void maybe_initiate_ping(grpc_chttp2_transport* t) { grpc_chttp2_ping_queue* pq = &t->ping_queue; if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { /* no ping needed: wait */ @@ -68,7 +66,7 @@ static void maybe_initiate_ping(grpc_exec_ctx* exec_ctx, } return; } - grpc_millis now = grpc_exec_ctx_now(exec_ctx); + grpc_millis now = grpc_core::ExecCtx::Get()->Now(); grpc_millis next_allowed_ping = t->ping_state.last_ping_sent_time + t->ping_policy.min_sent_ping_interval_without_data; @@ -89,20 +87,20 @@ static void maybe_initiate_ping(grpc_exec_ctx* exec_ctx, } if (!t->ping_state.is_delayed_ping_timer_set) { t->ping_state.is_delayed_ping_timer_set = true; - grpc_timer_init(exec_ctx, &t->ping_state.delayed_ping_timer, - next_allowed_ping, &t->retry_initiate_ping_locked); + grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping, + &t->retry_initiate_ping_locked); } return; } pq->inflight_id = t->ping_ctr; t->ping_ctr++; - GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INITIATE]); + GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]); grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT], &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); grpc_slice_buffer_add(&t->outbuf, grpc_chttp2_ping_create(false, pq->inflight_id)); - GRPC_STATS_INC_HTTP2_PINGS_SENT(exec_ctx); + GRPC_STATS_INC_HTTP2_PINGS_SENT(); t->ping_state.last_ping_sent_time = now; if (grpc_http_trace.enabled() || grpc_bdp_estimator_trace.enabled()) { gpr_log(GPR_DEBUG, "%s: Ping sent [%p]: %d/%d", @@ -114,10 +112,9 @@ static void maybe_initiate_ping(grpc_exec_ctx* exec_ctx, (t->ping_state.pings_before_data_required != 0); } -static bool update_list(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, - grpc_chttp2_stream* s, int64_t send_bytes, - grpc_chttp2_write_cb** list, int64_t* ctr, - grpc_error* error) { +static bool update_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s, + int64_t send_bytes, grpc_chttp2_write_cb** list, + int64_t* ctr, grpc_error* error) { bool sched_any = false; grpc_chttp2_write_cb* cb = *list; *list = nullptr; @@ -126,7 +123,7 @@ static bool update_list(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, grpc_chttp2_write_cb* next = cb->next; if (cb->call_at_byte <= *ctr) { sched_any = true; - finish_write_cb(exec_ctx, t, s, cb, GRPC_ERROR_REF(error)); + finish_write_cb(t, s, cb, GRPC_ERROR_REF(error)); } else { add_to_write_list(list, cb); } @@ -179,22 +176,22 @@ class StreamWriteContext; class WriteContext { public: - WriteContext(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t) : t_(t) { - GRPC_STATS_INC_HTTP2_WRITES_BEGUN(exec_ctx); + WriteContext(grpc_chttp2_transport* t) : t_(t) { + GRPC_STATS_INC_HTTP2_WRITES_BEGUN(); GPR_TIMER_BEGIN("grpc_chttp2_begin_write", 0); } // TODO(ctiller): make this the destructor - void FlushStats(grpc_exec_ctx* exec_ctx) { + void FlushStats() { GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE( - exec_ctx, initial_metadata_writes_); - GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(exec_ctx, message_writes_); + initial_metadata_writes_); + GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(message_writes_); GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE( - exec_ctx, trailing_metadata_writes_); - GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(exec_ctx, flow_control_writes_); + trailing_metadata_writes_); + GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(flow_control_writes_); } - void FlushSettings(grpc_exec_ctx* exec_ctx) { + void FlushSettings() { if (t_->dirtied_local_settings && !t_->sent_local_settings) { grpc_slice_buffer_add( &t_->outbuf, grpc_chttp2_settings_create( @@ -204,17 +201,17 @@ class WriteContext { t_->force_send_settings = false; t_->dirtied_local_settings = false; t_->sent_local_settings = true; - GRPC_STATS_INC_HTTP2_SETTINGS_WRITES(exec_ctx); + GRPC_STATS_INC_HTTP2_SETTINGS_WRITES(); } } - void FlushQueuedBuffers(grpc_exec_ctx* exec_ctx) { + void FlushQueuedBuffers() { /* simple writes are queued to qbuf, and flushed here */ grpc_slice_buffer_move_into(&t_->qbuf, &t_->outbuf); GPR_ASSERT(t_->qbuf.count == 0); } - void FlushWindowUpdates(grpc_exec_ctx* exec_ctx) { + void FlushWindowUpdates() { uint32_t transport_announce = t_->flow_control->MaybeSendUpdate(t_->outbuf.count > 0); if (transport_announce) { @@ -234,7 +231,7 @@ class WriteContext { t_->ping_ack_count = 0; } - void EnactHpackSettings(grpc_exec_ctx* exec_ctx) { + void EnactHpackSettings() { grpc_chttp2_hpack_compressor_set_max_table_size( &t_->hpack_compressor, t_->settings[GRPC_PEER_SETTINGS] @@ -374,8 +371,8 @@ class DataSendContext { bool is_last_frame() const { return is_last_frame_; } - void CallCallbacks(grpc_exec_ctx* exec_ctx) { - if (update_list(exec_ctx, t_, s_, + void CallCallbacks() { + if (update_list(t_, s_, (int64_t)(s_->sending_bytes - sending_bytes_before_), &s_->on_flow_controlled_cbs, &s_->flow_controlled_bytes_flowed, GRPC_ERROR_NONE)) { @@ -403,7 +400,7 @@ class StreamWriteContext { s->flow_control->announced_window_delta()))); } - void FlushInitialMetadata(grpc_exec_ctx* exec_ctx) { + void FlushInitialMetadata() { /* send initial metadata if it's available */ if (s_->sent_initial_metadata) return; if (s_->send_initial_metadata == nullptr) return; @@ -430,7 +427,7 @@ class StreamWriteContext { [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], // max_frame_size &s_->stats.outgoing // stats }; - grpc_chttp2_encode_header(exec_ctx, &t_->hpack_compressor, nullptr, 0, + grpc_chttp2_encode_header(&t_->hpack_compressor, nullptr, 0, s_->send_initial_metadata, &hopt, &t_->outbuf); write_context_->ResetPingRecvClock(); write_context_->IncInitialMetadataWrites(); @@ -440,11 +437,11 @@ class StreamWriteContext { s_->sent_initial_metadata = true; write_context_->NoteScheduledResults(); grpc_chttp2_complete_closure_step( - exec_ctx, t_, s_, &s_->send_initial_metadata_finished, GRPC_ERROR_NONE, + t_, s_, &s_->send_initial_metadata_finished, GRPC_ERROR_NONE, "send_initial_metadata_finished"); } - void FlushWindowUpdates(grpc_exec_ctx* exec_ctx) { + void FlushWindowUpdates() { /* send any window updates */ const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate(); if (stream_announce == 0) return; @@ -456,7 +453,7 @@ class StreamWriteContext { write_context_->IncWindowUpdateWrites(); } - void FlushData(grpc_exec_ctx* exec_ctx) { + void FlushData() { if (!s_->sent_initial_metadata) return; if (s_->flow_controlled_buffer.length == 0 && @@ -488,9 +485,9 @@ class StreamWriteContext { } write_context_->ResetPingRecvClock(); if (data_send_context.is_last_frame()) { - SentLastFrame(exec_ctx); + SentLastFrame(); } - data_send_context.CallCallbacks(exec_ctx); + data_send_context.CallCallbacks(); stream_became_writable_ = true; if (s_->flow_controlled_buffer.length > 0 || s_->compressed_data_buffer.length > 0) { @@ -500,7 +497,7 @@ class StreamWriteContext { write_context_->IncMessageWrites(); } - void FlushTrailingMetadata(grpc_exec_ctx* exec_ctx) { + void FlushTrailingMetadata() { if (!s_->sent_initial_metadata) return; if (s_->send_trailing_metadata == nullptr) return; @@ -521,18 +518,18 @@ class StreamWriteContext { t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], &s_->stats.outgoing}; - grpc_chttp2_encode_header(exec_ctx, &t_->hpack_compressor, + grpc_chttp2_encode_header(&t_->hpack_compressor, extra_headers_for_trailing_metadata_, num_extra_headers_for_trailing_metadata_, s_->send_trailing_metadata, &hopt, &t_->outbuf); } write_context_->IncTrailingMetadataWrites(); write_context_->ResetPingRecvClock(); - SentLastFrame(exec_ctx); + SentLastFrame(); write_context_->NoteScheduledResults(); grpc_chttp2_complete_closure_step( - exec_ctx, t_, s_, &s_->send_trailing_metadata_finished, GRPC_ERROR_NONE, + t_, s_, &s_->send_trailing_metadata_finished, GRPC_ERROR_NONE, "send_trailing_metadata_finished"); } @@ -556,7 +553,7 @@ class StreamWriteContext { } } - void SentLastFrame(grpc_exec_ctx* exec_ctx) { + void SentLastFrame() { s_->send_trailing_metadata = nullptr; s_->sent_trailing_metadata = true; @@ -565,7 +562,7 @@ class StreamWriteContext { &t_->outbuf, grpc_chttp2_rst_stream_create( s_->id, GRPC_HTTP2_NO_ERROR, &s_->stats.outgoing)); } - grpc_chttp2_mark_stream_closed(exec_ctx, t_, s_, !t_->is_client, true, + grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true, GRPC_ERROR_NONE); } @@ -579,12 +576,12 @@ class StreamWriteContext { } // namespace grpc_chttp2_begin_write_result grpc_chttp2_begin_write( - grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t) { - WriteContext ctx(exec_ctx, t); - ctx.FlushSettings(exec_ctx); + grpc_chttp2_transport* t) { + WriteContext ctx(t); + ctx.FlushSettings(); ctx.FlushPingAcks(); - ctx.FlushQueuedBuffers(exec_ctx); - ctx.EnactHpackSettings(exec_ctx); + ctx.FlushQueuedBuffers(); + ctx.EnactHpackSettings(); if (t->flow_control->remote_window() > 0) { ctx.UpdateStreamsNoLongerStalled(); @@ -594,47 +591,45 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( (according to available window sizes) and add to the output buffer */ while (grpc_chttp2_stream* s = ctx.NextStream()) { StreamWriteContext stream_ctx(&ctx, s); - stream_ctx.FlushInitialMetadata(exec_ctx); - stream_ctx.FlushWindowUpdates(exec_ctx); - stream_ctx.FlushData(exec_ctx); - stream_ctx.FlushTrailingMetadata(exec_ctx); + stream_ctx.FlushInitialMetadata(); + stream_ctx.FlushWindowUpdates(); + stream_ctx.FlushData(); + stream_ctx.FlushTrailingMetadata(); if (stream_ctx.stream_became_writable()) { if (!grpc_chttp2_list_add_writing_stream(t, s)) { /* already in writing list: drop ref */ - GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:already_writing"); + GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:already_writing"); } else { /* ref will be dropped at end of write */ } } else { - GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:no_write"); + GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:no_write"); } } - ctx.FlushWindowUpdates(exec_ctx); + ctx.FlushWindowUpdates(); - maybe_initiate_ping(exec_ctx, t); + maybe_initiate_ping(t); GPR_TIMER_END("grpc_chttp2_begin_write", 0); return ctx.Result(); } -void grpc_chttp2_end_write(grpc_exec_ctx* exec_ctx, grpc_chttp2_transport* t, - grpc_error* error) { +void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error* error) { GPR_TIMER_BEGIN("grpc_chttp2_end_write", 0); grpc_chttp2_stream* s; while (grpc_chttp2_list_pop_writing_stream(t, &s)) { if (s->sending_bytes != 0) { - update_list(exec_ctx, t, s, (int64_t)s->sending_bytes, - &s->on_write_finished_cbs, &s->flow_controlled_bytes_written, - GRPC_ERROR_REF(error)); + update_list(t, s, (int64_t)s->sending_bytes, &s->on_write_finished_cbs, + &s->flow_controlled_bytes_written, GRPC_ERROR_REF(error)); s->sending_bytes = 0; } - GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:end"); + GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:end"); } - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &t->outbuf); + grpc_slice_buffer_reset_and_unref_internal(&t->outbuf); GRPC_ERROR_UNREF(error); GPR_TIMER_END("grpc_chttp2_end_write", 0); } |