diff options
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 41 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/internal.h | 6 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/parsing.c | 6 | ||||
-rw-r--r-- | src/core/lib/support/log_posix.c | 12 |
4 files changed, 48 insertions, 17 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 6bca422f17..ce12f2dc26 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -233,7 +233,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->is_client = is_client; t->outgoing_window = DEFAULT_WINDOW; t->incoming_window = DEFAULT_WINDOW; - t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET; t->ping_counter = 1; t->pings.next = t->pings.prev = &t->pings; t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; @@ -511,6 +510,23 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, } } + if (s->incoming_window_delta > 0) { + t->retract_incoming_window += s->incoming_window_delta; + } else if (s->incoming_window_delta < 0) { + int64_t give_back = -s->incoming_window_delta; + if (give_back > t->retract_incoming_window) { + give_back -= t->retract_incoming_window; + t->retract_incoming_window = 0; + GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("destroy", t, announce_incoming_window, + give_back); + GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("destroy", t, incoming_window, + give_back); + grpc_chttp2_initiate_write(exec_ctx, t, false, "destroy_stream"); + } else { + t->retract_incoming_window -= give_back; + } + } + GPR_ASSERT(s->send_initial_metadata_finished == NULL); GPR_ASSERT(s->fetching_send_message == NULL); GPR_ASSERT(s->send_trailing_metadata_finished == NULL); @@ -1786,16 +1802,6 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, } t->initial_window_update = 0; } - /* handle higher level things */ - if (t->incoming_window < t->connection_window_target * 3 / 4) { - int64_t announce_bytes = t->connection_window_target - t->incoming_window; - GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", t, announce_incoming_window, - announce_bytes); - GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", t, incoming_window, - announce_bytes); - grpc_chttp2_initiate_write(exec_ctx, t, false, "global incoming window"); - } - GPR_TIMER_END("post_parse_locked", 0); } @@ -1908,6 +1914,19 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, grpc_chttp2_become_writable(exec_ctx, t, s, new_window_write_is_covered_by_poller, "read_incoming_stream"); + if (t->retract_incoming_window >= add_max_recv_bytes) { + t->retract_incoming_window -= add_max_recv_bytes; + } else { + add_max_recv_bytes -= t->retract_incoming_window; + t->retract_incoming_window = 0; + GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("op", t, announce_incoming_window, + add_max_recv_bytes); + GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("op", t, incoming_window, + add_max_recv_bytes); + grpc_chttp2_initiate_write(exec_ctx, t, + new_window_write_is_covered_by_poller, + "read_incoming_stream"); + } } } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index efc11357e9..c16c1f61d9 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -250,8 +250,10 @@ struct grpc_chttp2_transport { /** window available to announce to peer */ int64_t announce_incoming_window; - /** how much window would we like to have for incoming_window */ - uint32_t connection_window_target; + /** how many bytes have been given out as transport window that we'd now like + to retract? (since we can't retract incoming window, instead we just dont + give out any more until this amount goes to zero) */ + int64_t retract_incoming_window; /** have we seen a goaway */ uint8_t seen_goaway; diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 086ed18560..21e7d7927d 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -398,6 +398,12 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", t, s, incoming_window_delta, incoming_frame_size); s->received_bytes += incoming_frame_size; + } else { + GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parse", t, announce_incoming_window, + incoming_frame_size); + GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parse", t, incoming_window, + incoming_frame_size); + grpc_chttp2_initiate_write(exec_ctx, t, false, "destroy_stream"); } return GRPC_ERROR_NONE; diff --git a/src/core/lib/support/log_posix.c b/src/core/lib/support/log_posix.c index f972da0887..79458dd7a3 100644 --- a/src/core/lib/support/log_posix.c +++ b/src/core/lib/support/log_posix.c @@ -37,6 +37,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/time.h> #include <pthread.h> #include <stdarg.h> @@ -93,10 +94,13 @@ void gpr_default_log(gpr_log_func_args *args) { strcpy(time_buffer, "error:strftime"); } - fprintf(stderr, "%s%s.%09d %7tu %s:%d] %s\n", - gpr_log_severity_string(args->severity), time_buffer, - (int)(now.tv_nsec), gettid(), display_file, args->line, - args->message); + char *prefix; + gpr_asprintf(&prefix, "%s%s.%09d %7tu %s:%d]", + gpr_log_severity_string(args->severity), time_buffer, + (int)(now.tv_nsec), gettid(), display_file, args->line); + + fprintf(stderr, "%-70s %s\n", prefix, args->message); + gpr_free(prefix); } #endif /* defined(GPR_POSIX_LOG) */ |