aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/chttp2
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport/chttp2')
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc2
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc2
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc27
-rw-r--r--src/core/ext/transport/chttp2/transport/flow_control.cc17
4 files changed, 38 insertions, 10 deletions
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
index dfed824cd5..5bdcb387c9 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
@@ -50,7 +50,7 @@ grpc_channel* grpc_insecure_channel_create_from_fd(
GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
grpc_endpoint* client = grpc_tcp_client_create_from_fd(
- grpc_fd_create(fd, "client", false), args, "fd-client");
+ grpc_fd_create(fd, "client", true), args, "fd-client");
grpc_transport* transport =
grpc_create_chttp2_transport(final_args, client, true);
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
index a0228785ee..e4bd91d07b 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
@@ -44,7 +44,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server,
gpr_asprintf(&name, "fd:%d", fd);
grpc_endpoint* server_endpoint =
- grpc_tcp_create(grpc_fd_create(fd, name, false),
+ grpc_tcp_create(grpc_fd_create(fd, name, true),
grpc_server_get_channel_args(server), name);
gpr_free(name);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index bc6fa0d0eb..027a57d606 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -812,6 +812,12 @@ static void set_write_state(grpc_chttp2_transport* t,
write_state_name(t->write_state),
write_state_name(st), reason));
t->write_state = st;
+ /* If the state is being reset back to idle, it means a write was just
+ * finished. Make sure all the run_after_write closures are scheduled.
+ *
+ * This is also our chance to close the transport if the transport was marked
+ * to be closed after all writes finish (for example, if we received a go-away
+ * from peer while we had some pending writes) */
if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
if (t->close_transport_on_writes_finished != nullptr) {
@@ -899,6 +905,22 @@ void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
grpc_chttp2_initiate_write_reason_string(reason));
t->is_first_write_in_batch = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
+ /* Note that the 'write_action_begin_locked' closure is being scheduled
+ * on the 'finally_scheduler' of t->combiner. This means that
+ * 'write_action_begin_locked' is called only *after* all the other
+ * closures (some of which are potentially initiating more writes on the
+ * transport) are executed on the t->combiner.
+ *
+ * The reason for scheduling on finally_scheduler is to make sure we batch
+ * as many writes as possible. 'write_action_begin_locked' is the function
+ * that gathers all the relevant bytes (which are at various places in the
+ * grpc_chttp2_transport structure) and append them to 'outbuf' field in
+ * grpc_chttp2_transport thereby batching what would have been potentially
+ * multiple write operations.
+ *
+ * Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
+ * It does not call the endpoint to write the bytes. That is done by the
+ * 'write_action' (which is scheduled by 'write_action_begin_locked') */
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
write_action_begin_locked, t,
@@ -1007,9 +1029,12 @@ static void write_action(void* gt, grpc_error* error) {
grpc_endpoint_write(
t->ep, &t->outbuf,
GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t,
- grpc_combiner_scheduler(t->combiner)));
+ grpc_combiner_scheduler(t->combiner)),
+ nullptr);
}
+/* Callback from the grpc_endpoint after bytes have been written by calling
+ * sendmsg */
static void write_action_end_locked(void* tp, grpc_error* error) {
GPR_TIMER_SCOPE("terminate_writing_with_lock", 0);
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc
index e89c363200..53932bcb7f 100644
--- a/src/core/ext/transport/chttp2/transport/flow_control.cc
+++ b/src/core/ext/transport/chttp2/transport/flow_control.cc
@@ -40,6 +40,7 @@ namespace chttp2 {
namespace {
static constexpr const int kTracePadding = 30;
+static constexpr const uint32_t kMaxWindowUpdateSize = (1u << 31) - 1;
static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) {
char* str;
@@ -55,7 +56,7 @@ static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) {
static char* fmt_uint32_diff_str(uint32_t old_val, uint32_t new_val) {
char* str;
- if (new_val > 0 && old_val != new_val) {
+ if (old_val != new_val) {
gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old_val, new_val);
} else {
gpr_asprintf(&str, "%" PRIu32 "", old_val);
@@ -98,10 +99,12 @@ void FlowControlTrace::Finish() {
if (sfc_ != nullptr) {
srw_str = fmt_int64_diff_str(remote_window_delta_ + remote_window,
sfc_->remote_window_delta() + remote_window);
- slw_str = fmt_int64_diff_str(local_window_delta_ + acked_local_window,
- local_window_delta_ + acked_local_window);
- saw_str = fmt_int64_diff_str(announced_window_delta_ + acked_local_window,
- announced_window_delta_ + acked_local_window);
+ slw_str =
+ fmt_int64_diff_str(local_window_delta_ + acked_local_window,
+ sfc_->local_window_delta() + acked_local_window);
+ saw_str =
+ fmt_int64_diff_str(announced_window_delta_ + acked_local_window,
+ sfc_->announced_window_delta() + acked_local_window);
} else {
srw_str = gpr_leftpad("", ' ', kTracePadding);
slw_str = gpr_leftpad("", ' ', kTracePadding);
@@ -191,7 +194,7 @@ uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) {
if ((writing_anyway || announced_window_ <= target_announced_window / 2) &&
announced_window_ != target_announced_window) {
const uint32_t announce = static_cast<uint32_t> GPR_CLAMP(
- target_announced_window - announced_window_, 0, UINT32_MAX);
+ target_announced_window - announced_window_, 0, kMaxWindowUpdateSize);
announced_window_ += announce;
return announce;
}
@@ -265,7 +268,7 @@ uint32_t StreamFlowControl::MaybeSendUpdate() {
FlowControlTrace trace("s updt sent", tfc_, this);
if (local_window_delta_ > announced_window_delta_) {
uint32_t announce = static_cast<uint32_t> GPR_CLAMP(
- local_window_delta_ - announced_window_delta_, 0, UINT32_MAX);
+ local_window_delta_ - announced_window_delta_, 0, kMaxWindowUpdateSize);
UpdateAnnouncedWindowDelta(tfc_, announce);
return announce;
}