diff options
author | Craig Tiller <ctiller@google.com> | 2016-07-07 09:43:24 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-07-07 09:43:24 -0700 |
commit | c0e73da8c22550070f328a6fd168abced87342d7 (patch) | |
tree | 70a6eea807032dbecf4226e19cd66a222a5d2849 /src/core/ext | |
parent | 79a904a5362091816c3cf9797ac242c984d52e57 (diff) |
Fix flow control issue, make debugging in the future easier
Diffstat (limited to 'src/core/ext')
5 files changed, 36 insertions, 42 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 8b7f91d3af..aeedf417b4 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -295,7 +295,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_slice_buffer_add( &t->global.qbuf, gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING)); - grpc_chttp2_initiate_write(exec_ctx, &t->global, false); + grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write"); } /* 8 is a random stab in the dark as to a good initial size: it's small enough that it shouldn't waste memory for infrequently used connections, yet @@ -799,14 +799,14 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, - bool covered_by_poller) { + bool covered_by_poller, const char *reason) { grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global); switch (t->executor.write_state) { case GRPC_CHTTP2_WRITING_INACTIVE: set_write_state(t, covered_by_poller ? GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER : GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, - "initiate_write"); + reason); break; case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: /* nothing to do: write already requested */ @@ -815,7 +815,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, if (covered_by_poller) { /* upgrade to note poller is available to cover the write */ set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, - "initiate_write"); + reason); } break; case GRPC_CHTTP2_WRITE_SCHEDULED: @@ -825,7 +825,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, set_write_state(t, covered_by_poller ? GRPC_CHTTP2_WRITING_STALE_WITH_POLLER : GRPC_CHTTP2_WRITING_STALE_NO_POLLER, - "initiate_write"); + reason); break; case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: /* nothing to do: write already requested */ @@ -834,7 +834,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, if (covered_by_poller) { /* upgrade to note poller is available to cover the write */ set_write_state(t, GRPC_CHTTP2_WRITING_STALE_WITH_POLLER, - "initiate_write"); + reason); } break; } @@ -881,11 +881,11 @@ static void initiate_writing(grpc_exec_ctx *exec_ctx, void *arg, void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, - bool covered_by_poller) { + bool covered_by_poller, const char *reason) { if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed && grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) { GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); - grpc_chttp2_initiate_write(exec_ctx, transport_global, covered_by_poller); + grpc_chttp2_initiate_write(exec_ctx, transport_global, covered_by_poller, reason); } } @@ -901,7 +901,7 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (use_value != t->global.settings[GRPC_LOCAL_SETTINGS][id]) { t->global.settings[GRPC_LOCAL_SETTINGS][id] = use_value; t->global.dirtied_local_settings = 1; - grpc_chttp2_initiate_write(exec_ctx, &t->global, false); + grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "push_setting"); } } @@ -1040,7 +1040,7 @@ static void maybe_start_some_streams( stream_global->in_stream_map = true; transport_global->concurrent_stream_count++; grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, - true); + true, "new_stream"); } /* cancel out streams that will never be started */ while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID && @@ -1176,7 +1176,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, } else { GPR_ASSERT(stream_global->id != 0); grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, - true); + true, "op.send_initial_metadata"); } } else { stream_global->send_trailing_metadata = NULL; @@ -1202,7 +1202,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, stream_global->send_message = op->send_message; if (stream_global->id != 0) { grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, - true); + true, "op.send_message"); } } } @@ -1247,7 +1247,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, - true); + true, "op.send_trailing_metadata"); } } } @@ -1313,7 +1313,7 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, p->id[7] = (uint8_t)(t->global.ping_counter & 0xff); p->on_recv = on_recv; gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); - grpc_chttp2_initiate_write(exec_ctx, &t->global, true); + grpc_chttp2_initiate_write(exec_ctx, &t->global, true, "send_ping"); } static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -1373,7 +1373,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, close_transport = grpc_chttp2_has_streams(t) ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE("GOAWAY sent"); - grpc_chttp2_initiate_write(exec_ctx, &t->global, false); + grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "goaway_sent"); } if (op->set_accept_stream) { @@ -1578,7 +1578,7 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx, &transport_global->qbuf, grpc_chttp2_rst_stream_create(stream_global->id, (uint32_t)http_error, &stream_global->stats.outgoing)); - grpc_chttp2_initiate_write(exec_ctx, transport_global, false); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, "rst_stream"); } const char *msg = @@ -1844,7 +1844,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, 1, error); - grpc_chttp2_initiate_write(exec_ctx, transport_global, false); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, "close_from_api"); } typedef struct { @@ -1896,7 +1896,7 @@ static void update_global_window(void *args, uint32_t id, void *stream) { if (was_zero && !is_zero) { grpc_chttp2_become_writable(a->exec_ctx, transport_global, stream_global, - true); + true, "update_global_window"); } } @@ -2007,7 +2007,7 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, /* copy parsing qbuf to global qbuf */ if (t->parsing.qbuf.count > 0) { gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf); - grpc_chttp2_initiate_write(exec_ctx, transport_global, false); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, "parsing_qbuf"); } /* merge stream lists */ grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); @@ -2176,6 +2176,7 @@ static void incoming_byte_stream_update_flow_control( if (stream_global->max_recv_bytes < max_recv_bytes) { uint32_t add_max_recv_bytes = max_recv_bytes - stream_global->max_recv_bytes; + gpr_log(GPR_DEBUG, "add_max_recv_bytes:%d", add_max_recv_bytes); GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, max_recv_bytes, add_max_recv_bytes); GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, @@ -2187,7 +2188,7 @@ static void incoming_byte_stream_update_flow_control( grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global, stream_global); grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, - false); + false, "read_incoming_stream"); } } @@ -2395,7 +2396,7 @@ static char *format_flowctl_context_var(const char *context, const char *var, if (context == NULL) { *scope = NULL; gpr_asprintf(&buf, "%s(%" PRId64 ")", var, val); - result = gpr_leftpad(buf, ' ', 40); + result = gpr_leftpad(buf, ' ', 60); gpr_free(buf); return result; } @@ -2408,7 +2409,7 @@ static char *format_flowctl_context_var(const char *context, const char *var, gpr_free(tmp); } gpr_asprintf(&buf, "%s.%s(%" PRId64 ")", underscore_pos + 1, var, val); - result = gpr_leftpad(buf, ' ', 40); + result = gpr_leftpad(buf, ' ', 60); gpr_free(buf); return result; } @@ -2441,7 +2442,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, tmp_phase = gpr_leftpad(phase, ' ', 8); tmp_scope1 = gpr_leftpad(scope1, ' ', 11); - gpr_asprintf(&prefix, "FLOW %s: %s %s ", phase, clisvr, scope1); + gpr_asprintf(&prefix, "FLOW %s: %s %s ", tmp_phase, clisvr, scope1); gpr_free(tmp_phase); gpr_free(tmp_scope1); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 776017bee2..2a12afad6c 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -542,7 +542,7 @@ struct grpc_chttp2_stream { should be performed. */ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, - bool covered_by_poller); + bool covered_by_poller, const char *reason); /** Someone is unlocking the transport mutex: check to see if writes are required, and schedule them if so */ @@ -631,8 +631,7 @@ void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing); void grpc_chttp2_list_flush_writing_stalled_by_transport( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, - bool is_window_available); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing); void grpc_chttp2_list_add_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, @@ -845,6 +844,6 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, - bool covered_by_poller); + bool covered_by_poller, const char *reason); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */ diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index b42d98b3b0..efc27775f0 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -157,7 +157,7 @@ void grpc_chttp2_publish_reads( while (grpc_chttp2_list_pop_stalled_by_transport(transport_global, &stream_global)) { grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, - false); + false, "transport.read_flow_control"); } } @@ -169,7 +169,7 @@ void grpc_chttp2_publish_reads( announce_incoming_window, announce_bytes); GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_parsing, incoming_window, announce_bytes); - grpc_chttp2_initiate_write(exec_ctx, transport_global, false); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, "global incoming window"); } /* for each stream that saw an update, fixup global state */ @@ -193,7 +193,7 @@ void grpc_chttp2_publish_reads( is_zero = stream_global->outgoing_window <= 0; if (was_zero && !is_zero) { grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, - false); + false, "stream.read_flow_control"); } stream_global->max_recv_bytes -= (uint32_t)GPR_MIN( diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c index f227a0af98..aaa4768c7b 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.c +++ b/src/core/ext/transport/chttp2/transport/stream_lists.c @@ -337,19 +337,13 @@ void grpc_chttp2_list_add_writing_stalled_by_transport( } void grpc_chttp2_list_flush_writing_stalled_by_transport( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, - bool is_window_available) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing) { grpc_chttp2_stream *stream; grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing); while (stream_list_pop(transport, &stream, GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) { - if (is_window_available) { - grpc_chttp2_become_writable(exec_ctx, &transport->global, &stream->global, - true); - } else { - grpc_chttp2_list_add_stalled_by_transport(transport_writing, - &stream->writing); - } + grpc_chttp2_list_add_stalled_by_transport(transport_writing, + &stream->writing); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &stream->global, "chttp2_writing_stalled"); } diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index b19f5f068d..cbc57d10ad 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -75,9 +75,6 @@ int grpc_chttp2_unlocking_check_writes( GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window, transport_global, outgoing_window); - bool is_window_available = transport_writing->outgoing_window > 0; - grpc_chttp2_list_flush_writing_stalled_by_transport( - exec_ctx, transport_writing, is_window_available); /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ @@ -331,6 +328,9 @@ void grpc_chttp2_cleanup_writing( grpc_chttp2_stream_writing *stream_writing; grpc_chttp2_stream_global *stream_global; + grpc_chttp2_list_flush_writing_stalled_by_transport(exec_ctx, + transport_writing); + while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { if (stream_writing->sent_initial_metadata) { |