aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-07-14 19:31:47 -0700
committerGravatar GitHub <noreply@github.com>2016-07-14 19:31:47 -0700
commit13d3e3b7e2538025148dd08956765eab516b2f0b (patch)
tree8683f169471c2220d9ac4cb34aae5609b09eff5a /src/core/ext
parent421643727f2e7a57fc89003bca6797f18563bfc5 (diff)
parent40b495ac7bfe7fcde7d7d4d993714dfb6261c870 (diff)
Merge pull request #6737 from ctiller/delayed-write
Delay beginning most writes until we enter poll()
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c6
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_plugin.c3
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c369
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h65
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c11
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_lists.c20
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c16
7 files changed, 388 insertions, 102 deletions
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index 721ba82d8f..9acacbd92d 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -91,11 +91,13 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
connector *c = arg;
grpc_closure *notify;
gpr_mu_lock(&c->mu);
+ grpc_error *error = GRPC_ERROR_NONE;
if (c->connecting_endpoint == NULL) {
memset(c->result, 0, sizeof(*c->result));
gpr_mu_unlock(&c->mu);
} else if (status != GRPC_SECURITY_OK) {
- gpr_log(GPR_ERROR, "Secure handshake failed with error %d.", status);
+ error = grpc_error_set_int(GRPC_ERROR_CREATE("Secure handshake failed"),
+ GRPC_ERROR_INT_SECURITY_STATUS, status);
memset(c->result, 0, sizeof(*c->result));
c->connecting_endpoint = NULL;
gpr_mu_unlock(&c->mu);
@@ -113,7 +115,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
}
notify = c->notify;
c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL);
+ grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
}
static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
index bd87253ed3..7d5279b9da 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
@@ -36,11 +36,14 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/metadata.h"
+extern int grpc_http_write_state_trace;
+
void grpc_chttp2_plugin_init(void) {
grpc_chttp2_base64_encode_and_huffman_compress =
grpc_chttp2_base64_encode_and_huffman_compress_impl;
grpc_register_tracer("http", &grpc_http_trace);
grpc_register_tracer("flowctl", &grpc_flowctl_trace);
+ grpc_register_tracer("http_write_state", &grpc_http_write_state_trace);
}
void grpc_chttp2_plugin_shutdown(void) {}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 5aae753c07..be8a8f8498 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -48,6 +48,7 @@
#include "src/core/ext/transport/chttp2/transport/status_conversion.h"
#include "src/core/ext/transport/chttp2/transport/timeout_encoding.h"
#include "src/core/lib/http/parser.h"
+#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -60,9 +61,9 @@
#define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024)
#define MAX_CLIENT_STREAM_ID 0x7fffffffu
-
int grpc_http_trace = 0;
int grpc_flowctl_trace = 0;
+int grpc_http_write_state_trace = 0;
#define TRANSPORT_FROM_WRITING(tw) \
((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
@@ -88,10 +89,16 @@ static const grpc_transport_vtable vtable;
static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
+static void initiate_writing(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+
+static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
+static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t, grpc_error *error);
/** Set a transport level setting, and push it to our peer */
-static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
- uint32_t value);
+static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_setting_id id, uint32_t value);
/** Start disconnection chain */
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -137,7 +144,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global);
static void incoming_byte_stream_update_flow_control(
- grpc_chttp2_transport_global *transport_global,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
size_t have_already);
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
@@ -201,6 +208,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
gpr_free(t);
}
+/*#define REFCOUNTING_DEBUG 1*/
#ifdef REFCOUNTING_DEBUG
#define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__)
#define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t, r, __FILE__, __LINE__)
@@ -231,7 +239,7 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
const grpc_channel_args *channel_args,
- grpc_endpoint *ep, uint8_t is_client) {
+ grpc_endpoint *ep, bool is_client) {
size_t i;
int j;
@@ -273,6 +281,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init(&t->writing_action, writing_action, t);
grpc_closure_init(&t->reading_action, reading_action, t);
grpc_closure_init(&t->parsing_action, parsing_action, t);
+ grpc_closure_init(&t->initiate_writing, initiate_writing, t);
gpr_slice_buffer_init(&t->parsing.qbuf);
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
@@ -286,6 +295,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_slice_buffer_add(
&t->global.qbuf,
gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
+ grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write");
}
/* 8 is a random stab in the dark as to a good initial size: it's small enough
that it shouldn't waste memory for infrequently used connections, yet
@@ -311,11 +321,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
/* configure http2 the way we like it */
if (is_client) {
- push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
- push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
}
- push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW);
- push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
+ DEFAULT_WINDOW);
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
DEFAULT_MAX_HEADER_LIST_SIZE);
if (channel_args) {
@@ -329,7 +340,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log(GPR_ERROR, "%s: must be an integer",
GRPC_ARG_MAX_CONCURRENT_STREAMS);
} else {
- push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
(uint32_t)channel_args->args[i].value.integer);
}
} else if (0 == strcmp(channel_args->args[i].key,
@@ -368,7 +379,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log(GPR_ERROR, "%s: must be non-negative",
GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER);
} else {
- push_setting(t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
(uint32_t)channel_args->args[i].value.integer);
}
} else if (0 == strcmp(channel_args->args[i].key,
@@ -393,7 +404,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log(GPR_ERROR, "%s: must be non-negative",
GRPC_ARG_MAX_METADATA_SIZE);
} else {
- push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
(uint32_t)channel_args->args[i].value.integer);
}
}
@@ -444,6 +455,9 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_error *error) {
if (!t->closed) {
+ if (grpc_http_write_state_trace) {
+ gpr_log(GPR_DEBUG, "W:%p close transport", t);
+ }
t->closed = 1;
connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
@@ -590,7 +604,8 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_metadata_buffer_destroy(
&s->global.received_trailing_metadata);
gpr_slice_buffer_destroy(&s->writing.flow_controlled_buffer);
- GRPC_ERROR_UNREF(s->global.removal_error);
+ GRPC_ERROR_UNREF(s->global.read_closed_error);
+ GRPC_ERROR_UNREF(s->global.write_closed_error);
UNREF_TRANSPORT(exec_ctx, t, "stream");
@@ -634,6 +649,36 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
* LOCK MANAGEMENT
*/
+static const char *write_state_name(grpc_chttp2_write_state state) {
+ switch (state) {
+ case GRPC_CHTTP2_WRITING_INACTIVE:
+ return "INACTIVE";
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
+ return "REQUESTED[p=0]";
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ return "REQUESTED[p=1]";
+ case GRPC_CHTTP2_WRITE_SCHEDULED:
+ return "SCHEDULED";
+ case GRPC_CHTTP2_WRITING:
+ return "WRITING";
+ case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
+ return "WRITING[p=1]";
+ case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
+ return "WRITING[p=0]";
+ }
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
+
+static void set_write_state(grpc_chttp2_transport *t,
+ grpc_chttp2_write_state state, const char *reason) {
+ if (grpc_http_write_state_trace) {
+ gpr_log(GPR_DEBUG, "W:%p %s -> %s because %s", t,
+ write_state_name(t->executor.write_state), write_state_name(state),
+ reason);
+ }
+ t->executor.write_state = state;
+}
+
static void finish_global_actions(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_chttp2_executor_action_header *hdr;
@@ -642,13 +687,6 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN("finish_global_actions", 0);
for (;;) {
- if (!t->executor.writing_active && !t->closed &&
- grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) {
- t->executor.writing_active = 1;
- REF_TRANSPORT(t, "writing");
- prevent_endpoint_shutdown(t);
- grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL);
- }
check_read_ops(exec_ctx, &t->global);
gpr_mu_lock(&t->executor.mu);
@@ -669,8 +707,28 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx,
continue;
} else {
t->executor.global_active = false;
+ switch (t->executor.write_state) {
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "unlocking");
+ REF_TRANSPORT(t, "initiate_writing");
+ gpr_mu_unlock(&t->executor.mu);
+ grpc_exec_ctx_sched(
+ exec_ctx, &t->initiate_writing, GRPC_ERROR_NONE,
+ t->ep != NULL ? grpc_endpoint_get_workqueue(t->ep) : NULL);
+ break;
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
+ start_writing(exec_ctx, t);
+ gpr_mu_unlock(&t->executor.mu);
+ break;
+ case GRPC_CHTTP2_WRITING_INACTIVE:
+ case GRPC_CHTTP2_WRITING:
+ case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
+ case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
+ case GRPC_CHTTP2_WRITE_SCHEDULED:
+ gpr_mu_unlock(&t->executor.mu);
+ break;
+ }
}
- gpr_mu_unlock(&t->executor.mu);
break;
}
@@ -741,16 +799,118 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
* OUTPUT PROCESSING
*/
-void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global) {
+void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global,
+ bool covered_by_poller, const char *reason) {
+ /* Perform state checks, and transition to a scheduled state if appropriate.
+ Each time we finish the global lock execution, we check if we need to
+ write. If we do:
+ - (if there is a poller surrounding the write) schedule
+ initiate_writing, which locks and calls initiate_writing_locked to...
+ - call start_writing, which verifies (under the global lock) that there
+ are things that need to be written by calling
+ grpc_chttp2_unlocking_check_writes, and if so schedules writing_action
+ against the current exec_ctx, to be executed OUTSIDE of the global lock
+ - eventually writing_action results in grpc_chttp2_terminate_writing being
+ called, which re-takes the global lock, updates state, checks if we need
+ to do *another* write immediately, and if so loops back to
+ start_writing.
+
+ Current problems:
+ - too much lock entry/exiting
+ - the writing thread can become stuck indefinitely (punt through the
+ workqueue periodically to fix) */
+
+ grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
+ switch (t->executor.write_state) {
+ case GRPC_CHTTP2_WRITING_INACTIVE:
+ set_write_state(t, covered_by_poller
+ ? GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER
+ : GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
+ reason);
+ break;
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ /* nothing to do: write already requested */
+ break;
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
+ if (covered_by_poller) {
+ /* upgrade to note poller is available to cover the write */
+ set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, reason);
+ }
+ break;
+ case GRPC_CHTTP2_WRITE_SCHEDULED:
+ /* nothing to do: write already scheduled */
+ break;
+ case GRPC_CHTTP2_WRITING:
+ set_write_state(t,
+ covered_by_poller ? GRPC_CHTTP2_WRITING_STALE_WITH_POLLER
+ : GRPC_CHTTP2_WRITING_STALE_NO_POLLER,
+ reason);
+ break;
+ case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
+ /* nothing to do: write already requested */
+ break;
+ case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
+ if (covered_by_poller) {
+ /* upgrade to note poller is available to cover the write */
+ set_write_state(t, GRPC_CHTTP2_WRITING_STALE_WITH_POLLER, reason);
+ }
+ break;
+ }
+}
+
+static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
+ GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED ||
+ t->executor.write_state == GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER);
+ if (!t->closed &&
+ grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) {
+ set_write_state(t, GRPC_CHTTP2_WRITING, "start_writing");
+ REF_TRANSPORT(t, "writing");
+ prevent_endpoint_shutdown(t);
+ grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL);
+ } else {
+ if (t->closed) {
+ set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE,
+ "start_writing:transport_closed");
+ } else {
+ set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE,
+ "start_writing:nothing_to_write");
+ }
+ end_waiting_for_write(exec_ctx, t, GRPC_ERROR_CREATE("Nothing to write"));
+ if (t->ep && !t->endpoint_reading) {
+ destroy_endpoint(exec_ctx, t);
+ }
+ }
+}
+
+static void initiate_writing_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused,
+ void *arg_ignored) {
+ start_writing(exec_ctx, t);
+ UNREF_TRANSPORT(exec_ctx, t, "initiate_writing");
+}
+
+static void initiate_writing(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_run_with_global_lock(exec_ctx, arg, NULL, initiate_writing_locked,
+ NULL, 0);
+}
+
+void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global,
+ bool covered_by_poller, const char *reason) {
if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed &&
grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) {
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, covered_by_poller,
+ reason);
}
}
-static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
- uint32_t value) {
+static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_setting_id id, uint32_t value) {
const grpc_chttp2_setting_parameters *sp =
&grpc_chttp2_settings_parameters[id];
uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
@@ -761,9 +921,22 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
if (use_value != t->global.settings[GRPC_LOCAL_SETTINGS][id]) {
t->global.settings[GRPC_LOCAL_SETTINGS][id] = use_value;
t->global.dirtied_local_settings = 1;
+ grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "push_setting");
}
}
+static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t, grpc_error *error) {
+ grpc_chttp2_stream_global *stream_global;
+ while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global,
+ &stream_global)) {
+ fail_pending_writes(exec_ctx, &t->global, stream_global,
+ GRPC_ERROR_REF(error));
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes");
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s_ignored,
@@ -778,24 +951,32 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing);
- grpc_chttp2_stream_global *stream_global;
- while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global,
- &stream_global)) {
- fail_pending_writes(exec_ctx, &t->global, stream_global,
- GRPC_ERROR_REF(error));
- GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes");
+ end_waiting_for_write(exec_ctx, t, error);
+
+ switch (t->executor.write_state) {
+ case GRPC_CHTTP2_WRITING_INACTIVE:
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
+ case GRPC_CHTTP2_WRITE_SCHEDULED:
+ GPR_UNREACHABLE_CODE(break);
+ case GRPC_CHTTP2_WRITING:
+ set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "terminate_writing");
+ break;
+ case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
+ set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER,
+ "terminate_writing");
+ break;
+ case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
+ set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
+ "terminate_writing");
+ break;
}
- /* leave the writing flag up on shutdown to prevent further writes in
- unlock()
- from starting */
- t->executor.writing_active = 0;
if (t->ep && !t->endpoint_reading) {
destroy_endpoint(exec_ctx, t);
}
UNREF_TRANSPORT(exec_ctx, t, "writing");
- GRPC_ERROR_UNREF(error);
}
void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
@@ -878,7 +1059,8 @@ static void maybe_start_some_streams(
stream_global->id, STREAM_FROM_GLOBAL(stream_global));
stream_global->in_stream_map = true;
transport_global->concurrent_stream_count++;
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, true,
+ "new_stream");
}
/* cancel out streams that will never be started */
while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@@ -1018,9 +1200,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
maybe_start_some_streams(exec_ctx, transport_global);
} else {
GPR_ASSERT(stream_global->id != 0);
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
+ true, "op.send_initial_metadata");
}
} else {
+ stream_global->send_trailing_metadata = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_initial_metadata_finished,
@@ -1042,7 +1226,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
} else {
stream_global->send_message = op->send_message;
if (stream_global->id != 0) {
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
+ true, "op.send_message");
}
}
}
@@ -1075,6 +1260,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
if (stream_global->write_closed) {
+ stream_global->send_trailing_metadata = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_trailing_metadata_finished,
@@ -1085,7 +1271,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
} else if (stream_global->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
+ true, "op.send_trailing_metadata");
}
}
}
@@ -1106,8 +1293,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
(stream_global->incoming_frames.head == NULL ||
stream_global->incoming_frames.head->is_tail)) {
incoming_byte_stream_update_flow_control(
- transport_global, stream_global, transport_global->stream_lookahead,
- 0);
+ exec_ctx, transport_global, stream_global,
+ transport_global->stream_lookahead, 0);
}
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
@@ -1135,7 +1322,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
sizeof(*op));
}
-static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
+static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_closure *on_recv) {
grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p));
p->next = &t->global.pings;
p->prev = p->next->prev;
@@ -1150,6 +1338,7 @@ static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
p->id[7] = (uint8_t)(t->global.ping_counter & 0xff);
p->on_recv = on_recv;
gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id));
+ grpc_chttp2_initiate_write(exec_ctx, &t->global, true, "send_ping");
}
static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -1209,6 +1398,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
close_transport = grpc_chttp2_has_streams(t)
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE("GOAWAY sent");
+ grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "goaway_sent");
}
if (op->set_accept_stream) {
@@ -1226,7 +1416,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
}
if (op->send_ping) {
- send_ping_locked(t, op->send_ping);
+ send_ping_locked(exec_ctx, t, op->send_ping);
}
if (close_transport != GRPC_ERROR_NONE) {
@@ -1414,6 +1604,8 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx,
&transport_global->qbuf,
grpc_chttp2_rst_stream_create(stream_global->id, (uint32_t)http_error,
&stream_global->stats.outgoing));
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
+ "rst_stream");
}
const char *msg =
@@ -1473,10 +1665,39 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
}
}
+static void add_error(grpc_error *error, grpc_error **refs, size_t *nrefs) {
+ if (error == GRPC_ERROR_NONE) return;
+ for (size_t i = 0; i < *nrefs; i++) {
+ if (error == refs[i]) {
+ return;
+ }
+ }
+ refs[*nrefs] = error;
+ ++*nrefs;
+}
+
+static grpc_error *removal_error(grpc_error *extra_error,
+ grpc_chttp2_stream_global *stream_global) {
+ grpc_error *refs[3];
+ size_t nrefs = 0;
+ add_error(stream_global->read_closed_error, refs, &nrefs);
+ add_error(stream_global->write_closed_error, refs, &nrefs);
+ add_error(extra_error, refs, &nrefs);
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (nrefs > 0) {
+ error = GRPC_ERROR_CREATE_REFERENCING("Failed due to stream removal", refs,
+ nrefs);
+ }
+ GRPC_ERROR_UNREF(extra_error);
+ return error;
+}
+
static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_error *error) {
+ error = removal_error(error, stream_global);
+ stream_global->send_message = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_initial_metadata_finished, GRPC_ERROR_REF(error));
@@ -1499,14 +1720,17 @@ void grpc_chttp2_mark_stream_closed(
}
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
if (close_reads && !stream_global->read_closed) {
+ stream_global->read_closed_error = GRPC_ERROR_REF(error);
stream_global->read_closed = true;
stream_global->published_initial_metadata = true;
stream_global->published_trailing_metadata = true;
decrement_active_streams_locked(exec_ctx, transport_global, stream_global);
}
if (close_writes && !stream_global->write_closed) {
+ stream_global->write_closed_error = GRPC_ERROR_REF(error);
stream_global->write_closed = true;
- if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.writing_active) {
+ if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.write_state !=
+ GRPC_CHTTP2_WRITING_INACTIVE) {
GRPC_CHTTP2_STREAM_REF(stream_global, "finish_writes");
grpc_chttp2_list_add_closed_waiting_for_writing(transport_global,
stream_global);
@@ -1516,7 +1740,6 @@ void grpc_chttp2_mark_stream_closed(
}
}
if (stream_global->read_closed && stream_global->write_closed) {
- stream_global->removal_error = GRPC_ERROR_REF(error);
if (stream_global->id != 0 &&
TRANSPORT_FROM_GLOBAL(transport_global)->executor.parsing_active) {
grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global,
@@ -1524,7 +1747,8 @@ void grpc_chttp2_mark_stream_closed(
} else {
if (stream_global->id != 0) {
remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global),
- stream_global->id, GRPC_ERROR_REF(error));
+ stream_global->id,
+ removal_error(GRPC_ERROR_REF(error), stream_global));
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
}
@@ -1649,6 +1873,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
1, error);
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
+ "close_from_api");
}
typedef struct {
@@ -1678,8 +1904,14 @@ static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
/** update window from a settings change */
+typedef struct {
+ grpc_chttp2_transport *t;
+ grpc_exec_ctx *exec_ctx;
+} update_global_window_args;
+
static void update_global_window(void *args, uint32_t id, void *stream) {
- grpc_chttp2_transport *t = args;
+ update_global_window_args *a = args;
+ grpc_chttp2_transport *t = a->t;
grpc_chttp2_stream *s = stream;
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_stream_global *stream_global = &s->global;
@@ -1693,7 +1925,8 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
is_zero = stream_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(a->exec_ctx, transport_global, stream_global,
+ true, "update_global_window");
}
}
@@ -1802,14 +2035,19 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
/* copy parsing qbuf to global qbuf */
- gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf);
+ if (t->parsing.qbuf.count > 0) {
+ gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf);
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
+ "parsing_qbuf");
+ }
/* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map);
transport_global->concurrent_stream_count =
(uint32_t)grpc_chttp2_stream_map_size(&t->parsing_stream_map);
if (transport_parsing->initial_window_update != 0) {
+ update_global_window_args args = {t, exec_ctx};
grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
- update_global_window, t);
+ update_global_window, &args);
transport_parsing->initial_window_update = 0;
}
/* handle higher level things */
@@ -1832,7 +2070,7 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GPR_ASSERT(stream_global->write_closed);
GPR_ASSERT(stream_global->read_closed);
remove_stream(exec_ctx, t, stream_global->id,
- GRPC_ERROR_REF(stream_global->removal_error));
+ removal_error(GRPC_ERROR_NONE, stream_global));
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
}
@@ -1855,11 +2093,12 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
}
drop_connection(exec_ctx, t, GRPC_ERROR_REF(error));
t->endpoint_reading = 0;
- if (!t->executor.writing_active && t->ep) {
- grpc_endpoint_destroy(exec_ctx, t->ep);
- t->ep = NULL;
- /* safe as we still have a ref for read */
- UNREF_TRANSPORT(exec_ctx, t, "disconnect");
+ if (grpc_http_write_state_trace) {
+ gpr_log(GPR_DEBUG, "R:%p -> 0 ws=%s", t,
+ write_state_name(t->executor.write_state));
+ }
+ if (t->executor.write_state == GRPC_CHTTP2_WRITING_INACTIVE && t->ep) {
+ destroy_endpoint(exec_ctx, t);
}
} else if (!t->closed) {
keep_reading = true;
@@ -1943,7 +2182,7 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
}
static void incoming_byte_stream_update_flow_control(
- grpc_chttp2_transport_global *transport_global,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
size_t have_already) {
uint32_t max_recv_bytes;
@@ -1978,7 +2217,8 @@ static void incoming_byte_stream_update_flow_control(
add_max_recv_bytes);
grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global,
stream_global);
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
+ false, "read_incoming_stream");
}
}
@@ -2000,8 +2240,9 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global = &bs->stream->global;
if (bs->is_tail) {
- incoming_byte_stream_update_flow_control(
- transport_global, stream_global, arg->max_size_hint, bs->slices.length);
+ incoming_byte_stream_update_flow_control(exec_ctx, transport_global,
+ stream_global, arg->max_size_hint,
+ bs->slices.length);
}
if (bs->slices.count > 0) {
*arg->slice = gpr_slice_buffer_take_first(&bs->slices);
@@ -2185,7 +2426,7 @@ static char *format_flowctl_context_var(const char *context, const char *var,
if (context == NULL) {
*scope = NULL;
gpr_asprintf(&buf, "%s(%" PRId64 ")", var, val);
- result = gpr_leftpad(buf, ' ', 40);
+ result = gpr_leftpad(buf, ' ', 60);
gpr_free(buf);
return result;
}
@@ -2198,7 +2439,7 @@ static char *format_flowctl_context_var(const char *context, const char *var,
gpr_free(tmp);
}
gpr_asprintf(&buf, "%s.%s(%" PRId64 ")", underscore_pos + 1, var, val);
- result = gpr_leftpad(buf, ' ', 40);
+ result = gpr_leftpad(buf, ' ', 60);
gpr_free(buf);
return result;
}
@@ -2231,7 +2472,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
tmp_phase = gpr_leftpad(phase, ' ', 8);
tmp_scope1 = gpr_leftpad(scope1, ' ', 11);
- gpr_asprintf(&prefix, "FLOW %s: %s %s ", phase, clisvr, scope1);
+ gpr_asprintf(&prefix, "FLOW %s: %s %s ", tmp_phase, clisvr, scope1);
gpr_free(tmp_phase);
gpr_free(tmp_scope1);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 8d79e93ceb..e1dcf5262a 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -305,6 +305,22 @@ typedef struct grpc_chttp2_executor_action_header {
void *arg;
} grpc_chttp2_executor_action_header;
+typedef enum {
+ /** no writing activity */
+ GRPC_CHTTP2_WRITING_INACTIVE,
+ /** write has been requested, but not scheduled yet */
+ GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER,
+ GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
+ /** write has been requested and scheduled against the workqueue */
+ GRPC_CHTTP2_WRITE_SCHEDULED,
+ /** write has been initiated after being reaped from the workqueue */
+ GRPC_CHTTP2_WRITING,
+ /** write has been initiated, AND another write needs to be started once it's
+ done */
+ GRPC_CHTTP2_WRITING_STALE_WITH_POLLER,
+ GRPC_CHTTP2_WRITING_STALE_NO_POLLER,
+} grpc_chttp2_write_state;
+
struct grpc_chttp2_transport {
grpc_transport base; /* must be first */
gpr_refcount refs;
@@ -319,10 +335,10 @@ struct grpc_chttp2_transport {
/** is a thread currently in the global lock */
bool global_active;
- /** is a thread currently writing */
- bool writing_active;
/** is a thread currently parsing */
bool parsing_active;
+ /** write execution state of the transport */
+ grpc_chttp2_write_state write_state;
grpc_chttp2_executor_action_header *pending_actions_head;
grpc_chttp2_executor_action_header *pending_actions_tail;
@@ -342,7 +358,8 @@ struct grpc_chttp2_transport {
/** global state for reading/writing */
grpc_chttp2_transport_global global;
/** state only accessible by the chain of execution that
- set writing_active=1 */
+ set writing_state >= GRPC_WRITING, and only by the writing closure
+ chain. */
grpc_chttp2_transport_writing writing;
/** state only accessible by the chain of execution that
set parsing_active=1 */
@@ -363,6 +380,8 @@ struct grpc_chttp2_transport {
grpc_closure reading_action;
/** closure to actually do parsing */
grpc_closure parsing_action;
+ /** closure to initiate writing */
+ grpc_closure initiate_writing;
/** incoming read bytes */
gpr_slice_buffer read_buffer;
@@ -436,8 +455,10 @@ typedef struct {
bool seen_error;
bool exceeded_metadata_size;
- /** the error that resulted in this stream being removed */
- grpc_error *removal_error;
+ /** the error that resulted in this stream being read-closed */
+ grpc_error *read_closed_error;
+ /** the error that resulted in this stream being write-closed */
+ grpc_error *write_closed_error;
bool published_initial_metadata;
bool published_trailing_metadata;
@@ -514,15 +535,20 @@ struct grpc_chttp2_stream {
};
/** Transport writing call flow:
- chttp2_transport.c calls grpc_chttp2_unlocking_check_writes to see if writes
- are required;
- if they are, chttp2_transport.c calls grpc_chttp2_perform_writes to do the
- writes.
- Once writes have been completed (meaning another write could potentially be
- started),
- grpc_chttp2_terminate_writing is called. This will call
- grpc_chttp2_cleanup_writing, at which
- point the write phase is complete. */
+ grpc_chttp2_initiate_write() is called anywhere that we know bytes need to
+ go out on the wire.
+ If no other write has been started, a task is enqueued onto our workqueue.
+ When that task executes, it obtains the global lock, and gathers the data
+ to write.
+ The global lock is dropped and we do the syscall to write.
+ After writing, a follow-up check is made to see if another round of writing
+ should be performed.
+
+ The actual call chain is documented in the implementation of this function.
+ */
+void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global,
+ bool covered_by_poller, const char *reason);
/** Someone is unlocking the transport mutex: check to see if writes
are required, and schedule them if so */
@@ -610,9 +636,8 @@ int grpc_chttp2_list_pop_check_read_ops(
void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing);
-void grpc_chttp2_list_flush_writing_stalled_by_transport(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
- bool is_window_available);
+bool grpc_chttp2_list_flush_writing_stalled_by_transport(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing);
void grpc_chttp2_list_add_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
@@ -822,7 +847,9 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
/** add a ref to the stream and add it to the writable list;
ref will be dropped in writing.c */
-void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global);
+void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global,
+ bool covered_by_poller, const char *reason);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 84eb5752f1..e1fc0ddee2 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -154,10 +154,8 @@ void grpc_chttp2_publish_reads(
transport_parsing, outgoing_window);
is_zero = transport_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
- while (grpc_chttp2_list_pop_stalled_by_transport(transport_global,
- &stream_global)) {
- grpc_chttp2_become_writable(transport_global, stream_global);
- }
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
+ "new_global_flow_control");
}
if (transport_parsing->incoming_window <
@@ -168,6 +166,8 @@ void grpc_chttp2_publish_reads(
announce_incoming_window, announce_bytes);
GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_parsing,
incoming_window, announce_bytes);
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
+ "global incoming window");
}
/* for each stream that saw an update, fixup global state */
@@ -190,7 +190,8 @@ void grpc_chttp2_publish_reads(
outgoing_window);
is_zero = stream_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
+ false, "stream.read_flow_control");
}
stream_global->max_recv_bytes -= (uint32_t)GPR_MIN(
diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c
index 8f3ab00e6d..2eb5f5f632 100644
--- a/src/core/ext/transport/chttp2/transport/stream_lists.c
+++ b/src/core/ext/transport/chttp2/transport/stream_lists.c
@@ -329,6 +329,7 @@ void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
grpc_chttp2_stream *stream = STREAM_FROM_WRITING(stream_writing);
+ gpr_log(GPR_DEBUG, "writing stalled %d", stream->global.id);
if (!stream->included[GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT]) {
GRPC_CHTTP2_STREAM_REF(&stream->global, "chttp2_writing_stalled");
}
@@ -336,27 +337,28 @@ void grpc_chttp2_list_add_writing_stalled_by_transport(
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT);
}
-void grpc_chttp2_list_flush_writing_stalled_by_transport(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
- bool is_window_available) {
+bool grpc_chttp2_list_flush_writing_stalled_by_transport(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream *stream;
+ bool out = false;
grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing);
while (stream_list_pop(transport, &stream,
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) {
- if (is_window_available) {
- grpc_chttp2_become_writable(&transport->global, &stream->global);
- } else {
- grpc_chttp2_list_add_stalled_by_transport(transport_writing,
- &stream->writing);
- }
+ gpr_log(GPR_DEBUG, "move %d from writing stalled to just stalled",
+ stream->global.id);
+ grpc_chttp2_list_add_stalled_by_transport(transport_writing,
+ &stream->writing);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &stream->global,
"chttp2_writing_stalled");
+ out = true;
}
+ return out;
}
void grpc_chttp2_list_add_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
+ gpr_log(GPR_DEBUG, "stalled %d", stream_writing->id);
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
STREAM_FROM_WRITING(stream_writing),
GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index b19f5f068d..e0d87725e9 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -75,9 +75,13 @@ int grpc_chttp2_unlocking_check_writes(
GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window,
transport_global, outgoing_window);
- bool is_window_available = transport_writing->outgoing_window > 0;
- grpc_chttp2_list_flush_writing_stalled_by_transport(
- exec_ctx, transport_writing, is_window_available);
+ if (transport_writing->outgoing_window > 0) {
+ while (grpc_chttp2_list_pop_stalled_by_transport(transport_global,
+ &stream_global)) {
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
+ false, "transport.read_flow_control");
+ }
+ }
/* for each grpc_chttp2_stream that's become writable, frame it's data
(according to available window sizes) and add to the output buffer */
@@ -331,6 +335,12 @@ void grpc_chttp2_cleanup_writing(
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
+ if (grpc_chttp2_list_flush_writing_stalled_by_transport(exec_ctx,
+ transport_writing)) {
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
+ "resume_stalled_stream");
+ }
+
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
if (stream_writing->sent_initial_metadata) {