aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--Makefile36
-rw-r--r--build.yaml14
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c6
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_plugin.c3
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c369
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h65
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c11
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_lists.c20
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c16
-rw-r--r--src/core/lib/iomgr/endpoint.c4
-rw-r--r--src/core/lib/iomgr/endpoint.h4
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c277
-rw-r--r--src/core/lib/iomgr/ev_poll_and_epoll_posix.c3
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c3
-rw-r--r--src/core/lib/iomgr/ev_posix.c4
-rw-r--r--src/core/lib/iomgr/ev_posix.h4
-rw-r--r--src/core/lib/iomgr/exec_ctx.c10
-rw-r--r--src/core/lib/iomgr/exec_ctx.h6
-rw-r--r--src/core/lib/iomgr/iomgr.c3
-rw-r--r--src/core/lib/iomgr/network_status_tracker.c7
-rw-r--r--src/core/lib/iomgr/network_status_tracker.h4
-rw-r--r--src/core/lib/iomgr/tcp_posix.c18
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c3
-rw-r--r--src/core/lib/iomgr/tcp_windows.c13
-rw-r--r--src/core/lib/iomgr/workqueue.h39
-rw-r--r--src/core/lib/iomgr/workqueue_posix.c8
-rw-r--r--src/core/lib/iomgr/workqueue_posix.h5
-rw-r--r--src/core/lib/iomgr/workqueue_windows.c22
-rw-r--r--src/core/lib/security/transport/secure_endpoint.c18
-rw-r--r--src/core/lib/surface/server.c76
-rw-r--r--src/core/lib/transport/connectivity_state.c3
-rw-r--r--test/core/end2end/tests/high_initial_seqno.c6
-rw-r--r--test/core/end2end/tests/network_status_change.c5
-rw-r--r--test/core/internal_api_canaries/iomgr.c13
-rw-r--r--test/core/iomgr/workqueue_test.c150
-rw-r--r--test/core/util/mock_endpoint.c12
-rw-r--r--test/core/util/passthru_endpoint.c12
-rw-r--r--test/cpp/end2end/end2end_test.cc3
-rw-r--r--test/cpp/qps/client_async.cc35
-rwxr-xr-xtest/cpp/qps/gen_build_yaml.py11
-rw-r--r--test/cpp/qps/json_run_localhost.cc2
-rw-r--r--test/cpp/qps/server_async.cc5
-rw-r--r--tools/dockerfile/test/python_pyenv_x64/Dockerfile2
-rwxr-xr-xtools/run_tests/run_tests.py28
-rw-r--r--tools/run_tests/sources_and_headers.json16
-rw-r--r--tools/run_tests/tests.json131
46 files changed, 883 insertions, 622 deletions
diff --git a/Makefile b/Makefile
index e958b145e7..4ce22678d7 100644
--- a/Makefile
+++ b/Makefile
@@ -991,7 +991,6 @@ transport_security_test: $(BINDIR)/$(CONFIG)/transport_security_test
udp_server_test: $(BINDIR)/$(CONFIG)/udp_server_test
uri_fuzzer_test: $(BINDIR)/$(CONFIG)/uri_fuzzer_test
uri_parser_test: $(BINDIR)/$(CONFIG)/uri_parser_test
-workqueue_test: $(BINDIR)/$(CONFIG)/workqueue_test
alarm_cpp_test: $(BINDIR)/$(CONFIG)/alarm_cpp_test
async_end2end_test: $(BINDIR)/$(CONFIG)/async_end2end_test
auth_property_iterator_test: $(BINDIR)/$(CONFIG)/auth_property_iterator_test
@@ -1295,7 +1294,6 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/transport_security_test \
$(BINDIR)/$(CONFIG)/udp_server_test \
$(BINDIR)/$(CONFIG)/uri_parser_test \
- $(BINDIR)/$(CONFIG)/workqueue_test \
$(BINDIR)/$(CONFIG)/public_headers_must_be_c89 \
$(BINDIR)/$(CONFIG)/badreq_bad_client_test \
$(BINDIR)/$(CONFIG)/connection_prefix_bad_client_test \
@@ -1674,8 +1672,6 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/udp_server_test || ( echo test udp_server_test failed ; exit 1 )
$(E) "[RUN] Testing uri_parser_test"
$(Q) $(BINDIR)/$(CONFIG)/uri_parser_test || ( echo test uri_parser_test failed ; exit 1 )
- $(E) "[RUN] Testing workqueue_test"
- $(Q) $(BINDIR)/$(CONFIG)/workqueue_test || ( echo test workqueue_test failed ; exit 1 )
$(E) "[RUN] Testing public_headers_must_be_c89"
$(Q) $(BINDIR)/$(CONFIG)/public_headers_must_be_c89 || ( echo test public_headers_must_be_c89 failed ; exit 1 )
$(E) "[RUN] Testing badreq_bad_client_test"
@@ -10175,38 +10171,6 @@ endif
endif
-WORKQUEUE_TEST_SRC = \
- test/core/iomgr/workqueue_test.c \
-
-WORKQUEUE_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(WORKQUEUE_TEST_SRC))))
-ifeq ($(NO_SECURE),true)
-
-# You can't build secure targets if you don't have OpenSSL.
-
-$(BINDIR)/$(CONFIG)/workqueue_test: openssl_dep_error
-
-else
-
-
-
-$(BINDIR)/$(CONFIG)/workqueue_test: $(WORKQUEUE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
- $(E) "[LD] Linking $@"
- $(Q) mkdir -p `dirname $@`
- $(Q) $(LD) $(LDFLAGS) $(WORKQUEUE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/workqueue_test
-
-endif
-
-$(OBJDIR)/$(CONFIG)/test/core/iomgr/workqueue_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
-
-deps_workqueue_test: $(WORKQUEUE_TEST_OBJS:.o=.dep)
-
-ifneq ($(NO_SECURE),true)
-ifneq ($(NO_DEPS),true)
--include $(WORKQUEUE_TEST_OBJS:.o=.dep)
-endif
-endif
-
-
ALARM_CPP_TEST_SRC = \
test/cpp/common/alarm_cpp_test.cc \
diff --git a/build.yaml b/build.yaml
index 071d4a8e1e..57545839d4 100644
--- a/build.yaml
+++ b/build.yaml
@@ -2430,20 +2430,6 @@ targets:
- grpc
- gpr_test_util
- gpr
-- name: workqueue_test
- build: test
- language: c
- src:
- - test/core/iomgr/workqueue_test.c
- deps:
- - grpc_test_util
- - grpc
- - gpr_test_util
- - gpr
- platforms:
- - mac
- - linux
- - posix
- name: alarm_cpp_test
gtest: true
build: test
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index 721ba82d8f..9acacbd92d 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -91,11 +91,13 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
connector *c = arg;
grpc_closure *notify;
gpr_mu_lock(&c->mu);
+ grpc_error *error = GRPC_ERROR_NONE;
if (c->connecting_endpoint == NULL) {
memset(c->result, 0, sizeof(*c->result));
gpr_mu_unlock(&c->mu);
} else if (status != GRPC_SECURITY_OK) {
- gpr_log(GPR_ERROR, "Secure handshake failed with error %d.", status);
+ error = grpc_error_set_int(GRPC_ERROR_CREATE("Secure handshake failed"),
+ GRPC_ERROR_INT_SECURITY_STATUS, status);
memset(c->result, 0, sizeof(*c->result));
c->connecting_endpoint = NULL;
gpr_mu_unlock(&c->mu);
@@ -113,7 +115,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
}
notify = c->notify;
c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL);
+ grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
}
static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
index bd87253ed3..7d5279b9da 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
@@ -36,11 +36,14 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/metadata.h"
+extern int grpc_http_write_state_trace;
+
void grpc_chttp2_plugin_init(void) {
grpc_chttp2_base64_encode_and_huffman_compress =
grpc_chttp2_base64_encode_and_huffman_compress_impl;
grpc_register_tracer("http", &grpc_http_trace);
grpc_register_tracer("flowctl", &grpc_flowctl_trace);
+ grpc_register_tracer("http_write_state", &grpc_http_write_state_trace);
}
void grpc_chttp2_plugin_shutdown(void) {}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 5aae753c07..be8a8f8498 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -48,6 +48,7 @@
#include "src/core/ext/transport/chttp2/transport/status_conversion.h"
#include "src/core/ext/transport/chttp2/transport/timeout_encoding.h"
#include "src/core/lib/http/parser.h"
+#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -60,9 +61,9 @@
#define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024)
#define MAX_CLIENT_STREAM_ID 0x7fffffffu
-
int grpc_http_trace = 0;
int grpc_flowctl_trace = 0;
+int grpc_http_write_state_trace = 0;
#define TRANSPORT_FROM_WRITING(tw) \
((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
@@ -88,10 +89,16 @@ static const grpc_transport_vtable vtable;
static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
+static void initiate_writing(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+
+static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
+static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t, grpc_error *error);
/** Set a transport level setting, and push it to our peer */
-static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
- uint32_t value);
+static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_setting_id id, uint32_t value);
/** Start disconnection chain */
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -137,7 +144,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global);
static void incoming_byte_stream_update_flow_control(
- grpc_chttp2_transport_global *transport_global,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
size_t have_already);
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
@@ -201,6 +208,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
gpr_free(t);
}
+/*#define REFCOUNTING_DEBUG 1*/
#ifdef REFCOUNTING_DEBUG
#define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__)
#define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t, r, __FILE__, __LINE__)
@@ -231,7 +239,7 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
const grpc_channel_args *channel_args,
- grpc_endpoint *ep, uint8_t is_client) {
+ grpc_endpoint *ep, bool is_client) {
size_t i;
int j;
@@ -273,6 +281,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init(&t->writing_action, writing_action, t);
grpc_closure_init(&t->reading_action, reading_action, t);
grpc_closure_init(&t->parsing_action, parsing_action, t);
+ grpc_closure_init(&t->initiate_writing, initiate_writing, t);
gpr_slice_buffer_init(&t->parsing.qbuf);
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
@@ -286,6 +295,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_slice_buffer_add(
&t->global.qbuf,
gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
+ grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write");
}
/* 8 is a random stab in the dark as to a good initial size: it's small enough
that it shouldn't waste memory for infrequently used connections, yet
@@ -311,11 +321,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
/* configure http2 the way we like it */
if (is_client) {
- push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
- push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
}
- push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW);
- push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
+ DEFAULT_WINDOW);
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
DEFAULT_MAX_HEADER_LIST_SIZE);
if (channel_args) {
@@ -329,7 +340,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log(GPR_ERROR, "%s: must be an integer",
GRPC_ARG_MAX_CONCURRENT_STREAMS);
} else {
- push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
(uint32_t)channel_args->args[i].value.integer);
}
} else if (0 == strcmp(channel_args->args[i].key,
@@ -368,7 +379,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log(GPR_ERROR, "%s: must be non-negative",
GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER);
} else {
- push_setting(t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
(uint32_t)channel_args->args[i].value.integer);
}
} else if (0 == strcmp(channel_args->args[i].key,
@@ -393,7 +404,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log(GPR_ERROR, "%s: must be non-negative",
GRPC_ARG_MAX_METADATA_SIZE);
} else {
- push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
(uint32_t)channel_args->args[i].value.integer);
}
}
@@ -444,6 +455,9 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_error *error) {
if (!t->closed) {
+ if (grpc_http_write_state_trace) {
+ gpr_log(GPR_DEBUG, "W:%p close transport", t);
+ }
t->closed = 1;
connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
@@ -590,7 +604,8 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_metadata_buffer_destroy(
&s->global.received_trailing_metadata);
gpr_slice_buffer_destroy(&s->writing.flow_controlled_buffer);
- GRPC_ERROR_UNREF(s->global.removal_error);
+ GRPC_ERROR_UNREF(s->global.read_closed_error);
+ GRPC_ERROR_UNREF(s->global.write_closed_error);
UNREF_TRANSPORT(exec_ctx, t, "stream");
@@ -634,6 +649,36 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
* LOCK MANAGEMENT
*/
+static const char *write_state_name(grpc_chttp2_write_state state) {
+ switch (state) {
+ case GRPC_CHTTP2_WRITING_INACTIVE:
+ return "INACTIVE";
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
+ return "REQUESTED[p=0]";
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ return "REQUESTED[p=1]";
+ case GRPC_CHTTP2_WRITE_SCHEDULED:
+ return "SCHEDULED";
+ case GRPC_CHTTP2_WRITING:
+ return "WRITING";
+ case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
+ return "WRITING[p=1]";
+ case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
+ return "WRITING[p=0]";
+ }
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
+
+static void set_write_state(grpc_chttp2_transport *t,
+ grpc_chttp2_write_state state, const char *reason) {
+ if (grpc_http_write_state_trace) {
+ gpr_log(GPR_DEBUG, "W:%p %s -> %s because %s", t,
+ write_state_name(t->executor.write_state), write_state_name(state),
+ reason);
+ }
+ t->executor.write_state = state;
+}
+
static void finish_global_actions(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_chttp2_executor_action_header *hdr;
@@ -642,13 +687,6 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN("finish_global_actions", 0);
for (;;) {
- if (!t->executor.writing_active && !t->closed &&
- grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) {
- t->executor.writing_active = 1;
- REF_TRANSPORT(t, "writing");
- prevent_endpoint_shutdown(t);
- grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL);
- }
check_read_ops(exec_ctx, &t->global);
gpr_mu_lock(&t->executor.mu);
@@ -669,8 +707,28 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx,
continue;
} else {
t->executor.global_active = false;
+ switch (t->executor.write_state) {
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "unlocking");
+ REF_TRANSPORT(t, "initiate_writing");
+ gpr_mu_unlock(&t->executor.mu);
+ grpc_exec_ctx_sched(
+ exec_ctx, &t->initiate_writing, GRPC_ERROR_NONE,
+ t->ep != NULL ? grpc_endpoint_get_workqueue(t->ep) : NULL);
+ break;
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
+ start_writing(exec_ctx, t);
+ gpr_mu_unlock(&t->executor.mu);
+ break;
+ case GRPC_CHTTP2_WRITING_INACTIVE:
+ case GRPC_CHTTP2_WRITING:
+ case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
+ case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
+ case GRPC_CHTTP2_WRITE_SCHEDULED:
+ gpr_mu_unlock(&t->executor.mu);
+ break;
+ }
}
- gpr_mu_unlock(&t->executor.mu);
break;
}
@@ -741,16 +799,118 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
* OUTPUT PROCESSING
*/
-void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global) {
+void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global,
+ bool covered_by_poller, const char *reason) {
+ /* Perform state checks, and transition to a scheduled state if appropriate.
+ Each time we finish the global lock execution, we check if we need to
+ write. If we do:
+ - (if there is a poller surrounding the write) schedule
+ initiate_writing, which locks and calls initiate_writing_locked to...
+ - call start_writing, which verifies (under the global lock) that there
+ are things that need to be written by calling
+ grpc_chttp2_unlocking_check_writes, and if so schedules writing_action
+ against the current exec_ctx, to be executed OUTSIDE of the global lock
+ - eventually writing_action results in grpc_chttp2_terminate_writing being
+ called, which re-takes the global lock, updates state, checks if we need
+ to do *another* write immediately, and if so loops back to
+ start_writing.
+
+ Current problems:
+ - too much lock entry/exiting
+ - the writing thread can become stuck indefinitely (punt through the
+ workqueue periodically to fix) */
+
+ grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
+ switch (t->executor.write_state) {
+ case GRPC_CHTTP2_WRITING_INACTIVE:
+ set_write_state(t, covered_by_poller
+ ? GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER
+ : GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
+ reason);
+ break;
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ /* nothing to do: write already requested */
+ break;
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
+ if (covered_by_poller) {
+ /* upgrade to note poller is available to cover the write */
+ set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, reason);
+ }
+ break;
+ case GRPC_CHTTP2_WRITE_SCHEDULED:
+ /* nothing to do: write already scheduled */
+ break;
+ case GRPC_CHTTP2_WRITING:
+ set_write_state(t,
+ covered_by_poller ? GRPC_CHTTP2_WRITING_STALE_WITH_POLLER
+ : GRPC_CHTTP2_WRITING_STALE_NO_POLLER,
+ reason);
+ break;
+ case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
+ /* nothing to do: write already requested */
+ break;
+ case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
+ if (covered_by_poller) {
+ /* upgrade to note poller is available to cover the write */
+ set_write_state(t, GRPC_CHTTP2_WRITING_STALE_WITH_POLLER, reason);
+ }
+ break;
+ }
+}
+
+static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
+ GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED ||
+ t->executor.write_state == GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER);
+ if (!t->closed &&
+ grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) {
+ set_write_state(t, GRPC_CHTTP2_WRITING, "start_writing");
+ REF_TRANSPORT(t, "writing");
+ prevent_endpoint_shutdown(t);
+ grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL);
+ } else {
+ if (t->closed) {
+ set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE,
+ "start_writing:transport_closed");
+ } else {
+ set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE,
+ "start_writing:nothing_to_write");
+ }
+ end_waiting_for_write(exec_ctx, t, GRPC_ERROR_CREATE("Nothing to write"));
+ if (t->ep && !t->endpoint_reading) {
+ destroy_endpoint(exec_ctx, t);
+ }
+ }
+}
+
+static void initiate_writing_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s_unused,
+ void *arg_ignored) {
+ start_writing(exec_ctx, t);
+ UNREF_TRANSPORT(exec_ctx, t, "initiate_writing");
+}
+
+static void initiate_writing(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_run_with_global_lock(exec_ctx, arg, NULL, initiate_writing_locked,
+ NULL, 0);
+}
+
+void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global,
+ bool covered_by_poller, const char *reason) {
if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed &&
grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) {
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, covered_by_poller,
+ reason);
}
}
-static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
- uint32_t value) {
+static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_setting_id id, uint32_t value) {
const grpc_chttp2_setting_parameters *sp =
&grpc_chttp2_settings_parameters[id];
uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
@@ -761,9 +921,22 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
if (use_value != t->global.settings[GRPC_LOCAL_SETTINGS][id]) {
t->global.settings[GRPC_LOCAL_SETTINGS][id] = use_value;
t->global.dirtied_local_settings = 1;
+ grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "push_setting");
}
}
+static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t, grpc_error *error) {
+ grpc_chttp2_stream_global *stream_global;
+ while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global,
+ &stream_global)) {
+ fail_pending_writes(exec_ctx, &t->global, stream_global,
+ GRPC_ERROR_REF(error));
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes");
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s_ignored,
@@ -778,24 +951,32 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing);
- grpc_chttp2_stream_global *stream_global;
- while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global,
- &stream_global)) {
- fail_pending_writes(exec_ctx, &t->global, stream_global,
- GRPC_ERROR_REF(error));
- GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes");
+ end_waiting_for_write(exec_ctx, t, error);
+
+ switch (t->executor.write_state) {
+ case GRPC_CHTTP2_WRITING_INACTIVE:
+ case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
+ case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
+ case GRPC_CHTTP2_WRITE_SCHEDULED:
+ GPR_UNREACHABLE_CODE(break);
+ case GRPC_CHTTP2_WRITING:
+ set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "terminate_writing");
+ break;
+ case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
+ set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER,
+ "terminate_writing");
+ break;
+ case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
+ set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
+ "terminate_writing");
+ break;
}
- /* leave the writing flag up on shutdown to prevent further writes in
- unlock()
- from starting */
- t->executor.writing_active = 0;
if (t->ep && !t->endpoint_reading) {
destroy_endpoint(exec_ctx, t);
}
UNREF_TRANSPORT(exec_ctx, t, "writing");
- GRPC_ERROR_UNREF(error);
}
void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
@@ -878,7 +1059,8 @@ static void maybe_start_some_streams(
stream_global->id, STREAM_FROM_GLOBAL(stream_global));
stream_global->in_stream_map = true;
transport_global->concurrent_stream_count++;
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, true,
+ "new_stream");
}
/* cancel out streams that will never be started */
while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@@ -1018,9 +1200,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
maybe_start_some_streams(exec_ctx, transport_global);
} else {
GPR_ASSERT(stream_global->id != 0);
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
+ true, "op.send_initial_metadata");
}
} else {
+ stream_global->send_trailing_metadata = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_initial_metadata_finished,
@@ -1042,7 +1226,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
} else {
stream_global->send_message = op->send_message;
if (stream_global->id != 0) {
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
+ true, "op.send_message");
}
}
}
@@ -1075,6 +1260,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
if (stream_global->write_closed) {
+ stream_global->send_trailing_metadata = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_trailing_metadata_finished,
@@ -1085,7 +1271,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
} else if (stream_global->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
+ true, "op.send_trailing_metadata");
}
}
}
@@ -1106,8 +1293,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
(stream_global->incoming_frames.head == NULL ||
stream_global->incoming_frames.head->is_tail)) {
incoming_byte_stream_update_flow_control(
- transport_global, stream_global, transport_global->stream_lookahead,
- 0);
+ exec_ctx, transport_global, stream_global,
+ transport_global->stream_lookahead, 0);
}
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
@@ -1135,7 +1322,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
sizeof(*op));
}
-static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
+static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_closure *on_recv) {
grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p));
p->next = &t->global.pings;
p->prev = p->next->prev;
@@ -1150,6 +1338,7 @@ static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
p->id[7] = (uint8_t)(t->global.ping_counter & 0xff);
p->on_recv = on_recv;
gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id));
+ grpc_chttp2_initiate_write(exec_ctx, &t->global, true, "send_ping");
}
static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -1209,6 +1398,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
close_transport = grpc_chttp2_has_streams(t)
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE("GOAWAY sent");
+ grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "goaway_sent");
}
if (op->set_accept_stream) {
@@ -1226,7 +1416,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
}
if (op->send_ping) {
- send_ping_locked(t, op->send_ping);
+ send_ping_locked(exec_ctx, t, op->send_ping);
}
if (close_transport != GRPC_ERROR_NONE) {
@@ -1414,6 +1604,8 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx,
&transport_global->qbuf,
grpc_chttp2_rst_stream_create(stream_global->id, (uint32_t)http_error,
&stream_global->stats.outgoing));
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
+ "rst_stream");
}
const char *msg =
@@ -1473,10 +1665,39 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
}
}
+static void add_error(grpc_error *error, grpc_error **refs, size_t *nrefs) {
+ if (error == GRPC_ERROR_NONE) return;
+ for (size_t i = 0; i < *nrefs; i++) {
+ if (error == refs[i]) {
+ return;
+ }
+ }
+ refs[*nrefs] = error;
+ ++*nrefs;
+}
+
+static grpc_error *removal_error(grpc_error *extra_error,
+ grpc_chttp2_stream_global *stream_global) {
+ grpc_error *refs[3];
+ size_t nrefs = 0;
+ add_error(stream_global->read_closed_error, refs, &nrefs);
+ add_error(stream_global->write_closed_error, refs, &nrefs);
+ add_error(extra_error, refs, &nrefs);
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (nrefs > 0) {
+ error = GRPC_ERROR_CREATE_REFERENCING("Failed due to stream removal", refs,
+ nrefs);
+ }
+ GRPC_ERROR_UNREF(extra_error);
+ return error;
+}
+
static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_error *error) {
+ error = removal_error(error, stream_global);
+ stream_global->send_message = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_initial_metadata_finished, GRPC_ERROR_REF(error));
@@ -1499,14 +1720,17 @@ void grpc_chttp2_mark_stream_closed(
}
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
if (close_reads && !stream_global->read_closed) {
+ stream_global->read_closed_error = GRPC_ERROR_REF(error);
stream_global->read_closed = true;
stream_global->published_initial_metadata = true;
stream_global->published_trailing_metadata = true;
decrement_active_streams_locked(exec_ctx, transport_global, stream_global);
}
if (close_writes && !stream_global->write_closed) {
+ stream_global->write_closed_error = GRPC_ERROR_REF(error);
stream_global->write_closed = true;
- if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.writing_active) {
+ if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.write_state !=
+ GRPC_CHTTP2_WRITING_INACTIVE) {
GRPC_CHTTP2_STREAM_REF(stream_global, "finish_writes");
grpc_chttp2_list_add_closed_waiting_for_writing(transport_global,
stream_global);
@@ -1516,7 +1740,6 @@ void grpc_chttp2_mark_stream_closed(
}
}
if (stream_global->read_closed && stream_global->write_closed) {
- stream_global->removal_error = GRPC_ERROR_REF(error);
if (stream_global->id != 0 &&
TRANSPORT_FROM_GLOBAL(transport_global)->executor.parsing_active) {
grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global,
@@ -1524,7 +1747,8 @@ void grpc_chttp2_mark_stream_closed(
} else {
if (stream_global->id != 0) {
remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global),
- stream_global->id, GRPC_ERROR_REF(error));
+ stream_global->id,
+ removal_error(GRPC_ERROR_REF(error), stream_global));
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
}
@@ -1649,6 +1873,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
1, error);
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
+ "close_from_api");
}
typedef struct {
@@ -1678,8 +1904,14 @@ static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
/** update window from a settings change */
+typedef struct {
+ grpc_chttp2_transport *t;
+ grpc_exec_ctx *exec_ctx;
+} update_global_window_args;
+
static void update_global_window(void *args, uint32_t id, void *stream) {
- grpc_chttp2_transport *t = args;
+ update_global_window_args *a = args;
+ grpc_chttp2_transport *t = a->t;
grpc_chttp2_stream *s = stream;
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_stream_global *stream_global = &s->global;
@@ -1693,7 +1925,8 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
is_zero = stream_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(a->exec_ctx, transport_global, stream_global,
+ true, "update_global_window");
}
}
@@ -1802,14 +2035,19 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
/* copy parsing qbuf to global qbuf */
- gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf);
+ if (t->parsing.qbuf.count > 0) {
+ gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf);
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
+ "parsing_qbuf");
+ }
/* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map);
transport_global->concurrent_stream_count =
(uint32_t)grpc_chttp2_stream_map_size(&t->parsing_stream_map);
if (transport_parsing->initial_window_update != 0) {
+ update_global_window_args args = {t, exec_ctx};
grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
- update_global_window, t);
+ update_global_window, &args);
transport_parsing->initial_window_update = 0;
}
/* handle higher level things */
@@ -1832,7 +2070,7 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GPR_ASSERT(stream_global->write_closed);
GPR_ASSERT(stream_global->read_closed);
remove_stream(exec_ctx, t, stream_global->id,
- GRPC_ERROR_REF(stream_global->removal_error));
+ removal_error(GRPC_ERROR_NONE, stream_global));
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
}
@@ -1855,11 +2093,12 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
}
drop_connection(exec_ctx, t, GRPC_ERROR_REF(error));
t->endpoint_reading = 0;
- if (!t->executor.writing_active && t->ep) {
- grpc_endpoint_destroy(exec_ctx, t->ep);
- t->ep = NULL;
- /* safe as we still have a ref for read */
- UNREF_TRANSPORT(exec_ctx, t, "disconnect");
+ if (grpc_http_write_state_trace) {
+ gpr_log(GPR_DEBUG, "R:%p -> 0 ws=%s", t,
+ write_state_name(t->executor.write_state));
+ }
+ if (t->executor.write_state == GRPC_CHTTP2_WRITING_INACTIVE && t->ep) {
+ destroy_endpoint(exec_ctx, t);
}
} else if (!t->closed) {
keep_reading = true;
@@ -1943,7 +2182,7 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
}
static void incoming_byte_stream_update_flow_control(
- grpc_chttp2_transport_global *transport_global,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
size_t have_already) {
uint32_t max_recv_bytes;
@@ -1978,7 +2217,8 @@ static void incoming_byte_stream_update_flow_control(
add_max_recv_bytes);
grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global,
stream_global);
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
+ false, "read_incoming_stream");
}
}
@@ -2000,8 +2240,9 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global = &bs->stream->global;
if (bs->is_tail) {
- incoming_byte_stream_update_flow_control(
- transport_global, stream_global, arg->max_size_hint, bs->slices.length);
+ incoming_byte_stream_update_flow_control(exec_ctx, transport_global,
+ stream_global, arg->max_size_hint,
+ bs->slices.length);
}
if (bs->slices.count > 0) {
*arg->slice = gpr_slice_buffer_take_first(&bs->slices);
@@ -2185,7 +2426,7 @@ static char *format_flowctl_context_var(const char *context, const char *var,
if (context == NULL) {
*scope = NULL;
gpr_asprintf(&buf, "%s(%" PRId64 ")", var, val);
- result = gpr_leftpad(buf, ' ', 40);
+ result = gpr_leftpad(buf, ' ', 60);
gpr_free(buf);
return result;
}
@@ -2198,7 +2439,7 @@ static char *format_flowctl_context_var(const char *context, const char *var,
gpr_free(tmp);
}
gpr_asprintf(&buf, "%s.%s(%" PRId64 ")", underscore_pos + 1, var, val);
- result = gpr_leftpad(buf, ' ', 40);
+ result = gpr_leftpad(buf, ' ', 60);
gpr_free(buf);
return result;
}
@@ -2231,7 +2472,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
tmp_phase = gpr_leftpad(phase, ' ', 8);
tmp_scope1 = gpr_leftpad(scope1, ' ', 11);
- gpr_asprintf(&prefix, "FLOW %s: %s %s ", phase, clisvr, scope1);
+ gpr_asprintf(&prefix, "FLOW %s: %s %s ", tmp_phase, clisvr, scope1);
gpr_free(tmp_phase);
gpr_free(tmp_scope1);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 8d79e93ceb..e1dcf5262a 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -305,6 +305,22 @@ typedef struct grpc_chttp2_executor_action_header {
void *arg;
} grpc_chttp2_executor_action_header;
+typedef enum {
+ /** no writing activity */
+ GRPC_CHTTP2_WRITING_INACTIVE,
+ /** write has been requested, but not scheduled yet */
+ GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER,
+ GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
+ /** write has been requested and scheduled against the workqueue */
+ GRPC_CHTTP2_WRITE_SCHEDULED,
+ /** write has been initiated after being reaped from the workqueue */
+ GRPC_CHTTP2_WRITING,
+ /** write has been initiated, AND another write needs to be started once it's
+ done */
+ GRPC_CHTTP2_WRITING_STALE_WITH_POLLER,
+ GRPC_CHTTP2_WRITING_STALE_NO_POLLER,
+} grpc_chttp2_write_state;
+
struct grpc_chttp2_transport {
grpc_transport base; /* must be first */
gpr_refcount refs;
@@ -319,10 +335,10 @@ struct grpc_chttp2_transport {
/** is a thread currently in the global lock */
bool global_active;
- /** is a thread currently writing */
- bool writing_active;
/** is a thread currently parsing */
bool parsing_active;
+ /** write execution state of the transport */
+ grpc_chttp2_write_state write_state;
grpc_chttp2_executor_action_header *pending_actions_head;
grpc_chttp2_executor_action_header *pending_actions_tail;
@@ -342,7 +358,8 @@ struct grpc_chttp2_transport {
/** global state for reading/writing */
grpc_chttp2_transport_global global;
/** state only accessible by the chain of execution that
- set writing_active=1 */
+ set writing_state >= GRPC_WRITING, and only by the writing closure
+ chain. */
grpc_chttp2_transport_writing writing;
/** state only accessible by the chain of execution that
set parsing_active=1 */
@@ -363,6 +380,8 @@ struct grpc_chttp2_transport {
grpc_closure reading_action;
/** closure to actually do parsing */
grpc_closure parsing_action;
+ /** closure to initiate writing */
+ grpc_closure initiate_writing;
/** incoming read bytes */
gpr_slice_buffer read_buffer;
@@ -436,8 +455,10 @@ typedef struct {
bool seen_error;
bool exceeded_metadata_size;
- /** the error that resulted in this stream being removed */
- grpc_error *removal_error;
+ /** the error that resulted in this stream being read-closed */
+ grpc_error *read_closed_error;
+ /** the error that resulted in this stream being write-closed */
+ grpc_error *write_closed_error;
bool published_initial_metadata;
bool published_trailing_metadata;
@@ -514,15 +535,20 @@ struct grpc_chttp2_stream {
};
/** Transport writing call flow:
- chttp2_transport.c calls grpc_chttp2_unlocking_check_writes to see if writes
- are required;
- if they are, chttp2_transport.c calls grpc_chttp2_perform_writes to do the
- writes.
- Once writes have been completed (meaning another write could potentially be
- started),
- grpc_chttp2_terminate_writing is called. This will call
- grpc_chttp2_cleanup_writing, at which
- point the write phase is complete. */
+ grpc_chttp2_initiate_write() is called anywhere that we know bytes need to
+ go out on the wire.
+ If no other write has been started, a task is enqueued onto our workqueue.
+ When that task executes, it obtains the global lock, and gathers the data
+ to write.
+ The global lock is dropped and we do the syscall to write.
+ After writing, a follow-up check is made to see if another round of writing
+ should be performed.
+
+ The actual call chain is documented in the implementation of this function.
+ */
+void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global,
+ bool covered_by_poller, const char *reason);
/** Someone is unlocking the transport mutex: check to see if writes
are required, and schedule them if so */
@@ -610,9 +636,8 @@ int grpc_chttp2_list_pop_check_read_ops(
void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing);
-void grpc_chttp2_list_flush_writing_stalled_by_transport(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
- bool is_window_available);
+bool grpc_chttp2_list_flush_writing_stalled_by_transport(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing);
void grpc_chttp2_list_add_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
@@ -822,7 +847,9 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
/** add a ref to the stream and add it to the writable list;
ref will be dropped in writing.c */
-void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global);
+void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global,
+ bool covered_by_poller, const char *reason);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 84eb5752f1..e1fc0ddee2 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -154,10 +154,8 @@ void grpc_chttp2_publish_reads(
transport_parsing, outgoing_window);
is_zero = transport_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
- while (grpc_chttp2_list_pop_stalled_by_transport(transport_global,
- &stream_global)) {
- grpc_chttp2_become_writable(transport_global, stream_global);
- }
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
+ "new_global_flow_control");
}
if (transport_parsing->incoming_window <
@@ -168,6 +166,8 @@ void grpc_chttp2_publish_reads(
announce_incoming_window, announce_bytes);
GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_parsing,
incoming_window, announce_bytes);
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
+ "global incoming window");
}
/* for each stream that saw an update, fixup global state */
@@ -190,7 +190,8 @@ void grpc_chttp2_publish_reads(
outgoing_window);
is_zero = stream_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
+ false, "stream.read_flow_control");
}
stream_global->max_recv_bytes -= (uint32_t)GPR_MIN(
diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c
index 8f3ab00e6d..2eb5f5f632 100644
--- a/src/core/ext/transport/chttp2/transport/stream_lists.c
+++ b/src/core/ext/transport/chttp2/transport/stream_lists.c
@@ -329,6 +329,7 @@ void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
grpc_chttp2_stream *stream = STREAM_FROM_WRITING(stream_writing);
+ gpr_log(GPR_DEBUG, "writing stalled %d", stream->global.id);
if (!stream->included[GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT]) {
GRPC_CHTTP2_STREAM_REF(&stream->global, "chttp2_writing_stalled");
}
@@ -336,27 +337,28 @@ void grpc_chttp2_list_add_writing_stalled_by_transport(
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT);
}
-void grpc_chttp2_list_flush_writing_stalled_by_transport(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
- bool is_window_available) {
+bool grpc_chttp2_list_flush_writing_stalled_by_transport(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream *stream;
+ bool out = false;
grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing);
while (stream_list_pop(transport, &stream,
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) {
- if (is_window_available) {
- grpc_chttp2_become_writable(&transport->global, &stream->global);
- } else {
- grpc_chttp2_list_add_stalled_by_transport(transport_writing,
- &stream->writing);
- }
+ gpr_log(GPR_DEBUG, "move %d from writing stalled to just stalled",
+ stream->global.id);
+ grpc_chttp2_list_add_stalled_by_transport(transport_writing,
+ &stream->writing);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &stream->global,
"chttp2_writing_stalled");
+ out = true;
}
+ return out;
}
void grpc_chttp2_list_add_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
+ gpr_log(GPR_DEBUG, "stalled %d", stream_writing->id);
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
STREAM_FROM_WRITING(stream_writing),
GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index b19f5f068d..e0d87725e9 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -75,9 +75,13 @@ int grpc_chttp2_unlocking_check_writes(
GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window,
transport_global, outgoing_window);
- bool is_window_available = transport_writing->outgoing_window > 0;
- grpc_chttp2_list_flush_writing_stalled_by_transport(
- exec_ctx, transport_writing, is_window_available);
+ if (transport_writing->outgoing_window > 0) {
+ while (grpc_chttp2_list_pop_stalled_by_transport(transport_global,
+ &stream_global)) {
+ grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global,
+ false, "transport.read_flow_control");
+ }
+ }
/* for each grpc_chttp2_stream that's become writable, frame it's data
(according to available window sizes) and add to the output buffer */
@@ -331,6 +335,12 @@ void grpc_chttp2_cleanup_writing(
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
+ if (grpc_chttp2_list_flush_writing_stalled_by_transport(exec_ctx,
+ transport_writing)) {
+ grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
+ "resume_stalled_stream");
+ }
+
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
if (stream_writing->sent_initial_metadata) {
diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c
index 1ab3733d38..f901fcf962 100644
--- a/src/core/lib/iomgr/endpoint.c
+++ b/src/core/lib/iomgr/endpoint.c
@@ -65,3 +65,7 @@ void grpc_endpoint_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) {
char* grpc_endpoint_get_peer(grpc_endpoint* ep) {
return ep->vtable->get_peer(ep);
}
+
+grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) {
+ return ep->vtable->get_workqueue(ep);
+}
diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h
index f9808bbda1..894efc0b23 100644
--- a/src/core/lib/iomgr/endpoint.h
+++ b/src/core/lib/iomgr/endpoint.h
@@ -51,6 +51,7 @@ struct grpc_endpoint_vtable {
gpr_slice_buffer *slices, grpc_closure *cb);
void (*write)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
gpr_slice_buffer *slices, grpc_closure *cb);
+ grpc_workqueue *(*get_workqueue)(grpc_endpoint *ep);
void (*add_to_pollset)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset *pollset);
void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
@@ -69,6 +70,9 @@ void grpc_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
char *grpc_endpoint_get_peer(grpc_endpoint *ep);
+/* Retrieve a reference to the workqueue associated with this endpoint */
+grpc_workqueue *grpc_endpoint_get_workqueue(grpc_endpoint *ep);
+
/* Write slices out to the socket.
If the connection is ready for more data after the end of the call, it
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index cf0fe736a0..6a63c4d1d1 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -57,6 +57,7 @@
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
+#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
@@ -113,9 +114,7 @@ struct grpc_fd {
grpc_closure *read_closure;
grpc_closure *write_closure;
- /* The polling island to which this fd belongs to and the mutex protecting the
- the field */
- gpr_mu pi_mu;
+ /* The polling island to which this fd belongs to (protected by mu) */
struct polling_island *polling_island;
struct grpc_fd *freelist_next;
@@ -152,16 +151,17 @@ static void fd_global_shutdown(void);
* Polling island Declarations
*/
-// #define GRPC_PI_REF_COUNT_DEBUG
+//#define GRPC_PI_REF_COUNT_DEBUG
#ifdef GRPC_PI_REF_COUNT_DEBUG
#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
-#define PI_UNREF(p, r) pi_unref_dbg((p), (r), __FILE__, __LINE__)
+#define PI_UNREF(exec_ctx, p, r) \
+ pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */
#define PI_ADD_REF(p, r) pi_add_ref((p))
-#define PI_UNREF(p, r) pi_unref((p))
+#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
#endif /* !defined(GPRC_PI_REF_COUNT_DEBUG) */
@@ -172,7 +172,7 @@ typedef struct polling_island {
Once the ref count becomes zero, this structure is destroyed which means
we should ensure that there is never a scenario where a PI_ADD_REF() is
racing with a PI_UNREF() that just made the ref_count zero. */
- gpr_refcount ref_count;
+ gpr_atm ref_count;
/* Pointer to the polling_island this merged into.
* merged_to value is only set once in polling_island's lifetime (and that too
@@ -184,6 +184,9 @@ typedef struct polling_island {
* (except mu and ref_count) are invalid and must be ignored. */
gpr_atm merged_to;
+ /* The workqueue associated with this polling island */
+ grpc_workqueue *workqueue;
+
/* The fd of the underlying epoll set */
int epoll_fd;
@@ -191,11 +194,6 @@ typedef struct polling_island {
size_t fd_cnt;
size_t fd_capacity;
grpc_fd **fds;
-
- /* Polling islands that are no longer needed are kept in a freelist so that
- they can be reused. This field points to the next polling island in the
- free list */
- struct polling_island *next_free;
} polling_island;
/*******************************************************************************
@@ -253,13 +251,14 @@ struct grpc_pollset_set {
* Common helpers
*/
-static void append_error(grpc_error **composite, grpc_error *error,
+static bool append_error(grpc_error **composite, grpc_error *error,
const char *desc) {
- if (error == GRPC_ERROR_NONE) return;
+ if (error == GRPC_ERROR_NONE) return true;
if (*composite == GRPC_ERROR_NONE) {
*composite = GRPC_ERROR_CREATE(desc);
}
*composite = grpc_error_add_child(*composite, error);
+ return false;
}
/*******************************************************************************
@@ -275,11 +274,8 @@ static void append_error(grpc_error **composite, grpc_error *error,
threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;
-/* Polling island freelist */
-static gpr_mu g_pi_freelist_mu;
-static polling_island *g_pi_freelist = NULL;
-
-static void polling_island_delete(); /* Forward declaration */
+/* Forward declaration */
+static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
@@ -293,28 +289,35 @@ gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
#ifdef GRPC_PI_REF_COUNT_DEBUG
-void pi_add_ref(polling_island *pi);
-void pi_unref(polling_island *pi);
+static void pi_add_ref(polling_island *pi);
+static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
-void pi_add_ref_dbg(polling_island *pi, char *reason, char *file, int line) {
- long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count));
+static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file,
+ int line) {
+ long old_cnt = gpr_atm_acq_load(&pi->ref_count);
pi_add_ref(pi);
gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
(void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}
-void pi_unref_dbg(polling_island *pi, char *reason, char *file, int line) {
- long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count));
- pi_unref(pi);
+static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
+ char *reason, char *file, int line) {
+ long old_cnt = gpr_atm_acq_load(&pi->ref_count);
+ pi_unref(exec_ctx, pi);
gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
(void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}
#endif
-void pi_add_ref(polling_island *pi) { gpr_ref(&pi->ref_count); }
+static void pi_add_ref(polling_island *pi) {
+ gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
+}
-void pi_unref(polling_island *pi) {
- /* If ref count went to zero, delete the polling island.
+static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
+ /* If ref count went to one, we're back to just the workqueue owning a ref.
+ Unref the workqueue to break the loop.
+
+ If ref count went to zero, delete the polling island.
Note that this deletion not be done under a lock. Once the ref count goes
to zero, we are guaranteed that no one else holds a reference to the
polling island (and that there is no racing pi_add_ref() call either).
@@ -322,12 +325,20 @@ void pi_unref(polling_island *pi) {
Also, if we are deleting the polling island and the merged_to field is
non-empty, we should remove a ref to the merged_to polling island
*/
- if (gpr_unref(&pi->ref_count)) {
- polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
- polling_island_delete(pi);
- if (next != NULL) {
- PI_UNREF(next, "pi_delete"); /* Recursive call */
+ switch (gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
+ case 2: /* last external ref: the only one now owned is by the workqueue */
+ GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
+ break;
+ case 1: {
+ polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
+ polling_island_delete(exec_ctx, pi);
+ if (next != NULL) {
+ PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
+ }
+ break;
}
+ case 0:
+ GPR_UNREACHABLE_CODE(return );
}
}
@@ -462,69 +473,68 @@ static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
}
/* Might return NULL in case of an error */
-static polling_island *polling_island_create(grpc_fd *initial_fd,
+static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
+ grpc_fd *initial_fd,
grpc_error **error) {
polling_island *pi = NULL;
- char *err_msg;
const char *err_desc = "polling_island_create";
- /* Try to get one from the polling island freelist */
- gpr_mu_lock(&g_pi_freelist_mu);
- if (g_pi_freelist != NULL) {
- pi = g_pi_freelist;
- g_pi_freelist = g_pi_freelist->next_free;
- pi->next_free = NULL;
- }
- gpr_mu_unlock(&g_pi_freelist_mu);
+ *error = GRPC_ERROR_NONE;
- /* Create new polling island if we could not get one from the free list */
- if (pi == NULL) {
- pi = gpr_malloc(sizeof(*pi));
- gpr_mu_init(&pi->mu);
- pi->fd_cnt = 0;
- pi->fd_capacity = 0;
- pi->fds = NULL;
- }
+ pi = gpr_malloc(sizeof(*pi));
+ gpr_mu_init(&pi->mu);
+ pi->fd_cnt = 0;
+ pi->fd_capacity = 0;
+ pi->fds = NULL;
+ pi->epoll_fd = -1;
+ pi->workqueue = NULL;
- gpr_ref_init(&pi->ref_count, 0);
+ gpr_atm_rel_store(&pi->ref_count, 0);
gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (pi->epoll_fd < 0) {
- gpr_asprintf(&err_msg, "epoll_create1 failed with error %d (%s)", errno,
- strerror(errno));
- append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
- gpr_free(err_msg);
- } else {
- polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
- pi->next_free = NULL;
+ append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
+ goto done;
+ }
- if (initial_fd != NULL) {
- /* Lock the polling island here just in case we got this structure from
- the freelist and the polling island lock was not released yet (by the
- code that adds the polling island to the freelist) */
- gpr_mu_lock(&pi->mu);
- polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
- gpr_mu_unlock(&pi->mu);
- }
+ polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
+
+ if (initial_fd != NULL) {
+ polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
+ }
+
+ if (append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue),
+ err_desc) &&
+ *error == GRPC_ERROR_NONE) {
+ polling_island_add_fds_locked(pi, &pi->workqueue->wakeup_read_fd, 1, true,
+ error);
+ GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL);
+ pi->workqueue->wakeup_read_fd->polling_island = pi;
+ PI_ADD_REF(pi, "fd");
}
+done:
+ if (*error != GRPC_ERROR_NONE) {
+ if (pi->workqueue != NULL) {
+ GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
+ }
+ polling_island_delete(exec_ctx, pi);
+ pi = NULL;
+ }
return pi;
}
-static void polling_island_delete(polling_island *pi) {
+static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
GPR_ASSERT(pi->fd_cnt == 0);
- gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
-
- close(pi->epoll_fd);
- pi->epoll_fd = -1;
-
- gpr_mu_lock(&g_pi_freelist_mu);
- pi->next_free = g_pi_freelist;
- g_pi_freelist = pi;
- gpr_mu_unlock(&g_pi_freelist_mu);
+ if (pi->epoll_fd >= 0) {
+ close(pi->epoll_fd);
+ }
+ gpr_mu_destroy(&pi->mu);
+ gpr_free(pi->fds);
+ gpr_free(pi);
}
/* Attempts to gets the last polling island in the linked list (liked by the
@@ -704,9 +714,6 @@ static polling_island *polling_island_merge(polling_island *p,
static grpc_error *polling_island_global_init() {
grpc_error *error = GRPC_ERROR_NONE;
- gpr_mu_init(&g_pi_freelist_mu);
- g_pi_freelist = NULL;
-
error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
if (error == GRPC_ERROR_NONE) {
error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
@@ -716,18 +723,6 @@ static grpc_error *polling_island_global_init() {
}
static void polling_island_global_shutdown() {
- polling_island *next;
- gpr_mu_lock(&g_pi_freelist_mu);
- gpr_mu_unlock(&g_pi_freelist_mu);
- while (g_pi_freelist != NULL) {
- next = g_pi_freelist->next_free;
- gpr_mu_destroy(&g_pi_freelist->mu);
- gpr_free(g_pi_freelist->fds);
- gpr_free(g_pi_freelist);
- g_pi_freelist = next;
- }
- gpr_mu_destroy(&g_pi_freelist_mu);
-
grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}
@@ -845,7 +840,6 @@ static grpc_fd *fd_create(int fd, const char *name) {
if (new_fd == NULL) {
new_fd = gpr_malloc(sizeof(grpc_fd));
gpr_mu_init(&new_fd->mu);
- gpr_mu_init(&new_fd->pi_mu);
}
/* Note: It is not really needed to get the new_fd->mu lock here. If this is a
@@ -896,6 +890,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
const char *reason) {
bool is_fd_closed = false;
grpc_error *error = GRPC_ERROR_NONE;
+ polling_island *unref_pi = NULL;
gpr_mu_lock(&fd->mu);
fd->on_done_closure = on_done;
@@ -923,21 +918,26 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- Unlock the latest polling island
- Set fd->polling_island to NULL (but remove the ref on the polling island
before doing this.) */
- gpr_mu_lock(&fd->pi_mu);
if (fd->polling_island != NULL) {
polling_island *pi_latest = polling_island_lock(fd->polling_island);
polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
gpr_mu_unlock(&pi_latest->mu);
- PI_UNREF(fd->polling_island, "fd_orphan");
+ unref_pi = fd->polling_island;
fd->polling_island = NULL;
}
- gpr_mu_unlock(&fd->pi_mu);
grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, error, NULL);
gpr_mu_unlock(&fd->mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
+ if (unref_pi != NULL) {
+ /* Unref stale polling island here, outside the fd lock above.
+ The polling island owns a workqueue which owns an fd, and unreffing
+ inside the lock can cause an eventual lock loop that makes TSAN very
+ unhappy. */
+ PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
+ }
GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
}
@@ -1037,6 +1037,17 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
gpr_mu_unlock(&fd->mu);
}
+static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
+ gpr_mu_lock(&fd->mu);
+ grpc_workqueue *workqueue = NULL;
+ if (fd->polling_island != NULL) {
+ workqueue =
+ GRPC_WORKQUEUE_REF(fd->polling_island->workqueue, "get_workqueue");
+ }
+ gpr_mu_unlock(&fd->mu);
+ return workqueue;
+}
+
/*******************************************************************************
* Pollset Definitions
*/
@@ -1227,9 +1238,10 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_unlock(&fd->mu);
}
-static void pollset_release_polling_island(grpc_pollset *ps, char *reason) {
+static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *ps, char *reason) {
if (ps->polling_island != NULL) {
- PI_UNREF(ps->polling_island, reason);
+ PI_UNREF(exec_ctx, ps->polling_island, reason);
}
ps->polling_island = NULL;
}
@@ -1242,7 +1254,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
pollset->finish_shutdown_called = true;
/* Release the ref and set pollset->polling_island to NULL */
- pollset_release_polling_island(pollset, "ps_shutdown");
+ pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}
@@ -1281,7 +1293,7 @@ static void pollset_reset(grpc_pollset *pollset) {
pollset->finish_shutdown_called = false;
pollset->kicked_without_pollers = false;
pollset->shutdown_done = NULL;
- pollset_release_polling_island(pollset, "ps_reset");
+ GPR_ASSERT(pollset->polling_island == NULL);
}
#define GRPC_EPOLL_MAX_EVENTS 1000
@@ -1309,7 +1321,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
this function (i.e pollset_work_and_unlock()) is called */
if (pollset->polling_island == NULL) {
- pollset->polling_island = polling_island_create(NULL, error);
+ pollset->polling_island = polling_island_create(exec_ctx, NULL, error);
if (pollset->polling_island == NULL) {
GPR_TIMER_END("pollset_work_and_unlock", 0);
return; /* Fatal error. We cannot continue */
@@ -1329,7 +1341,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
/* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
polling island to be deleted */
PI_ADD_REF(pi, "ps");
- PI_UNREF(pollset->polling_island, "ps");
+ PI_UNREF(exec_ctx, pollset->polling_island, "ps");
pollset->polling_island = pi;
}
@@ -1400,7 +1412,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
that we got before releasing the polling island lock). This is because
pollset->polling_island pointer might get udpated in other parts of the
code when there is an island merge while we are doing epoll_wait() above */
- PI_UNREF(pi, "ps_work");
+ PI_UNREF(exec_ctx, pi, "ps_work");
GPR_TIMER_END("pollset_work_and_unlock", 0);
}
@@ -1517,10 +1529,11 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_error *error = GRPC_ERROR_NONE;
gpr_mu_lock(&pollset->mu);
- gpr_mu_lock(&fd->pi_mu);
+ gpr_mu_lock(&fd->mu);
polling_island *pi_new = NULL;
+retry:
/* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
* equal, do nothing.
* 2) If fd->polling_island and pollset->polling_island are both NULL, create
@@ -1535,15 +1548,44 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
* polling_island fields in both fd and pollset to point to the merged
* polling island.
*/
+
+ if (fd->orphaned) {
+ gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&pollset->mu);
+ /* early out */
+ return;
+ }
+
if (fd->polling_island == pollset->polling_island) {
pi_new = fd->polling_island;
if (pi_new == NULL) {
- pi_new = polling_island_create(fd, &error);
-
- GRPC_POLLING_TRACE(
- "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
- "pollset: %p)",
- (void *)pi_new, fd->fd, (void *)pollset);
+ /* Unlock before creating a new polling island: the polling island will
+ create a workqueue which creates a file descriptor, and holding an fd
+ lock here can eventually cause a loop to appear to TSAN (making it
+ unhappy). We don't think it's a real loop (there's an epoch point where
+ that loop possibility disappears), but the advantages of keeping TSAN
+ happy outweigh any performance advantage we might have by keeping the
+ lock held. */
+ gpr_mu_unlock(&fd->mu);
+ pi_new = polling_island_create(exec_ctx, fd, &error);
+ gpr_mu_lock(&fd->mu);
+ /* Need to reverify any assumptions made between the initial lock and
+ getting to this branch: if they've changed, we need to throw away our
+ work and figure things out again. */
+ if (fd->polling_island != NULL) {
+ GRPC_POLLING_TRACE(
+ "pollset_add_fd: Raced creating new polling island. pi_new: %p "
+ "(fd: %d, pollset: %p)",
+ (void *)pi_new, fd->fd, (void *)pollset);
+ PI_ADD_REF(pi_new, "dance_of_destruction");
+ PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
+ goto retry;
+ } else {
+ GRPC_POLLING_TRACE(
+ "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
+ "pollset: %p)",
+ (void *)pi_new, fd->fd, (void *)pollset);
+ }
}
} else if (fd->polling_island == NULL) {
pi_new = polling_island_lock(pollset->polling_island);
@@ -1579,7 +1621,7 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (fd->polling_island != pi_new) {
PI_ADD_REF(pi_new, "fd");
if (fd->polling_island != NULL) {
- PI_UNREF(fd->polling_island, "fd");
+ PI_UNREF(exec_ctx, fd->polling_island, "fd");
}
fd->polling_island = pi_new;
}
@@ -1587,13 +1629,15 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (pollset->polling_island != pi_new) {
PI_ADD_REF(pi_new, "ps");
if (pollset->polling_island != NULL) {
- PI_UNREF(pollset->polling_island, "ps");
+ PI_UNREF(exec_ctx, pollset->polling_island, "ps");
}
pollset->polling_island = pi_new;
}
- gpr_mu_unlock(&fd->pi_mu);
+ gpr_mu_unlock(&fd->mu);
gpr_mu_unlock(&pollset->mu);
+
+ GRPC_LOG_IF_ERROR("pollset_add_fd", error);
}
/*******************************************************************************
@@ -1744,9 +1788,9 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
void *grpc_fd_get_polling_island(grpc_fd *fd) {
polling_island *pi;
- gpr_mu_lock(&fd->pi_mu);
+ gpr_mu_lock(&fd->mu);
pi = fd->polling_island;
- gpr_mu_unlock(&fd->pi_mu);
+ gpr_mu_unlock(&fd->mu);
return pi;
}
@@ -1794,6 +1838,7 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
+ .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
index 9e306af5fa..c2107e5e39 100644
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -725,6 +725,8 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
GRPC_FD_UNREF(fd, "poll");
}
+static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }
+
/*******************************************************************************
* pollset_posix.c
*/
@@ -2006,6 +2008,7 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
+ .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 45c0a5e954..4b593f4b2c 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -617,6 +617,8 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
GRPC_FD_UNREF(fd, "poll");
}
+static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }
+
/*******************************************************************************
* pollset_posix.c
*/
@@ -1234,6 +1236,7 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
+ .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index a3c1e9db9a..6536672685 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -148,6 +148,10 @@ grpc_fd *grpc_fd_create(int fd, const char *name) {
return g_event_engine->fd_create(fd, name);
}
+grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd) {
+ return g_event_engine->fd_get_workqueue(fd);
+}
+
int grpc_fd_wrapped_fd(grpc_fd *fd) {
return g_event_engine->fd_wrapped_fd(fd);
}
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 579c84ef70..c2aa1756ea 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -56,6 +56,7 @@ typedef struct grpc_event_engine_vtable {
void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
bool (*fd_is_shutdown)(grpc_fd *fd);
+ grpc_workqueue *(*fd_get_workqueue)(grpc_fd *fd);
grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx,
grpc_fd *fd);
@@ -107,6 +108,9 @@ const char *grpc_get_poll_strategy_name();
This takes ownership of closing fd. */
grpc_fd *grpc_fd_create(int fd, const char *name);
+/* Get a workqueue that's associated with this fd */
+grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd);
+
/* Return the wrapped fd, or -1 if it has been released or closed. */
int grpc_fd_wrapped_fd(grpc_fd *fd);
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index c44aafcddf..ac7785ec13 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -37,6 +37,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
+#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx) {
@@ -85,14 +86,17 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error,
grpc_workqueue *offload_target_or_null) {
- GPR_ASSERT(offload_target_or_null == NULL);
- grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
+ if (offload_target_or_null == NULL) {
+ grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
+ } else {
+ grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error);
+ GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched");
+ }
}
void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
grpc_closure_list *list,
grpc_workqueue *offload_target_or_null) {
- GPR_ASSERT(offload_target_or_null == NULL);
grpc_closure_list_move(list, &exec_ctx->closure_list);
}
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 38f27d9b13..917f332f03 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -93,7 +93,11 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx);
/** Finish any pending work for a grpc_exec_ctx. Must be called before
* the instance is destroyed, or work may be lost. */
void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx);
-/** Add a closure to be executed at the next flush/finish point */
+/** Add a closure to be executed in the future.
+ If \a offload_target_or_null is NULL, the closure will be executed at the
+ next exec_ctx.{finish,flush} point.
+ If \a offload_target_or_null is non-NULL, the closure will be scheduled
+ against the workqueue, and a reference to the workqueue will be consumed. */
void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error,
grpc_workqueue *offload_target_or_null);
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index 89292a153e..d67d388b8c 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -45,6 +45,7 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/network_status_tracker.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/env.h"
#include "src/core/lib/support/string.h"
@@ -62,6 +63,7 @@ void grpc_iomgr_init(void) {
grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC));
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = "root";
+ grpc_network_status_init();
grpc_iomgr_platform_init();
}
@@ -140,6 +142,7 @@ void grpc_iomgr_shutdown(void) {
grpc_iomgr_platform_shutdown();
grpc_exec_ctx_global_shutdown();
+ grpc_network_status_shutdown();
gpr_mu_destroy(&g_mu);
gpr_cv_destroy(&g_rcv);
}
diff --git a/src/core/lib/iomgr/network_status_tracker.c b/src/core/lib/iomgr/network_status_tracker.c
index ccbe136db9..90c074b007 100644
--- a/src/core/lib/iomgr/network_status_tracker.c
+++ b/src/core/lib/iomgr/network_status_tracker.c
@@ -42,9 +42,8 @@ typedef struct endpoint_ll_node {
static endpoint_ll_node *head = NULL;
static gpr_mu g_endpoint_mutex;
-static gpr_once g_once_init = GPR_ONCE_INIT;
-static void destroy_network_status_monitor(void) {
+void grpc_network_status_shutdown(void) {
if (head != NULL) {
gpr_log(GPR_ERROR,
"Memory leaked as all network endpoints were not shut down");
@@ -52,14 +51,12 @@ static void destroy_network_status_monitor(void) {
gpr_mu_destroy(&g_endpoint_mutex);
}
-static void initialize_network_status_monitor(void) {
+void grpc_network_status_init(void) {
gpr_mu_init(&g_endpoint_mutex);
- atexit(destroy_network_status_monitor);
// TODO(makarandd): Install callback with OS to monitor network status.
}
void grpc_network_status_register_endpoint(grpc_endpoint *ep) {
- gpr_once_init(&g_once_init, initialize_network_status_monitor);
gpr_mu_lock(&g_endpoint_mutex);
if (head == NULL) {
head = (endpoint_ll_node *)gpr_malloc(sizeof(endpoint_ll_node));
diff --git a/src/core/lib/iomgr/network_status_tracker.h b/src/core/lib/iomgr/network_status_tracker.h
index 74a1aa8135..67cb645f44 100644
--- a/src/core/lib/iomgr/network_status_tracker.h
+++ b/src/core/lib/iomgr/network_status_tracker.h
@@ -35,7 +35,11 @@
#define GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H
#include "src/core/lib/iomgr/endpoint.h"
+void grpc_network_status_init(void);
+void grpc_network_status_shutdown(void);
+
void grpc_network_status_register_endpoint(grpc_endpoint *ep);
void grpc_network_status_unregister_endpoint(grpc_endpoint *ep);
void grpc_network_status_shutdown_all_endpoints();
+
#endif /* GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H */
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 2ab45e33ce..ec21e03944 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -284,7 +284,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
}
/* returns true if done, false if pending; if returning true, *error is set */
-#define MAX_WRITE_IOVEC 16
+#define MAX_WRITE_IOVEC 1024
static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
@@ -450,9 +450,19 @@ static char *tcp_get_peer(grpc_endpoint *ep) {
return gpr_strdup(tcp->peer_string);
}
-static const grpc_endpoint_vtable vtable = {
- tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
- tcp_shutdown, tcp_destroy, tcp_get_peer};
+static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ return grpc_fd_get_workqueue(tcp->em_fd);
+}
+
+static const grpc_endpoint_vtable vtable = {tcp_read,
+ tcp_write,
+ tcp_get_workqueue,
+ tcp_add_to_pollset,
+ tcp_add_to_pollset_set,
+ tcp_shutdown,
+ tcp_destroy,
+ tcp_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
const char *peer_string) {
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index d3803c3bd0..cb2ff782d6 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -491,7 +491,8 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) {
}
for (unsigned i = 0; i < count; i++) {
- int fd, port;
+ int fd = -1;
+ int port = -1;
grpc_dualstack_mode dsmode;
err = grpc_create_dualstack_socket(&listener->addr.sockaddr, SOCK_STREAM, 0,
&dsmode, &fd);
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index 37ab59021e..35054c42b5 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -389,9 +389,16 @@ static char *win_get_peer(grpc_endpoint *ep) {
return gpr_strdup(tcp->peer_string);
}
-static grpc_endpoint_vtable vtable = {
- win_read, win_write, win_add_to_pollset, win_add_to_pollset_set,
- win_shutdown, win_destroy, win_get_peer};
+static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; }
+
+static grpc_endpoint_vtable vtable = {win_read,
+ win_write,
+ win_get_workqueue,
+ win_add_to_pollset,
+ win_add_to_pollset_set,
+ win_shutdown,
+ win_destroy,
+ win_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h
index 5cc40eea50..7156e490d7 100644
--- a/src/core/lib/iomgr/workqueue.h
+++ b/src/core/lib/iomgr/workqueue.h
@@ -38,6 +38,7 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/pollset.h"
+#include "src/core/lib/iomgr/pollset_set.h"
#ifdef GPR_POSIX_SOCKET
#include "src/core/lib/iomgr/workqueue_posix.h"
@@ -49,35 +50,45 @@
/* grpc_workqueue is forward declared in exec_ctx.h */
-/** Create a work queue */
-grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
- grpc_workqueue **workqueue);
-
+/* Deprecated: do not use.
+ This has *already* been removed in a future commit. */
void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
-#define GRPC_WORKQUEUE_REFCOUNT_DEBUG
+/* Reference counting functions. Use the macro's always
+ (GRPC_WORKQUEUE_{REF,UNREF}).
+
+ Pass in a descriptive reason string for reffing/unreffing as the last
+ argument to each macro. When GRPC_WORKQUEUE_REFCOUNT_DEBUG is defined, that
+ string will be printed alongside the refcount. When it is not defined, the
+ string will be discarded at compilation time. */
+
+//#define GRPC_WORKQUEUE_REFCOUNT_DEBUG
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
#define GRPC_WORKQUEUE_REF(p, r) \
- grpc_workqueue_ref((p), __FILE__, __LINE__, (r))
-#define GRPC_WORKQUEUE_UNREF(cl, p, r) \
- grpc_workqueue_unref((cl), (p), __FILE__, __LINE__, (r))
+ (grpc_workqueue_ref((p), __FILE__, __LINE__, (r)), (p))
+#define GRPC_WORKQUEUE_UNREF(exec_ctx, p, r) \
+ grpc_workqueue_unref((exec_ctx), (p), __FILE__, __LINE__, (r))
void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
const char *reason);
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
const char *file, int line, const char *reason);
#else
-#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p))
+#define GRPC_WORKQUEUE_REF(p, r) (grpc_workqueue_ref((p)), (p))
#define GRPC_WORKQUEUE_UNREF(cl, p, r) grpc_workqueue_unref((cl), (p))
void grpc_workqueue_ref(grpc_workqueue *workqueue);
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
#endif
-/** Bind this workqueue to a pollset */
-void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue,
- grpc_pollset *pollset);
+/** Add a work item to a workqueue. Items added to a work queue will be started
+ in approximately the order they were enqueued, on some thread that may or
+ may not be the current thread. Successive closures enqueued onto a workqueue
+ MAY be executed concurrently.
+
+ It is generally more expensive to add a closure to a workqueue than to the
+ execution context, both in terms of CPU work and in execution latency.
-/** Add a work item to a workqueue */
+ Use work queues when it's important that other threads be given a chance to
+ tackle some workload. */
void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
grpc_closure *closure, grpc_error *error);
diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c
index 45e0f6063b..e0d6dac230 100644
--- a/src/core/lib/iomgr/workqueue_posix.c
+++ b/src/core/lib/iomgr/workqueue_posix.c
@@ -70,7 +70,7 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
static void workqueue_destroy(grpc_exec_ctx *exec_ctx,
grpc_workqueue *workqueue) {
- GPR_ASSERT(grpc_closure_list_empty(workqueue->closure_list));
+ grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
grpc_fd_shutdown(exec_ctx, workqueue->wakeup_read_fd);
}
@@ -100,12 +100,6 @@ void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
}
}
-void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue,
- grpc_pollset *pollset) {
- grpc_pollset_add_fd(exec_ctx, pollset, workqueue->wakeup_read_fd);
-}
-
void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
gpr_mu_lock(&workqueue->mu);
grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/workqueue_posix.h
index dcb47e7b59..0f26ba58e2 100644
--- a/src/core/lib/iomgr/workqueue_posix.h
+++ b/src/core/lib/iomgr/workqueue_posix.h
@@ -50,4 +50,9 @@ struct grpc_workqueue {
grpc_closure read_closure;
};
+/** Create a work queue. Returns an error if creation fails. If creation
+ succeeds, sets *workqueue to point to it. */
+grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue **workqueue);
+
#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H */
diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c
index 275f040b1c..23e2dea185 100644
--- a/src/core/lib/iomgr/workqueue_windows.c
+++ b/src/core/lib/iomgr/workqueue_windows.c
@@ -37,4 +37,26 @@
#include "src/core/lib/iomgr/workqueue.h"
+// Minimal implementation of grpc_workqueue for Windows
+// Works by directly enqueuing workqueue items onto the current execution
+// context, which is at least correct, if not performant or in the spirit of
+// workqueues.
+
+void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
+
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
+ const char *reason) {}
+void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ const char *file, int line, const char *reason) {}
+#else
+void grpc_workqueue_ref(grpc_workqueue *workqueue) {}
+void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
+#endif
+
+void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ grpc_closure *closure, grpc_error *error) {
+ grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+}
+
#endif /* GPR_WINDOWS */
diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c
index 7650d68e89..bc50f9d1b0 100644
--- a/src/core/lib/security/transport/secure_endpoint.c
+++ b/src/core/lib/security/transport/secure_endpoint.c
@@ -360,11 +360,19 @@ static char *endpoint_get_peer(grpc_endpoint *secure_ep) {
return grpc_endpoint_get_peer(ep->wrapped_ep);
}
-static const grpc_endpoint_vtable vtable = {
- endpoint_read, endpoint_write,
- endpoint_add_to_pollset, endpoint_add_to_pollset_set,
- endpoint_shutdown, endpoint_destroy,
- endpoint_get_peer};
+static grpc_workqueue *endpoint_get_workqueue(grpc_endpoint *secure_ep) {
+ secure_endpoint *ep = (secure_endpoint *)secure_ep;
+ return grpc_endpoint_get_workqueue(ep->wrapped_ep);
+}
+
+static const grpc_endpoint_vtable vtable = {endpoint_read,
+ endpoint_write,
+ endpoint_get_workqueue,
+ endpoint_add_to_pollset,
+ endpoint_add_to_pollset_set,
+ endpoint_shutdown,
+ endpoint_destroy,
+ endpoint_get_peer};
grpc_endpoint *grpc_secure_endpoint_create(
struct tsi_frame_protector *protector, grpc_endpoint *transport,
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index def6e5068b..2f108af48a 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -73,6 +73,7 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
typedef struct requested_call {
requested_call_type type;
+ size_t cq_idx;
void *tag;
grpc_server *server;
grpc_completion_queue *cq_bound_to_call;
@@ -206,11 +207,11 @@ struct grpc_server {
registered_method *registered_methods;
/** one request matcher for unregistered methods */
request_matcher unregistered_request_matcher;
- /** free list of available requested_calls indices */
- gpr_stack_lockfree *request_freelist;
+ /** free list of available requested_calls_per_cq indices */
+ gpr_stack_lockfree **request_freelist_per_cq;
/** requested call backing data */
- requested_call *requested_calls;
- size_t max_requested_calls;
+ requested_call **requested_calls_per_cq;
+ int max_requested_calls_per_cq;
gpr_atm shutdown_flag;
uint8_t shutdown_published;
@@ -357,7 +358,8 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
for (size_t i = 0; i < server->cq_count; i++) {
while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) !=
-1) {
- fail_call(exec_ctx, server, i, &server->requested_calls[request_id],
+ fail_call(exec_ctx, server, i,
+ &server->requested_calls_per_cq[i][request_id],
GRPC_ERROR_REF(error));
}
}
@@ -392,12 +394,16 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
}
for (i = 0; i < server->cq_count; i++) {
GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
+ if (server->started) {
+ gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]);
+ gpr_free(server->requested_calls_per_cq[i]);
+ }
}
- gpr_stack_lockfree_destroy(server->request_freelist);
+ gpr_free(server->request_freelist_per_cq);
+ gpr_free(server->requested_calls_per_cq);
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
- gpr_free(server->requested_calls);
gpr_free(server);
}
@@ -460,11 +466,13 @@ static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
requested_call *rc = req;
grpc_server *server = rc->server;
- if (rc >= server->requested_calls &&
- rc < server->requested_calls + server->max_requested_calls) {
- GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
- gpr_stack_lockfree_push(server->request_freelist,
- (int)(rc - server->requested_calls));
+ if (rc >= server->requested_calls_per_cq[rc->cq_idx] &&
+ rc < server->requested_calls_per_cq[rc->cq_idx] +
+ server->max_requested_calls_per_cq) {
+ GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX);
+ gpr_stack_lockfree_push(
+ server->request_freelist_per_cq[rc->cq_idx],
+ (int)(rc - server->requested_calls_per_cq[rc->cq_idx]));
} else {
gpr_free(req);
}
@@ -540,7 +548,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
publish_call(exec_ctx, server, calld, cq_idx,
- &server->requested_calls[request_id]);
+ &server->requested_calls_per_cq[cq_idx][request_id]);
return; /* early out */
}
}
@@ -979,8 +987,6 @@ void grpc_server_register_non_listening_completion_queue(
}
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
- size_t i;
-
GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
grpc_server *server = gpr_malloc(sizeof(grpc_server));
@@ -998,15 +1004,7 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
&server->root_channel_data;
/* TODO(ctiller): expose a channel_arg for this */
- server->max_requested_calls = 32768;
- server->request_freelist =
- gpr_stack_lockfree_create(server->max_requested_calls);
- for (i = 0; i < (size_t)server->max_requested_calls; i++) {
- gpr_stack_lockfree_push(server->request_freelist, (int)i);
- }
- server->requested_calls = gpr_malloc(server->max_requested_calls *
- sizeof(*server->requested_calls));
-
+ server->max_requested_calls_per_cq = 32768;
server->channel_args = grpc_channel_args_copy(args);
return server;
@@ -1066,16 +1064,28 @@ void grpc_server_start(grpc_server *server) {
server->started = true;
size_t pollset_count = 0;
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
+ server->request_freelist_per_cq =
+ gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count);
+ server->requested_calls_per_cq =
+ gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) {
server->pollsets[pollset_count++] = grpc_cq_pollset(server->cqs[i]);
}
+ server->request_freelist_per_cq[i] =
+ gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
+ for (int j = 0; j < server->max_requested_calls_per_cq; j++) {
+ gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j);
+ }
+ server->requested_calls_per_cq[i] =
+ gpr_malloc((size_t)server->max_requested_calls_per_cq *
+ sizeof(*server->requested_calls_per_cq[i]));
}
request_matcher_init(&server->unregistered_request_matcher,
- server->max_requested_calls, server);
+ (size_t)server->max_requested_calls_per_cq, server);
for (registered_method *rm = server->registered_methods; rm; rm = rm->next) {
- request_matcher_init(&rm->request_matcher, server->max_requested_calls,
- server);
+ request_matcher_init(&rm->request_matcher,
+ (size_t)server->max_requested_calls_per_cq, server);
}
for (l = server->listeners; l; l = l->next) {
@@ -1307,11 +1317,13 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_CREATE("Server Shutdown"));
return GRPC_CALL_OK;
}
- request_id = gpr_stack_lockfree_pop(server->request_freelist);
+ request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]);
if (request_id == -1) {
/* out of request ids: just fail this one */
fail_call(exec_ctx, server, cq_idx, rc,
- GRPC_ERROR_CREATE("Server Shutdown"));
+ grpc_error_set_int(GRPC_ERROR_CREATE("Out of request ids"),
+ GRPC_ERROR_INT_LIMIT,
+ server->max_requested_calls_per_cq));
return GRPC_CALL_OK;
}
switch (rc->type) {
@@ -1322,7 +1334,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
rm = &rc->data.registered.registered_method->request_matcher;
break;
}
- server->requested_calls[request_id] = *rc;
+ server->requested_calls_per_cq[cq_idx][request_id] = *rc;
gpr_free(rc);
if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
/* this was the first queued request: we need to lock and start
@@ -1346,7 +1358,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
publish_call(exec_ctx, server, calld, cq_idx,
- &server->requested_calls[request_id]);
+ &server->requested_calls_per_cq[cq_idx][request_id]);
}
gpr_mu_lock(&server->mu_call);
}
@@ -1382,6 +1394,7 @@ grpc_call_error grpc_server_request_call(
}
grpc_cq_begin_op(cq_for_notification, tag);
details->reserved = NULL;
+ rc->cq_idx = cq_idx;
rc->type = BATCH_CALL;
rc->server = server;
rc->tag = tag;
@@ -1430,6 +1443,7 @@ grpc_call_error grpc_server_request_registered_call(
goto done;
}
grpc_cq_begin_op(cq_for_notification, tag);
+ rc->cq_idx = cq_idx;
rc->type = REGISTERED_CALL;
rc->server = server;
rc->tag = tag;
diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c
index 054f112127..68d05e3a85 100644
--- a/src/core/lib/transport/connectivity_state.c
+++ b/src/core/lib/transport/connectivity_state.c
@@ -179,6 +179,9 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
while ((w = tracker->watchers) != NULL) {
*w->current = tracker->current_state;
tracker->watchers = w->next;
+ if (grpc_connectivity_state_trace) {
+ gpr_log(GPR_DEBUG, "NOTIFY: %p", w->notify);
+ }
grpc_exec_ctx_sched(exec_ctx, w->notify,
GRPC_ERROR_REF(tracker->current_error), NULL);
gpr_free(w);
diff --git a/test/core/end2end/tests/high_initial_seqno.c b/test/core/end2end/tests/high_initial_seqno.c
index 50e3c9cb89..db45f5eb5a 100644
--- a/test/core/end2end/tests/high_initial_seqno.c
+++ b/test/core/end2end/tests/high_initial_seqno.c
@@ -203,6 +203,12 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
grpc_call_destroy(c);
grpc_call_destroy(s);
+ /* TODO(ctiller): this rate limits the test, and it should be removed when
+ retry has been implemented; until then cross-thread chatter
+ may result in some requests needing to be cancelled due to
+ seqno exhaustion. */
+ cq_verify_empty(cqv);
+
cq_verifier_destroy(cqv);
}
diff --git a/test/core/end2end/tests/network_status_change.c b/test/core/end2end/tests/network_status_change.c
index 10207844ab..39ddc13754 100644
--- a/test/core/end2end/tests/network_status_change.c
+++ b/test/core/end2end/tests/network_status_change.c
@@ -186,9 +186,10 @@ static void test_invoke_network_status_change(grpc_end2end_test_config config) {
GPR_ASSERT(GRPC_CALL_OK == error);
cq_expect_completion(cqv, tag(102), 1);
+ cq_verify(cqv);
+
// Simulate the network loss event
grpc_network_status_shutdown_all_endpoints();
- cq_verify(cqv);
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
@@ -205,7 +206,7 @@ static void test_invoke_network_status_change(grpc_end2end_test_config config) {
op++;
error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
- void shutdown_all_endpoints();
+
cq_expect_completion(cqv, tag(103), 1);
cq_expect_completion(cqv, tag(1), 1);
cq_verify(cqv);
diff --git a/test/core/internal_api_canaries/iomgr.c b/test/core/internal_api_canaries/iomgr.c
index 5e86c42309..27d630623e 100644
--- a/test/core/internal_api_canaries/iomgr.c
+++ b/test/core/internal_api_canaries/iomgr.c
@@ -77,11 +77,14 @@ static void test_code(void) {
/* endpoint.h */
grpc_endpoint endpoint;
- grpc_endpoint_vtable vtable = {
- grpc_endpoint_read, grpc_endpoint_write,
- grpc_endpoint_add_to_pollset, grpc_endpoint_add_to_pollset_set,
- grpc_endpoint_shutdown, grpc_endpoint_destroy,
- grpc_endpoint_get_peer};
+ grpc_endpoint_vtable vtable = {grpc_endpoint_read,
+ grpc_endpoint_write,
+ grpc_endpoint_get_workqueue,
+ grpc_endpoint_add_to_pollset,
+ grpc_endpoint_add_to_pollset_set,
+ grpc_endpoint_shutdown,
+ grpc_endpoint_destroy,
+ grpc_endpoint_get_peer};
endpoint.vtable = &vtable;
grpc_endpoint_read(&exec_ctx, &endpoint, NULL, NULL);
diff --git a/test/core/iomgr/workqueue_test.c b/test/core/iomgr/workqueue_test.c
deleted file mode 100644
index 76ecfae74b..0000000000
--- a/test/core/iomgr/workqueue_test.c
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/lib/iomgr/workqueue.h"
-
-#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-
-#include "test/core/util/test_config.h"
-
-static gpr_mu *g_mu;
-static grpc_pollset *g_pollset;
-
-static void must_succeed(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) {
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- gpr_mu_lock(g_mu);
- *(int *)p = 1;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
- gpr_mu_unlock(g_mu);
-}
-
-static void test_ref_unref(void) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_workqueue *wq;
- GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_workqueue_create",
- grpc_workqueue_create(&exec_ctx, &wq)));
- GRPC_WORKQUEUE_REF(wq, "test");
- GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "test");
- GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "destroy");
- grpc_exec_ctx_finish(&exec_ctx);
-}
-
-static void test_add_closure(void) {
- grpc_closure c;
- int done = 0;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_workqueue *wq;
- GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_workqueue_create",
- grpc_workqueue_create(&exec_ctx, &wq)));
- gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5);
- grpc_pollset_worker *worker = NULL;
- grpc_closure_init(&c, must_succeed, &done);
-
- grpc_workqueue_enqueue(&exec_ctx, wq, &c, GRPC_ERROR_NONE);
- grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset);
-
- gpr_mu_lock(g_mu);
- GPR_ASSERT(!done);
- while (!done) {
- GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work",
- grpc_pollset_work(&exec_ctx, g_pollset, &worker,
- gpr_now(deadline.clock_type), deadline)));
- }
- gpr_mu_unlock(g_mu);
- grpc_exec_ctx_finish(&exec_ctx);
- GPR_ASSERT(done);
-
- GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "destroy");
- grpc_exec_ctx_finish(&exec_ctx);
-}
-
-static void test_flush(void) {
- grpc_closure c;
- int done = 0;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_workqueue *wq;
- GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_workqueue_create",
- grpc_workqueue_create(&exec_ctx, &wq)));
- gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5);
- grpc_pollset_worker *worker = NULL;
- grpc_closure_init(&c, must_succeed, &done);
-
- grpc_exec_ctx_sched(&exec_ctx, &c, GRPC_ERROR_NONE, NULL);
- grpc_workqueue_flush(&exec_ctx, wq);
- grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset);
-
- gpr_mu_lock(g_mu);
- GPR_ASSERT(!done);
- while (!done) {
- GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work",
- grpc_pollset_work(&exec_ctx, g_pollset, &worker,
- gpr_now(deadline.clock_type), deadline)));
- }
- gpr_mu_unlock(g_mu);
- grpc_exec_ctx_finish(&exec_ctx);
- GPR_ASSERT(done);
-
- GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "destroy");
- grpc_exec_ctx_finish(&exec_ctx);
-}
-
-static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p,
- grpc_error *error) {
- grpc_pollset_destroy(p);
-}
-
-int main(int argc, char **argv) {
- grpc_closure destroyed;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_test_init(argc, argv);
- grpc_init();
- g_pollset = gpr_malloc(grpc_pollset_size());
- grpc_pollset_init(g_pollset, &g_mu);
-
- test_ref_unref();
- test_add_closure();
- test_flush();
-
- grpc_closure_init(&destroyed, destroy_pollset, g_pollset);
- grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
- grpc_exec_ctx_finish(&exec_ctx);
- grpc_shutdown();
-
- gpr_free(g_pollset);
- return 0;
-}
diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c
index ed9545e9df..13e0e918fb 100644
--- a/test/core/util/mock_endpoint.c
+++ b/test/core/util/mock_endpoint.c
@@ -95,9 +95,17 @@ static char *me_get_peer(grpc_endpoint *ep) {
return gpr_strdup("fake:mock_endpoint");
}
+static grpc_workqueue *me_get_workqueue(grpc_endpoint *ep) { return NULL; }
+
static const grpc_endpoint_vtable vtable = {
- me_read, me_write, me_add_to_pollset, me_add_to_pollset_set,
- me_shutdown, me_destroy, me_get_peer,
+ me_read,
+ me_write,
+ me_get_workqueue,
+ me_add_to_pollset,
+ me_add_to_pollset_set,
+ me_shutdown,
+ me_destroy,
+ me_get_peer,
};
grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(gpr_slice slice)) {
diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c
index a39f3dd66e..7ed9e97bd6 100644
--- a/test/core/util/passthru_endpoint.c
+++ b/test/core/util/passthru_endpoint.c
@@ -140,9 +140,17 @@ static char *me_get_peer(grpc_endpoint *ep) {
return gpr_strdup("fake:mock_endpoint");
}
+static grpc_workqueue *me_get_workqueue(grpc_endpoint *ep) { return NULL; }
+
static const grpc_endpoint_vtable vtable = {
- me_read, me_write, me_add_to_pollset, me_add_to_pollset_set,
- me_shutdown, me_destroy, me_get_peer,
+ me_read,
+ me_write,
+ me_get_workqueue,
+ me_add_to_pollset,
+ me_add_to_pollset_set,
+ me_shutdown,
+ me_destroy,
+ me_get_peer,
};
static void half_init(half *m, passthru_endpoint *parent) {
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 354a59cedd..0f87ae3e44 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -1166,6 +1166,9 @@ TEST_P(ProxyEnd2endTest, HugeResponse) {
request.mutable_param()->set_response_message_length(kResponseSize);
ClientContext context;
+ std::chrono::system_clock::time_point deadline =
+ std::chrono::system_clock::now() + std::chrono::seconds(20);
+ context.set_deadline(deadline);
Status s = stub_->Echo(&context, request, &response);
EXPECT_EQ(kResponseSize, response.message().size());
EXPECT_TRUE(s.ok());
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 1507d1e3d6..5dd0bd8533 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -206,21 +206,30 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
void* got_tag;
bool ok;
- if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
- // Got a regular event, so process it
- ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
- if (!ctx->RunNextState(ok, histogram)) {
- // The RPC and callback are done, so clone the ctx
- // and kickstart the new one
- auto clone = ctx->StartNewClone();
- clone->Start(cli_cqs_[thread_idx].get());
- // delete the old version
- delete ctx;
+ switch (cli_cqs_[thread_idx]->AsyncNext(
+ &got_tag, &ok,
+ std::chrono::system_clock::now() + std::chrono::milliseconds(10))) {
+ case CompletionQueue::SHUTDOWN:
+ return false;
+ case CompletionQueue::GOT_EVENT: {
+ // Got a regular event, so process it
+ ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
+ if (!ctx->RunNextState(ok, histogram)) {
+ // The RPC and callback are done, so clone the ctx
+ // and kickstart the new one
+ auto clone = ctx->StartNewClone();
+ clone->Start(cli_cqs_[thread_idx].get());
+ // delete the old version
+ delete ctx;
+ }
+ return true;
}
- return true;
- } else { // queue is shutting down
- return false;
+ case CompletionQueue::TIMEOUT:
+ // TODO(ctiller): do something here to track how frequently we pass
+ // through this codepath.
+ return true;
}
+ GPR_UNREACHABLE_CODE(return false);
}
protected:
diff --git a/test/cpp/qps/gen_build_yaml.py b/test/cpp/qps/gen_build_yaml.py
index 34b8151441..4ff4e44b8b 100755
--- a/test/cpp/qps/gen_build_yaml.py
+++ b/test/cpp/qps/gen_build_yaml.py
@@ -45,9 +45,10 @@ import performance.scenario_config as scenario_config
def _scenario_json_string(scenario_json):
# tweak parameters to get fast test times
- scenario_json['warmup_seconds'] = 1
+ scenario_json['warmup_seconds'] = 0
scenario_json['benchmark_seconds'] = 1
- return json.dumps(scenario_config.remove_nonproto_fields(scenario_json))
+ scenarios_json = {'scenarios': [scenario_config.remove_nonproto_fields(scenario_json)]}
+ return json.dumps(scenarios_json)
def threads_of_type(scenario_json, path):
d = scenario_json
@@ -72,8 +73,7 @@ print yaml.dump({
{
'name': 'json_run_localhost',
'shortname': 'json_run_localhost:%s' % scenario_json['name'],
- 'args': ['--scenario_json',
- pipes.quote(_scenario_json_string(scenario_json))],
+ 'args': ['--scenarios_json', _scenario_json_string(scenario_json)],
'ci_platforms': ['linux', 'mac', 'posix', 'windows'],
'platforms': ['linux', 'mac', 'posix', 'windows'],
'flaky': False,
@@ -81,7 +81,8 @@ print yaml.dump({
'boringssl': True,
'defaults': 'boringssl',
'cpu_cost': guess_cpu(scenario_json),
- 'exclude_configs': []
+ 'exclude_configs': [],
+ 'timeout_seconds': 3*60
}
for scenario_json in scenario_config.CXXLanguage().scenarios()
]
diff --git a/test/cpp/qps/json_run_localhost.cc b/test/cpp/qps/json_run_localhost.cc
index 6545dc2917..74e40fbf1a 100644
--- a/test/cpp/qps/json_run_localhost.cc
+++ b/test/cpp/qps/json_run_localhost.cc
@@ -75,7 +75,7 @@ int main(int argc, char** argv) {
for (int i = 1; i < argc; i++) {
args.push_back(argv[i]);
}
- SubProcess(args).Join();
+ GPR_ASSERT(SubProcess(args).Join() == 0);
for (auto it = jobs.begin(); it != jobs.end(); ++it) {
(*it)->Interrupt();
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index c9954d0d02..73ca19148b 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -102,7 +102,7 @@ class AsyncQpsServerTest : public Server {
auto process_rpc_bound =
std::bind(process_rpc, config.payload_config(), _1, _2);
- for (int i = 0; i < 10000 / num_threads; i++) {
+ for (int i = 0; i < 15000; i++) {
for (int j = 0; j < num_threads; j++) {
if (request_unary_function) {
auto request_unary =
@@ -132,7 +132,8 @@ class AsyncQpsServerTest : public Server {
for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
(*ss)->set_shutdown();
}
- server_->Shutdown();
+ server_->Shutdown(std::chrono::system_clock::now() +
+ std::chrono::seconds(3));
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join();
}
diff --git a/tools/dockerfile/test/python_pyenv_x64/Dockerfile b/tools/dockerfile/test/python_pyenv_x64/Dockerfile
index e2355c4443..abb5f3c89b 100644
--- a/tools/dockerfile/test/python_pyenv_x64/Dockerfile
+++ b/tools/dockerfile/test/python_pyenv_x64/Dockerfile
@@ -76,7 +76,7 @@ RUN apt-get update && apt-get install -y \
# Install Python packages from PyPI
RUN pip install pip --upgrade
RUN pip install virtualenv
-RUN pip install futures==2.2.0 enum34==1.0.4 protobuf==3.0.0a2
+RUN pip install futures==2.2.0 enum34==1.0.4 protobuf==3.0.0a2 six==1.10.0
# Install dependencies for pyenv
RUN apt-get update && apt-get install -y \
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index 59d2cad856..57fff2ec9c 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -41,6 +41,7 @@ import json
import multiprocessing
import os
import os.path
+import pipes
import platform
import random
import re
@@ -74,6 +75,9 @@ def platform_string():
return jobset.platform_string()
+_DEFAULT_TIMEOUT_SECONDS = 5 * 60
+
+
# SimpleConfig: just compile with CONFIG=config, and run the binary to test
class Config(object):
@@ -86,7 +90,7 @@ class Config(object):
self.tool_prefix = tool_prefix
self.timeout_multiplier = timeout_multiplier
- def job_spec(self, cmdline, timeout_seconds=5*60,
+ def job_spec(self, cmdline, timeout_seconds=_DEFAULT_TIMEOUT_SECONDS,
shortname=None, environ={}, cpu_cost=1.0, flaky=False):
"""Construct a jobset.JobSpec for a test under this config
@@ -161,7 +165,7 @@ class CLanguage(object):
env={'GRPC_DEFAULT_SSL_ROOTS_FILE_PATH':
_ROOT + '/src/core/lib/tsi/test_creds/ca.pem',
'GRPC_POLL_STRATEGY': polling_strategy}
- shortname_ext = '' if polling_strategy=='all' else ' polling=%s' % polling_strategy
+ shortname_ext = '' if polling_strategy=='all' else ' GRPC_POLL_STRATEGY=%s' % polling_strategy
if self.config.build_config in target['exclude_configs']:
continue
if self.platform == 'windows':
@@ -192,28 +196,26 @@ class CLanguage(object):
assert line[1] == ' '
test = base + line.strip()
cmdline = [binary] + ['--gtest_filter=%s' % test]
- out.append(self.config.job_spec(cmdline, [binary],
- shortname='%s:%s %s' % (binary, test, shortname_ext),
+ out.append(self.config.job_spec(cmdline,
+ shortname='%s --gtest_filter=%s %s' % (binary, test, shortname_ext),
cpu_cost=target['cpu_cost'],
environ=env))
else:
cmdline = [binary] + target['args']
- out.append(self.config.job_spec(cmdline, [binary],
- shortname=' '.join(cmdline) + shortname_ext,
+ out.append(self.config.job_spec(cmdline,
+ shortname=' '.join(
+ pipes.quote(arg)
+ for arg in cmdline) +
+ shortname_ext,
cpu_cost=target['cpu_cost'],
flaky=target.get('flaky', False),
+ timeout_seconds=target.get('timeout_seconds', _DEFAULT_TIMEOUT_SECONDS),
environ=env))
elif self.args.regex == '.*' or self.platform == 'windows':
print('\nWARNING: binary not found, skipping', binary)
return sorted(out)
def make_targets(self):
- test_regex = self.args.regex
- if self.platform != 'windows' and self.args.regex != '.*':
- # use the regex to minimize the number of things to build
- return [os.path.basename(target['name'])
- for target in get_c_tests(False, self.test_lang)
- if re.search(test_regex, '/' + target['name'])]
if self.platform == 'windows':
# don't build tools on windows just yet
return ['buildtests_%s' % self.make_target]
@@ -1294,8 +1296,6 @@ def _build_and_run(
jobset.message(
'FLAKE', '%s [%d/%d runs flaked]' % (k, num_failures, num_runs),
do_newline=True)
- else:
- jobset.message('PASSED', k, do_newline=True)
finally:
for antagonist in antagonists:
antagonist.kill()
diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json
index cdbc254f43..e3cfd55cd6 100644
--- a/tools/run_tests/sources_and_headers.json
+++ b/tools/run_tests/sources_and_headers.json
@@ -1823,22 +1823,6 @@
"gpr",
"gpr_test_util",
"grpc",
- "grpc_test_util"
- ],
- "headers": [],
- "language": "c",
- "name": "workqueue_test",
- "src": [
- "test/core/iomgr/workqueue_test.c"
- ],
- "third_party": false,
- "type": "target"
- },
- {
- "deps": [
- "gpr",
- "gpr_test_util",
- "grpc",
"grpc++",
"grpc++_test_util",
"grpc_test_util"
diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json
index 93d42e3454..d94301b946 100644
--- a/tools/run_tests/tests.json
+++ b/tools/run_tests/tests.json
@@ -1940,25 +1940,6 @@
"ci_platforms": [
"linux",
"mac",
- "posix"
- ],
- "cpu_cost": 1.0,
- "exclude_configs": [],
- "flaky": false,
- "gtest": false,
- "language": "c",
- "name": "workqueue_test",
- "platforms": [
- "linux",
- "mac",
- "posix"
- ]
- },
- {
- "args": [],
- "ci_platforms": [
- "linux",
- "mac",
"posix",
"windows"
],
@@ -27153,8 +27134,8 @@
},
{
"args": [
- "--scenario_json",
- "'{\"name\": \"cpp_generic_async_streaming_ping_pong_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'"
+ "--scenarios_json",
+ "{\"scenarios\": [{\"name\": \"cpp_generic_async_streaming_ping_pong_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}"
],
"boringssl": true,
"ci_platforms": [
@@ -27175,12 +27156,13 @@
"posix",
"windows"
],
- "shortname": "json_run_localhost:cpp_generic_async_streaming_ping_pong_secure"
+ "shortname": "json_run_localhost:cpp_generic_async_streaming_ping_pong_secure",
+ "timeout_seconds": 180
},
{
"args": [
- "--scenario_json",
- "'{\"name\": \"cpp_protobuf_async_streaming_ping_pong_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'"
+ "--scenarios_json",
+ "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_streaming_ping_pong_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}"
],
"boringssl": true,
"ci_platforms": [
@@ -27201,12 +27183,13 @@
"posix",
"windows"
],
- "shortname": "json_run_localhost:cpp_protobuf_async_streaming_ping_pong_secure"
+ "shortname": "json_run_localhost:cpp_protobuf_async_streaming_ping_pong_secure",
+ "timeout_seconds": 180
},
{
"args": [
- "--scenario_json",
- "'{\"name\": \"cpp_protobuf_async_unary_ping_pong_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'"
+ "--scenarios_json",
+ "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_unary_ping_pong_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}"
],
"boringssl": true,
"ci_platforms": [
@@ -27227,12 +27210,13 @@
"posix",
"windows"
],
- "shortname": "json_run_localhost:cpp_protobuf_async_unary_ping_pong_secure"
+ "shortname": "json_run_localhost:cpp_protobuf_async_unary_ping_pong_secure",
+ "timeout_seconds": 180
},
{
"args": [
- "--scenario_json",
- "'{\"name\": \"cpp_protobuf_sync_unary_ping_pong_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"SYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'"
+ "--scenarios_json",
+ "{\"scenarios\": [{\"name\": \"cpp_protobuf_sync_unary_ping_pong_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"SYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}"
],
"boringssl": true,
"ci_platforms": [
@@ -27253,12 +27237,13 @@
"posix",
"windows"
],
- "shortname": "json_run_localhost:cpp_protobuf_sync_unary_ping_pong_secure"
+ "shortname": "json_run_localhost:cpp_protobuf_sync_unary_ping_pong_secure",
+ "timeout_seconds": 180
},
{
"args": [
- "--scenario_json",
- "'{\"name\": \"cpp_protobuf_async_unary_qps_unconstrained_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'"
+ "--scenarios_json",
+ "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_unary_qps_unconstrained_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}"
],
"boringssl": true,
"ci_platforms": [
@@ -27279,12 +27264,13 @@
"posix",
"windows"
],
- "shortname": "json_run_localhost:cpp_protobuf_async_unary_qps_unconstrained_secure"
+ "shortname": "json_run_localhost:cpp_protobuf_async_unary_qps_unconstrained_secure",
+ "timeout_seconds": 180
},
{
"args": [
- "--scenario_json",
- "'{\"name\": \"cpp_protobuf_async_streaming_qps_unconstrained_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'"
+ "--scenarios_json",
+ "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_streaming_qps_unconstrained_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}"
],
"boringssl": true,
"ci_platforms": [
@@ -27305,12 +27291,13 @@
"posix",
"windows"
],
- "shortname": "json_run_localhost:cpp_protobuf_async_streaming_qps_unconstrained_secure"
+ "shortname": "json_run_localhost:cpp_protobuf_async_streaming_qps_unconstrained_secure",
+ "timeout_seconds": 180
},
{
"args": [
- "--scenario_json",
- "'{\"name\": \"cpp_generic_async_streaming_qps_unconstrained_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'"
+ "--scenarios_json",
+ "{\"scenarios\": [{\"name\": \"cpp_generic_async_streaming_qps_unconstrained_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}"
],
"boringssl": true,
"ci_platforms": [
@@ -27331,12 +27318,13 @@
"posix",
"windows"
],
- "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_unconstrained_secure"
+ "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_unconstrained_secure",
+ "timeout_seconds": 180
},
{
"args": [
- "--scenario_json",
- "'{\"name\": \"cpp_generic_async_streaming_qps_one_server_core_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'"
+ "--scenarios_json",
+ "{\"scenarios\": [{\"name\": \"cpp_generic_async_streaming_qps_one_server_core_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}"
],
"boringssl": true,
"ci_platforms": [
@@ -27357,12 +27345,13 @@
"posix",
"windows"
],
- "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_one_server_core_secure"
+ "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_one_server_core_secure",
+ "timeout_seconds": 180
},
{
"args": [
- "--scenario_json",
- "'{\"name\": \"cpp_generic_async_streaming_ping_pong_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'"
+ "--scenarios_json",
+ "{\"scenarios\": [{\"name\": \"cpp_generic_async_streaming_ping_pong_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}"
],
"boringssl": true,
"ci_platforms": [
@@ -27383,12 +27372,13 @@
"posix",
"windows"
],
- "shortname": "json_run_localhost:cpp_generic_async_streaming_ping_pong_insecure"
+ "shortname": "json_run_localhost:cpp_generic_async_streaming_ping_pong_insecure",
+ "timeout_seconds": 180
},
{
"args": [
- "--scenario_json",
- "'{\"name\": \"cpp_protobuf_async_streaming_ping_pong_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'"
+ "--scenarios_json",
+ "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_streaming_ping_pong_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}"
],
"boringssl": true,
"ci_platforms": [
@@ -27409,12 +27399,13 @@
"posix",
"windows"
],
- "shortname": "json_run_localhost:cpp_protobuf_async_streaming_ping_pong_insecure"
+ "shortname": "json_run_localhost:cpp_protobuf_async_streaming_ping_pong_insecure",
+ "timeout_seconds": 180
},
{
"args": [
- "--scenario_json",
- "'{\"name\": \"cpp_protobuf_async_unary_ping_pong_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'"
+ "--scenarios_json",
+ "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_unary_ping_pong_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}"
],
"boringssl": true,
"ci_platforms": [
@@ -27435,12 +27426,13 @@
"posix",
"windows"
],
- "shortname": "json_run_localhost:cpp_protobuf_async_unary_ping_pong_insecure"
+ "shortname": "json_run_localhost:cpp_protobuf_async_unary_ping_pong_insecure",
+ "timeout_seconds": 180
},
{
"args": [
- "--scenario_json",
- "'{\"name\": \"cpp_protobuf_sync_unary_ping_pong_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"SYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'"
+ "--scenarios_json",
+ "{\"scenarios\": [{\"name\": \"cpp_protobuf_sync_unary_ping_pong_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"SYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}"
],
"boringssl": true,
"ci_platforms": [
@@ -27461,12 +27453,13 @@
"posix",
"windows"
],
- "shortname": "json_run_localhost:cpp_protobuf_sync_unary_ping_pong_insecure"
+ "shortname": "json_run_localhost:cpp_protobuf_sync_unary_ping_pong_insecure",
+ "timeout_seconds": 180
},
{
"args": [
- "--scenario_json",
- "'{\"name\": \"cpp_protobuf_async_unary_qps_unconstrained_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'"
+ "--scenarios_json",
+ "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_unary_qps_unconstrained_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}"
],
"boringssl": true,
"ci_platforms": [
@@ -27487,12 +27480,13 @@
"posix",
"windows"
],
- "shortname": "json_run_localhost:cpp_protobuf_async_unary_qps_unconstrained_insecure"
+ "shortname": "json_run_localhost:cpp_protobuf_async_unary_qps_unconstrained_insecure",
+ "timeout_seconds": 180
},
{
"args": [
- "--scenario_json",
- "'{\"name\": \"cpp_protobuf_async_streaming_qps_unconstrained_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'"
+ "--scenarios_json",
+ "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_streaming_qps_unconstrained_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}"
],
"boringssl": true,
"ci_platforms": [
@@ -27513,12 +27507,13 @@
"posix",
"windows"
],
- "shortname": "json_run_localhost:cpp_protobuf_async_streaming_qps_unconstrained_insecure"
+ "shortname": "json_run_localhost:cpp_protobuf_async_streaming_qps_unconstrained_insecure",
+ "timeout_seconds": 180
},
{
"args": [
- "--scenario_json",
- "'{\"name\": \"cpp_generic_async_streaming_qps_unconstrained_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'"
+ "--scenarios_json",
+ "{\"scenarios\": [{\"name\": \"cpp_generic_async_streaming_qps_unconstrained_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}"
],
"boringssl": true,
"ci_platforms": [
@@ -27539,12 +27534,13 @@
"posix",
"windows"
],
- "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_unconstrained_insecure"
+ "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_unconstrained_insecure",
+ "timeout_seconds": 180
},
{
"args": [
- "--scenario_json",
- "'{\"name\": \"cpp_generic_async_streaming_qps_one_server_core_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'"
+ "--scenarios_json",
+ "{\"scenarios\": [{\"name\": \"cpp_generic_async_streaming_qps_one_server_core_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}"
],
"boringssl": true,
"ci_platforms": [
@@ -27565,7 +27561,8 @@
"posix",
"windows"
],
- "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_one_server_core_insecure"
+ "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_one_server_core_insecure",
+ "timeout_seconds": 180
},
{
"args": [