/* * * Copyright 2015 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include #include "src/core/ext/transport/chttp2/transport/context_list.h" #include "src/core/ext/transport/chttp2/transport/internal.h" #include #include #include "src/core/lib/debug/stats.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/http2_errors.h" static void add_to_write_list(grpc_chttp2_write_cb** list, grpc_chttp2_write_cb* cb) { cb->next = *list; *list = cb; } static void finish_write_cb(grpc_chttp2_transport* t, grpc_chttp2_stream* s, grpc_chttp2_write_cb* cb, grpc_error* error) { grpc_chttp2_complete_closure_step(t, s, &cb->closure, error, "finish_write_cb"); cb->next = t->write_cb_pool; t->write_cb_pool = cb; } static void maybe_initiate_ping(grpc_chttp2_transport* t) { grpc_chttp2_ping_queue* pq = &t->ping_queue; if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { /* no ping needed: wait */ return; } if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) { /* ping already in-flight: wait */ if (grpc_http_trace.enabled() || grpc_bdp_estimator_trace.enabled()) { gpr_log(GPR_INFO, "%s: Ping delayed [%p]: already pinging", t->is_client ? "CLIENT" : "SERVER", t->peer_string); } return; } if (t->ping_state.pings_before_data_required == 0 && t->ping_policy.max_pings_without_data != 0) { /* need to receive something of substance before sending a ping again */ if (grpc_http_trace.enabled() || grpc_bdp_estimator_trace.enabled()) { gpr_log(GPR_INFO, "%s: Ping delayed [%p]: too many recent pings: %d/%d", t->is_client ? "CLIENT" : "SERVER", t->peer_string, t->ping_state.pings_before_data_required, t->ping_policy.max_pings_without_data); } return; } grpc_millis now = grpc_core::ExecCtx::Get()->Now(); grpc_millis next_allowed_ping_interval = (t->keepalive_permit_without_calls == 0 && grpc_chttp2_stream_map_size(&t->stream_map) == 0) ? 7200 * GPR_MS_PER_SEC : t->ping_policy.min_sent_ping_interval_without_data; grpc_millis next_allowed_ping = t->ping_state.last_ping_sent_time + next_allowed_ping_interval; if (next_allowed_ping > now) { /* not enough elapsed time between successive pings */ if (grpc_http_trace.enabled() || grpc_bdp_estimator_trace.enabled()) { gpr_log(GPR_INFO, "%s: Ping delayed [%p]: not enough time elapsed since last ping. " " Last ping %f: Next ping %f: Now %f", t->is_client ? "CLIENT" : "SERVER", t->peer_string, static_cast(t->ping_state.last_ping_sent_time), static_cast(next_allowed_ping), static_cast(now)); } if (!t->ping_state.is_delayed_ping_timer_set) { t->ping_state.is_delayed_ping_timer_set = true; GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked"); grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping, &t->retry_initiate_ping_locked); } return; } pq->inflight_id = t->ping_ctr; t->ping_ctr++; GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]); grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT], &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); grpc_slice_buffer_add(&t->outbuf, grpc_chttp2_ping_create(false, pq->inflight_id)); GRPC_STATS_INC_HTTP2_PINGS_SENT(); t->ping_state.last_ping_sent_time = now; if (grpc_http_trace.enabled() || grpc_bdp_estimator_trace.enabled()) { gpr_log(GPR_INFO, "%s: Ping sent [%p]: %d/%d", t->is_client ? "CLIENT" : "SERVER", t->peer_string, t->ping_state.pings_before_data_required, t->ping_policy.max_pings_without_data); } t->ping_state.pings_before_data_required -= (t->ping_state.pings_before_data_required != 0); } static bool update_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s, int64_t send_bytes, grpc_chttp2_write_cb** list, int64_t* ctr, grpc_error* error) { bool sched_any = false; grpc_chttp2_write_cb* cb = *list; *list = nullptr; *ctr += send_bytes; while (cb) { grpc_chttp2_write_cb* next = cb->next; if (cb->call_at_byte <= *ctr) { sched_any = true; finish_write_cb(t, s, cb, GRPC_ERROR_REF(error)); } else { add_to_write_list(list, cb); } cb = next; } GRPC_ERROR_UNREF(error); return sched_any; } static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s, const char* staller) { if (grpc_flowctl_trace.enabled()) { gpr_log( GPR_DEBUG, "%s:%p stream %d moved to stalled list by %s. This is FULLY expected " "to happen in a healthy program that is not seeing flow control stalls." " However, if you know that there are unwanted stalls, here is some " "helpful data: [fc:pending=%" PRIdPTR ":pending-compressed=%" PRIdPTR ":flowed=%" PRId64 ":peer_initwin=%d:t_win=%" PRId64 ":s_win=%d:s_delta=%" PRId64 "]", t->peer_string, t, s->id, staller, s->flow_controlled_buffer.length, s->compressed_data_buffer.length, s->flow_controlled_bytes_flowed, t->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], t->flow_control->remote_window(), static_cast GPR_MAX( 0, s->flow_control->remote_window_delta() + (int64_t)t->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]), s->flow_control->remote_window_delta()); } } static bool stream_ref_if_not_destroyed(gpr_refcount* r) { gpr_atm count; do { count = gpr_atm_acq_load(&r->count); if (count == 0) return false; } while (!gpr_atm_rel_cas(&r->count, count, count + 1)); return true; } /* How many bytes would we like to put on the wire during a single syscall */ static uint32_t target_write_size(grpc_chttp2_transport* t) { return 1024 * 1024; } // Returns true if initial_metadata contains only default headers. static bool is_default_initial_metadata(grpc_metadata_batch* initial_metadata) { return initial_metadata->list.default_count == initial_metadata->list.count; } namespace { class StreamWriteContext; class WriteContext { public: WriteContext(grpc_chttp2_transport* t) : t_(t) { GRPC_STATS_INC_HTTP2_WRITES_BEGUN(); GPR_TIMER_SCOPE("grpc_chttp2_begin_write", 0); } // TODO(ctiller): make this the destructor void FlushStats() { GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE( initial_metadata_writes_); GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(message_writes_); GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE( trailing_metadata_writes_); GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(flow_control_writes_); } void FlushSettings() { if (t_->dirtied_local_settings && !t_->sent_local_settings) { grpc_slice_buffer_add( &t_->outbuf, grpc_chttp2_settings_create( t_->settings[GRPC_SENT_SETTINGS], t_->settings[GRPC_LOCAL_SETTINGS], t_->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS)); t_->force_send_settings = false; t_->dirtied_local_settings = false; t_->sent_local_settings = true; GRPC_STATS_INC_HTTP2_SETTINGS_WRITES(); } } void FlushQueuedBuffers() { /* simple writes are queued to qbuf, and flushed here */ grpc_slice_buffer_move_into(&t_->qbuf, &t_->outbuf); GPR_ASSERT(t_->qbuf.count == 0); } void FlushWindowUpdates() { uint32_t transport_announce = t_->flow_control->MaybeSendUpdate(t_->outbuf.count > 0); if (transport_announce) { grpc_transport_one_way_stats throwaway_stats; grpc_slice_buffer_add( &t_->outbuf, grpc_chttp2_window_update_create(0, transport_announce, &throwaway_stats)); ResetPingClock(); } } void FlushPingAcks() { for (size_t i = 0; i < t_->ping_ack_count; i++) { grpc_slice_buffer_add(&t_->outbuf, grpc_chttp2_ping_create(true, t_->ping_acks[i])); } t_->ping_ack_count = 0; } void EnactHpackSettings() { grpc_chttp2_hpack_compressor_set_max_table_size( &t_->hpack_compressor, t_->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]); } void UpdateStreamsNoLongerStalled() { grpc_chttp2_stream* s; while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) { if (t_->closed_with_error == GRPC_ERROR_NONE && grpc_chttp2_list_add_writable_stream(t_, s)) { if (!stream_ref_if_not_destroyed(&s->refcount->refs)) { grpc_chttp2_list_remove_writable_stream(t_, s); } } } } grpc_chttp2_stream* NextStream() { if (t_->outbuf.length > target_write_size(t_)) { result_.partial = true; return nullptr; } grpc_chttp2_stream* s; if (!grpc_chttp2_list_pop_writable_stream(t_, &s)) { return nullptr; } return s; } void ResetPingClock() { if (!t_->is_client) { t_->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST; t_->ping_recv_state.ping_strikes = 0; } t_->ping_state.pings_before_data_required = t_->ping_policy.max_pings_without_data; } void IncInitialMetadataWrites() { ++initial_metadata_writes_; } void IncWindowUpdateWrites() { ++flow_control_writes_; } void IncMessageWrites() { ++message_writes_; } void IncTrailingMetadataWrites() { ++trailing_metadata_writes_; } void NoteScheduledResults() { result_.early_results_scheduled = true; } grpc_chttp2_transport* transport() const { return t_; } grpc_chttp2_begin_write_result Result() { result_.writing = t_->outbuf.count > 0; return result_; } private: grpc_chttp2_transport* const t_; /* stats histogram counters: we increment these throughout this function, and at the end publish to the central stats histograms */ int flow_control_writes_ = 0; int initial_metadata_writes_ = 0; int trailing_metadata_writes_ = 0; int message_writes_ = 0; grpc_chttp2_begin_write_result result_ = {false, false, false}; }; class DataSendContext { public: DataSendContext(WriteContext* write_context, grpc_chttp2_transport* t, grpc_chttp2_stream* s) : write_context_(write_context), t_(t), s_(s), sending_bytes_before_(s_->sending_bytes) {} uint32_t stream_remote_window() const { return static_cast GPR_MAX( 0, s_->flow_control->remote_window_delta() + (int64_t)t_->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); } uint32_t max_outgoing() const { return static_cast GPR_MIN( t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], GPR_MIN(stream_remote_window(), t_->flow_control->remote_window())); } bool AnyOutgoing() const { return max_outgoing() > 0; } void FlushCompressedBytes() { uint32_t send_bytes = static_cast GPR_MIN( max_outgoing(), s_->compressed_data_buffer.length); bool is_last_data_frame = (send_bytes == s_->compressed_data_buffer.length && s_->flow_controlled_buffer.length == 0 && s_->fetching_send_message == nullptr); if (is_last_data_frame && s_->send_trailing_metadata != nullptr && s_->stream_compression_ctx != nullptr) { if (GPR_UNLIKELY(!grpc_stream_compress( s_->stream_compression_ctx, &s_->flow_controlled_buffer, &s_->compressed_data_buffer, nullptr, MAX_SIZE_T, GRPC_STREAM_COMPRESSION_FLUSH_FINISH))) { gpr_log(GPR_ERROR, "Stream compression failed."); } grpc_stream_compression_context_destroy(s_->stream_compression_ctx); s_->stream_compression_ctx = nullptr; /* After finish, bytes in s->compressed_data_buffer may be * more than max_outgoing. Start another round of the current * while loop so that send_bytes and is_last_data_frame are * recalculated. */ return; } is_last_frame_ = is_last_data_frame && s_->send_trailing_metadata != nullptr && grpc_metadata_batch_is_empty(s_->send_trailing_metadata); grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes, is_last_frame_, &s_->stats.outgoing, &t_->outbuf); s_->flow_control->SentData(send_bytes); s_->byte_counter += send_bytes; if (s_->compressed_data_buffer.length == 0) { s_->sending_bytes += s_->uncompressed_data_size; } } void CompressMoreBytes() { if (s_->stream_compression_ctx == nullptr) { s_->stream_compression_ctx = grpc_stream_compression_context_create(s_->stream_compression_method); } s_->uncompressed_data_size = s_->flow_controlled_buffer.length; if (GPR_UNLIKELY(!grpc_stream_compress( s_->stream_compression_ctx, &s_->flow_controlled_buffer, &s_->compressed_data_buffer, nullptr, MAX_SIZE_T, GRPC_STREAM_COMPRESSION_FLUSH_SYNC))) { gpr_log(GPR_ERROR, "Stream compression failed."); } } bool is_last_frame() const { return is_last_frame_; } void CallCallbacks() { if (update_list( t_, s_, static_cast(s_->sending_bytes - sending_bytes_before_), &s_->on_flow_controlled_cbs, &s_->flow_controlled_bytes_flowed, GRPC_ERROR_NONE)) { write_context_->NoteScheduledResults(); } } private: WriteContext* write_context_; grpc_chttp2_transport* t_; grpc_chttp2_stream* s_; const size_t sending_bytes_before_; bool is_last_frame_ = false; }; class StreamWriteContext { public: StreamWriteContext(WriteContext* write_context, grpc_chttp2_stream* s) : write_context_(write_context), t_(write_context->transport()), s_(s) { GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_INFO, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t_, t_->is_client ? "CLIENT" : "SERVER", s->id, s->sent_initial_metadata, s->send_initial_metadata != nullptr, (int)(s->flow_control->local_window_delta() - s->flow_control->announced_window_delta()))); } void FlushInitialMetadata() { /* send initial metadata if it's available */ if (s_->sent_initial_metadata) return; if (s_->send_initial_metadata == nullptr) return; // We skip this on the server side if there is no custom initial // metadata, there are no messages to send, and we are also sending // trailing metadata. This results in a Trailers-Only response, // which is required for retries, as per: // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid if (!t_->is_client && s_->fetching_send_message == nullptr && s_->flow_controlled_buffer.length == 0 && s_->compressed_data_buffer.length == 0 && s_->send_trailing_metadata != nullptr && is_default_initial_metadata(s_->send_initial_metadata)) { ConvertInitialMetadataToTrailingMetadata(); } else { grpc_encode_header_options hopt = { s_->id, // stream_id false, // is_eof t_->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] != 0, // use_true_binary_metadata t_->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], // max_frame_size &s_->stats.outgoing // stats }; grpc_chttp2_encode_header(&t_->hpack_compressor, nullptr, 0, s_->send_initial_metadata, &hopt, &t_->outbuf); write_context_->ResetPingClock(); write_context_->IncInitialMetadataWrites(); } s_->send_initial_metadata = nullptr; s_->sent_initial_metadata = true; write_context_->NoteScheduledResults(); grpc_chttp2_complete_closure_step( t_, s_, &s_->send_initial_metadata_finished, GRPC_ERROR_NONE, "send_initial_metadata_finished"); } void FlushWindowUpdates() { /* send any window updates */ const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate(); if (stream_announce == 0) return; grpc_slice_buffer_add( &t_->outbuf, grpc_chttp2_window_update_create(s_->id, stream_announce, &s_->stats.outgoing)); write_context_->ResetPingClock(); write_context_->IncWindowUpdateWrites(); } void FlushData() { if (!s_->sent_initial_metadata) return; if (s_->flow_controlled_buffer.length == 0 && s_->compressed_data_buffer.length == 0) { return; // early out: nothing to do } DataSendContext data_send_context(write_context_, t_, s_); if (!data_send_context.AnyOutgoing()) { if (t_->flow_control->remote_window() <= 0) { report_stall(t_, s_, "transport"); grpc_chttp2_list_add_stalled_by_transport(t_, s_); } else if (data_send_context.stream_remote_window() <= 0) { report_stall(t_, s_, "stream"); grpc_chttp2_list_add_stalled_by_stream(t_, s_); } return; // early out: nothing to do } while ((s_->flow_controlled_buffer.length > 0 || s_->compressed_data_buffer.length > 0) && data_send_context.max_outgoing() > 0) { if (s_->compressed_data_buffer.length > 0) { data_send_context.FlushCompressedBytes(); } else { data_send_context.CompressMoreBytes(); } } if (s_->traced && grpc_endpoint_can_track_err(t_->ep)) { grpc_core::ContextList::Append(&t_->cl, s_); } write_context_->ResetPingClock(); if (data_send_context.is_last_frame()) { SentLastFrame(); } data_send_context.CallCallbacks(); stream_became_writable_ = true; if (s_->flow_controlled_buffer.length > 0 || s_->compressed_data_buffer.length > 0) { GRPC_CHTTP2_STREAM_REF(s_, "chttp2_writing:fork"); grpc_chttp2_list_add_writable_stream(t_, s_); } write_context_->IncMessageWrites(); } void FlushTrailingMetadata() { if (!s_->sent_initial_metadata) return; if (s_->send_trailing_metadata == nullptr) return; if (s_->fetching_send_message != nullptr) return; if (s_->flow_controlled_buffer.length != 0) return; if (s_->compressed_data_buffer.length != 0) return; GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata")); if (grpc_metadata_batch_is_empty(s_->send_trailing_metadata)) { grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true, &s_->stats.outgoing, &t_->outbuf); } else { grpc_encode_header_options hopt = { s_->id, true, t_->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] != 0, t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], &s_->stats.outgoing}; grpc_chttp2_encode_header(&t_->hpack_compressor, extra_headers_for_trailing_metadata_, num_extra_headers_for_trailing_metadata_, s_->send_trailing_metadata, &hopt, &t_->outbuf); } write_context_->IncTrailingMetadataWrites(); write_context_->ResetPingClock(); SentLastFrame(); write_context_->NoteScheduledResults(); grpc_chttp2_complete_closure_step( t_, s_, &s_->send_trailing_metadata_finished, GRPC_ERROR_NONE, "send_trailing_metadata_finished"); } bool stream_became_writable() { return stream_became_writable_; } private: void ConvertInitialMetadataToTrailingMetadata() { GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_INFO, "not sending initial_metadata (Trailers-Only)")); // When sending Trailers-Only, we need to move the :status and // content-type headers to the trailers. if (s_->send_initial_metadata->idx.named.status != nullptr) { extra_headers_for_trailing_metadata_ [num_extra_headers_for_trailing_metadata_++] = &s_->send_initial_metadata->idx.named.status->md; } if (s_->send_initial_metadata->idx.named.content_type != nullptr) { extra_headers_for_trailing_metadata_ [num_extra_headers_for_trailing_metadata_++] = &s_->send_initial_metadata->idx.named.content_type->md; } } void SentLastFrame() { s_->send_trailing_metadata = nullptr; s_->sent_trailing_metadata = true; s_->eos_sent = true; if (!t_->is_client && !s_->read_closed) { grpc_slice_buffer_add( &t_->outbuf, grpc_chttp2_rst_stream_create( s_->id, GRPC_HTTP2_NO_ERROR, &s_->stats.outgoing)); } grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true, GRPC_ERROR_NONE); } WriteContext* const write_context_; grpc_chttp2_transport* const t_; grpc_chttp2_stream* const s_; bool stream_became_writable_ = false; grpc_mdelem* extra_headers_for_trailing_metadata_[2]; size_t num_extra_headers_for_trailing_metadata_ = 0; }; } // namespace grpc_chttp2_begin_write_result grpc_chttp2_begin_write( grpc_chttp2_transport* t) { WriteContext ctx(t); ctx.FlushSettings(); ctx.FlushPingAcks(); ctx.FlushQueuedBuffers(); ctx.EnactHpackSettings(); if (t->flow_control->remote_window() > 0) { ctx.UpdateStreamsNoLongerStalled(); } /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ while (grpc_chttp2_stream* s = ctx.NextStream()) { StreamWriteContext stream_ctx(&ctx, s); stream_ctx.FlushInitialMetadata(); stream_ctx.FlushWindowUpdates(); stream_ctx.FlushData(); stream_ctx.FlushTrailingMetadata(); if (stream_ctx.stream_became_writable()) { if (!grpc_chttp2_list_add_writing_stream(t, s)) { /* already in writing list: drop ref */ GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:already_writing"); } else { /* ref will be dropped at end of write */ } } else { GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:no_write"); } } ctx.FlushWindowUpdates(); maybe_initiate_ping(t); return ctx.Result(); } void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error* error) { GPR_TIMER_SCOPE("grpc_chttp2_end_write", 0); grpc_chttp2_stream* s; if (t->channelz_socket != nullptr) { t->channelz_socket->RecordMessagesSent(t->num_messages_in_next_write); } t->num_messages_in_next_write = 0; while (grpc_chttp2_list_pop_writing_stream(t, &s)) { if (s->sending_bytes != 0) { update_list(t, s, static_cast(s->sending_bytes), &s->on_write_finished_cbs, &s->flow_controlled_bytes_written, GRPC_ERROR_REF(error)); s->sending_bytes = 0; } GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:end"); } grpc_slice_buffer_reset_and_unref_internal(&t->outbuf); GRPC_ERROR_UNREF(error); }