Diffstat (limited to 'test')
-rw-r--r--  test/core/bad_client/bad_client.cc                     5
-rw-r--r--  test/core/end2end/fixtures/h2_sockpair+trace.cc        8
-rw-r--r--  test/core/end2end/fixtures/h2_sockpair.cc              8
-rw-r--r--  test/core/end2end/fixtures/h2_sockpair_1byte.cc        8
-rw-r--r--  test/core/end2end/fixtures/http_proxy_fixture.cc     107
-rw-r--r--  test/core/end2end/fuzzers/api_fuzzer.cc                4
-rw-r--r--  test/core/end2end/fuzzers/client_fuzzer.cc             4
-rw-r--r--  test/core/end2end/fuzzers/server_fuzzer.cc             4
-rw-r--r--  test/core/iomgr/udp_server_test.cc                    16
-rw-r--r--  test/core/security/ssl_server_fuzzer.cc                5
-rw-r--r--  test/core/transport/chttp2/BUILD                      15
-rw-r--r--  test/core/transport/chttp2/settings_timeout_test.cc  258
-rw-r--r--  test/cpp/end2end/grpclb_end2end_test.cc               40
-rw-r--r--  test/cpp/interop/interop_server.cc                    18
-rw-r--r--  test/cpp/interop/server_helper.h                      20
-rw-r--r--  test/cpp/microbenchmarks/bm_chttp2_transport.cc        2
-rw-r--r--  test/cpp/microbenchmarks/fullstack_fixtures.h         10
-rw-r--r--  test/cpp/performance/writes_per_rpc_test.cc           12
-rw-r--r--  test/cpp/qps/client_sync.cc                          212
-rw-r--r--  test/distrib/cpp/run_distrib_test_cmake.bat           75
-rwxr-xr-x  test/distrib/cpp/run_distrib_test_cmake.sh             1
21 files changed, 664 insertions(+), 168 deletions(-)
diff --git a/test/core/bad_client/bad_client.cc b/test/core/bad_client/bad_client.cc
index 443886751c..d8bb092e14 100644
--- a/test/core/bad_client/bad_client.cc
+++ b/test/core/bad_client/bad_client.cc
@@ -115,9 +115,10 @@ void grpc_run_bad_client_test(
GRPC_BAD_CLIENT_REGISTERED_HOST,
GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, 0);
grpc_server_start(a.server);
- transport = grpc_create_chttp2_transport(&exec_ctx, nullptr, sfd.server, 0);
+ transport =
+ grpc_create_chttp2_transport(&exec_ctx, nullptr, sfd.server, false);
server_setup_transport(&a, transport);
- grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_exec_ctx_finish(&exec_ctx);
/* Bind everything into the same pollset */
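[Annotation: every fixture, fuzzer, and benchmark below makes the same two
mechanical changes, so the new chttp2 surface is worth spelling out once.
Judging purely from these call sites — a sketch inferred from this patch, not
the authoritative chttp2 header:

    // is_client is now a bool rather than an int flag.
    grpc_transport* grpc_create_chttp2_transport(
        grpc_exec_ctx* exec_ctx, const grpc_channel_args* channel_args,
        grpc_endpoint* ep, bool is_client);

    // A fourth parameter was added. Given the settings_timeout_test introduced
    // later in this patch, it is presumably a closure notified once the peer's
    // initial SETTINGS frame arrives; callers that don't care pass nullptr.
    void grpc_chttp2_transport_start_reading(
        grpc_exec_ctx* exec_ctx, grpc_transport* transport,
        grpc_slice_buffer* read_buffer,
        grpc_closure* notify_on_receive_settings);
]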
diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.cc b/test/core/end2end/fixtures/h2_sockpair+trace.cc
index fe4ab74843..9319c401dc 100644
--- a/test/core/end2end/fixtures/h2_sockpair+trace.cc
+++ b/test/core/end2end/fixtures/h2_sockpair+trace.cc
@@ -97,10 +97,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f,
cs.client_args = client_args;
cs.f = f;
transport =
- grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, 1);
+ grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, true);
client_setup_transport(&exec_ctx, &cs, transport);
GPR_ASSERT(f->client);
- grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -114,9 +114,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
grpc_server_register_completion_queue(f->server, f->cq, nullptr);
grpc_server_start(f->server);
transport =
- grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, 0);
+ grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, false);
server_setup_transport(f, transport);
- grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/test/core/end2end/fixtures/h2_sockpair.cc b/test/core/end2end/fixtures/h2_sockpair.cc
index 59a3a6db31..03566aada2 100644
--- a/test/core/end2end/fixtures/h2_sockpair.cc
+++ b/test/core/end2end/fixtures/h2_sockpair.cc
@@ -91,10 +91,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f,
cs.client_args = client_args;
cs.f = f;
transport =
- grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, 1);
+ grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, true);
client_setup_transport(&exec_ctx, &cs, transport);
GPR_ASSERT(f->client);
- grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -108,9 +108,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
grpc_server_register_completion_queue(f->server, f->cq, nullptr);
grpc_server_start(f->server);
transport =
- grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, 0);
+ grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, false);
server_setup_transport(f, transport);
- grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.cc b/test/core/end2end/fixtures/h2_sockpair_1byte.cc
index d935efbbac..9adba00204 100644
--- a/test/core/end2end/fixtures/h2_sockpair_1byte.cc
+++ b/test/core/end2end/fixtures/h2_sockpair_1byte.cc
@@ -102,10 +102,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f,
cs.client_args = client_args;
cs.f = f;
transport =
- grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, 1);
+ grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, true);
client_setup_transport(&exec_ctx, &cs, transport);
GPR_ASSERT(f->client);
- grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -119,9 +119,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
grpc_server_register_completion_queue(f->server, f->cq, nullptr);
grpc_server_start(f->server);
transport =
- grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, 0);
+ grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, false);
server_setup_transport(f, transport);
- grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc
index 3f7806942b..3904887026 100644
--- a/test/core/end2end/fixtures/http_proxy_fixture.cc
+++ b/test/core/end2end/fixtures/http_proxy_fixture.cc
@@ -68,6 +68,9 @@ struct grpc_end2end_http_proxy {
// Connection handling
//
+// The proxy_connection structure is only accessed from closures, which are
+// all scheduled under the same combiner lock, so there is no need for a
+// mutex to protect this structure.
typedef struct proxy_connection {
grpc_end2end_http_proxy* proxy;
@@ -78,6 +81,8 @@ typedef struct proxy_connection {
grpc_pollset_set* pollset_set;
+  // NOTE: All the closures execute under proxy->combiner lock, which means
+  // there will not be any data races between the closures.
grpc_closure on_read_request_done;
grpc_closure on_server_connect_done;
grpc_closure on_write_response_done;
@@ -86,6 +91,13 @@ typedef struct proxy_connection {
grpc_closure on_server_read_done;
grpc_closure on_server_write_done;
+ bool client_read_failed : 1;
+ bool client_write_failed : 1;
+ bool client_shutdown : 1;
+ bool server_read_failed : 1;
+ bool server_write_failed : 1;
+ bool server_shutdown : 1;
+
grpc_slice_buffer client_read_buffer;
grpc_slice_buffer client_deferred_write_buffer;
bool client_is_writing;
@@ -129,21 +141,53 @@ static void proxy_connection_unref(grpc_exec_ctx* exec_ctx,
}
}
+enum failure_type {
+ SETUP_FAILED, // To be used before we start proxying.
+ CLIENT_READ_FAILED,
+ CLIENT_WRITE_FAILED,
+ SERVER_READ_FAILED,
+ SERVER_WRITE_FAILED,
+};
+
// Helper function to shut down the proxy connection.
-// Does NOT take ownership of a reference to error.
static void proxy_connection_failed(grpc_exec_ctx* exec_ctx,
- proxy_connection* conn, bool is_client,
- const char* prefix, grpc_error* error) {
- const char* msg = grpc_error_string(error);
- gpr_log(GPR_INFO, "%s: %s", prefix, msg);
-
- grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint,
- GRPC_ERROR_REF(error));
- if (conn->server_endpoint != nullptr) {
+ proxy_connection* conn,
+ failure_type failure, const char* prefix,
+ grpc_error* error) {
+ gpr_log(GPR_INFO, "%s: %s", prefix, grpc_error_string(error));
+ // Decide whether we should shut down the client and server.
+ bool shutdown_client = false;
+ bool shutdown_server = false;
+ if (failure == SETUP_FAILED) {
+ shutdown_client = true;
+ shutdown_server = true;
+ } else {
+ if ((failure == CLIENT_READ_FAILED && conn->client_write_failed) ||
+ (failure == CLIENT_WRITE_FAILED && conn->client_read_failed) ||
+ (failure == SERVER_READ_FAILED && !conn->client_is_writing)) {
+ shutdown_client = true;
+ }
+ if ((failure == SERVER_READ_FAILED && conn->server_write_failed) ||
+ (failure == SERVER_WRITE_FAILED && conn->server_read_failed) ||
+ (failure == CLIENT_READ_FAILED && !conn->server_is_writing)) {
+ shutdown_server = true;
+ }
+ }
+ // If we decided to shut down either one and have not yet done so, do so.
+ if (shutdown_client && !conn->client_shutdown) {
+ grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint,
+ GRPC_ERROR_REF(error));
+ conn->client_shutdown = true;
+ }
+ if (shutdown_server && !conn->server_shutdown &&
+ (conn->server_endpoint != nullptr)) {
grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint,
GRPC_ERROR_REF(error));
+ conn->server_shutdown = true;
}
+ // Unref the connection.
proxy_connection_unref(exec_ctx, conn, "conn_failed");
+ GRPC_ERROR_UNREF(error);
}
// Callback for writing proxy data to the client.
@@ -152,8 +196,8 @@ static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg,
proxy_connection* conn = (proxy_connection*)arg;
conn->client_is_writing = false;
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(exec_ctx, conn, true /* is_client */,
- "HTTP proxy client write", error);
+ proxy_connection_failed(exec_ctx, conn, CLIENT_WRITE_FAILED,
+ "HTTP proxy client write", GRPC_ERROR_REF(error));
return;
}
// Clear write buffer (the data we just wrote).
@@ -179,8 +223,8 @@ static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg,
proxy_connection* conn = (proxy_connection*)arg;
conn->server_is_writing = false;
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(exec_ctx, conn, false /* is_client */,
- "HTTP proxy server write", error);
+ proxy_connection_failed(exec_ctx, conn, SERVER_WRITE_FAILED,
+ "HTTP proxy server write", GRPC_ERROR_REF(error));
return;
}
// Clear write buffer (the data we just wrote).
@@ -206,8 +250,8 @@ static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
proxy_connection* conn = (proxy_connection*)arg;
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(exec_ctx, conn, true /* is_client */,
- "HTTP proxy client read", error);
+ proxy_connection_failed(exec_ctx, conn, CLIENT_READ_FAILED,
+ "HTTP proxy client read", GRPC_ERROR_REF(error));
return;
}
// If there is already a pending write (i.e., server_write_buffer is
@@ -239,8 +283,8 @@ static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
proxy_connection* conn = (proxy_connection*)arg;
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(exec_ctx, conn, false /* is_client */,
- "HTTP proxy server read", error);
+ proxy_connection_failed(exec_ctx, conn, SERVER_READ_FAILED,
+ "HTTP proxy server read", GRPC_ERROR_REF(error));
return;
}
// If there is already a pending write (i.e., client_write_buffer is
@@ -272,8 +316,8 @@ static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg,
proxy_connection* conn = (proxy_connection*)arg;
conn->client_is_writing = false;
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(exec_ctx, conn, true /* is_client */,
- "HTTP proxy write response", error);
+ proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
+ "HTTP proxy write response", GRPC_ERROR_REF(error));
return;
}
// Clear write buffer.
@@ -301,8 +345,8 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg,
// connection failed. However, for the purposes of this test code,
// it's fine to pretend this is a client-side error, which will
// cause the client connection to be dropped.
- proxy_connection_failed(exec_ctx, conn, true /* is_client */,
- "HTTP proxy server connect", error);
+ proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
+ "HTTP proxy server connect", GRPC_ERROR_REF(error));
return;
}
// We've established a connection, so send back a 200 response code to
@@ -351,8 +395,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg,
gpr_log(GPR_DEBUG, "on_read_request_done: %p %s", conn,
grpc_error_string(error));
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(exec_ctx, conn, true /* is_client */,
- "HTTP proxy read request", error);
+ proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
+ "HTTP proxy read request", GRPC_ERROR_REF(error));
return;
}
// Read request and feed it to the parser.
@@ -361,8 +405,9 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg,
error = grpc_http_parser_parse(
&conn->http_parser, conn->client_read_buffer.slices[i], nullptr);
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(exec_ctx, conn, true /* is_client */,
- "HTTP proxy request parse", error);
+ proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
+ "HTTP proxy request parse",
+ GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
return;
}
@@ -382,8 +427,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg,
conn->http_request.method);
error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg);
- proxy_connection_failed(exec_ctx, conn, true /* is_client */,
- "HTTP proxy read request", error);
+ proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
+ "HTTP proxy read request", GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
return;
}
@@ -403,8 +448,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg,
if (!client_authenticated) {
const char* msg = "HTTP Connect could not verify authentication";
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(msg);
- proxy_connection_failed(exec_ctx, conn, true /* is_client */,
- "HTTP proxy read request", error);
+ proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
+ "HTTP proxy read request", GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
return;
}
@@ -414,8 +459,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg,
error = grpc_blocking_resolve_address(conn->http_request.path, "80",
&resolved_addresses);
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(exec_ctx, conn, true /* is_client */,
- "HTTP proxy DNS lookup", error);
+ proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
+ "HTTP proxy DNS lookup", GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
return;
}
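[Annotation: two things change in the proxy fixture's failure path. First, the
old is_client flag becomes a failure_type, so the proxy can half-close
correctly: a failed read on one side only shuts the other side down once that
side has no write in flight, letting buffered data drain. Second,
proxy_connection_failed now takes ownership of error (note the
GRPC_ERROR_UNREF at its end), which is why every call site above now passes
GRPC_ERROR_REF(error). A minimal sketch of the new contract, assuming the
signature shown in this hunk:

    // Takes ownership of one ref on `error`.
    static void proxy_connection_failed(grpc_exec_ctx* exec_ctx,
                                        proxy_connection* conn,
                                        failure_type failure,
                                        const char* prefix, grpc_error* error);

    // A caller that still needs `error` afterwards hands over its own ref:
    proxy_connection_failed(exec_ctx, conn, CLIENT_READ_FAILED,
                            "HTTP proxy client read", GRPC_ERROR_REF(error));
]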
diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc
index 0b49b89205..75117218e4 100644
--- a/test/core/end2end/fuzzers/api_fuzzer.cc
+++ b/test/core/end2end/fuzzers/api_fuzzer.cc
@@ -468,10 +468,10 @@ static void do_connect(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
*fc->ep = client;
grpc_transport* transport =
- grpc_create_chttp2_transport(exec_ctx, nullptr, server, 0);
+ grpc_create_chttp2_transport(exec_ctx, nullptr, server, false);
grpc_server_setup_transport(exec_ctx, g_server, transport, nullptr,
nullptr);
- grpc_chttp2_transport_start_reading(exec_ctx, transport, nullptr);
+ grpc_chttp2_transport_start_reading(exec_ctx, transport, nullptr, nullptr);
GRPC_CLOSURE_SCHED(exec_ctx, fc->closure, GRPC_ERROR_NONE);
} else {
diff --git a/test/core/end2end/fuzzers/client_fuzzer.cc b/test/core/end2end/fuzzers/client_fuzzer.cc
index ebc8c2780d..5871f0f43e 100644
--- a/test/core/end2end/fuzzers/client_fuzzer.cc
+++ b/test/core/end2end/fuzzers/client_fuzzer.cc
@@ -54,8 +54,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
grpc_transport* transport =
- grpc_create_chttp2_transport(&exec_ctx, nullptr, mock_endpoint, 1);
- grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+ grpc_create_chttp2_transport(&exec_ctx, nullptr, mock_endpoint, true);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_channel* channel = grpc_channel_create(
&exec_ctx, "test-target", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, transport);
diff --git a/test/core/end2end/fuzzers/server_fuzzer.cc b/test/core/end2end/fuzzers/server_fuzzer.cc
index fb6477b579..67caf4e720 100644
--- a/test/core/end2end/fuzzers/server_fuzzer.cc
+++ b/test/core/end2end/fuzzers/server_fuzzer.cc
@@ -61,9 +61,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
// grpc_server_register_method(server, "/reg", NULL, 0);
grpc_server_start(server);
grpc_transport* transport =
- grpc_create_chttp2_transport(&exec_ctx, nullptr, mock_endpoint, 0);
+ grpc_create_chttp2_transport(&exec_ctx, nullptr, mock_endpoint, false);
grpc_server_setup_transport(&exec_ctx, server, transport, nullptr, nullptr);
- grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_call* call1 = nullptr;
grpc_call_details call_details1;
diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc
index 803f017106..6e17be9cd6 100644
--- a/test/core/iomgr/udp_server_test.cc
+++ b/test/core/iomgr/udp_server_test.cc
@@ -50,7 +50,7 @@ static int g_number_of_writes = 0;
static int g_number_of_bytes_read = 0;
static int g_number_of_orphan_calls = 0;
-static void on_read(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) {
+static bool on_read(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) {
char read_buffer[512];
ssize_t byte_count;
@@ -64,9 +64,11 @@ static void on_read(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) {
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
+ return false;
}
-static void on_write(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) {
+static void on_write(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data,
+ grpc_closure* notify_on_write_closure) {
gpr_mu_lock(g_mu);
g_number_of_writes++;
@@ -79,6 +81,7 @@ static void on_fd_orphaned(grpc_exec_ctx* exec_ctx, grpc_fd* emfd,
grpc_closure* closure, void* user_data) {
gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
grpc_fd_wrapped_fd(emfd));
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
g_number_of_orphan_calls++;
}
@@ -226,7 +229,6 @@ static void test_receive(int number_of_clients) {
int clifd, svrfd;
grpc_udp_server* s = grpc_udp_server_create(nullptr);
int i;
- int number_of_reads_before;
grpc_millis deadline;
grpc_pollset* pollsets[1];
LOG_TEST("test_receive");
@@ -256,14 +258,14 @@ static void test_receive(int number_of_clients) {
deadline =
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
- number_of_reads_before = g_number_of_reads;
+ int number_of_bytes_read_before = g_number_of_bytes_read;
/* Create a socket, send a packet to the UDP server. */
clifd = socket(addr->ss_family, SOCK_DGRAM, 0);
GPR_ASSERT(clifd >= 0);
GPR_ASSERT(connect(clifd, (struct sockaddr*)addr,
(socklen_t)resolved_addr.len) == 0);
GPR_ASSERT(5 == write(clifd, "hello", 5));
- while (g_number_of_reads == number_of_reads_before &&
+ while (g_number_of_bytes_read < (number_of_bytes_read_before + 5) &&
deadline > grpc_exec_ctx_now(&exec_ctx)) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
@@ -273,7 +275,6 @@ static void test_receive(int number_of_clients) {
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(g_mu);
}
- GPR_ASSERT(g_number_of_reads == number_of_reads_before + 1);
close(clifd);
}
GPR_ASSERT(g_number_of_bytes_read == 5 * number_of_clients);
@@ -286,9 +287,6 @@ static void test_receive(int number_of_clients) {
/* The server had a single FD, which is orphaned exactly once in *
* grpc_udp_server_destroy. */
GPR_ASSERT(g_number_of_orphan_calls == 1);
-
- /* The write callback should have fired a few times. */
- GPR_ASSERT(g_number_of_writes > 0);
}
static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p,
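[Annotation: the UDP server test tracks three callback-signature changes:
on_read now returns a bool (this test returns false; the precise semantics of
the return value live in udp_server.h), on_write receives a closure for
re-registering write interest, and the orphan callback is now responsible for
scheduling the closure it is handed. The shapes as used here — names from this
test, an inference rather than the header:

    bool on_read(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data);
    void on_write(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data,
                  grpc_closure* notify_on_write_closure);
    void on_fd_orphaned(grpc_exec_ctx* exec_ctx, grpc_fd* emfd,
                        grpc_closure* closure, void* user_data) {
      // The callback must now run `closure` itself:
      GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
      // ...
    }
]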
diff --git a/test/core/security/ssl_server_fuzzer.cc b/test/core/security/ssl_server_fuzzer.cc
index bbb2f6013e..d83ebb18d2 100644
--- a/test/core/security/ssl_server_fuzzer.cc
+++ b/test/core/security/ssl_server_fuzzer.cc
@@ -92,8 +92,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_handshake_manager* handshake_mgr = grpc_handshake_manager_create();
grpc_server_security_connector_add_handshakers(&exec_ctx, sc, handshake_mgr);
grpc_handshake_manager_do_handshake(
- &exec_ctx, handshake_mgr, mock_endpoint, nullptr /* channel_args */,
- deadline, nullptr /* acceptor */, on_handshake_done, &state);
+ &exec_ctx, handshake_mgr, nullptr /* interested_parties */, mock_endpoint,
+ nullptr /* channel_args */, deadline, nullptr /* acceptor */,
+ on_handshake_done, &state);
grpc_exec_ctx_flush(&exec_ctx);
// If the given string happens to be part of the correct client hello, the
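[Annotation: the handshake manager call gains an interested_parties pollset_set
ahead of the endpoint. From this call site, the new shape is approximately the
following — a sketch; the parameter types other than the new one are carried
over from the old call:

    void grpc_handshake_manager_do_handshake(
        grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
        grpc_pollset_set* interested_parties, grpc_endpoint* endpoint,
        const grpc_channel_args* channel_args, grpc_millis deadline,
        grpc_tcp_server_acceptor* acceptor,
        grpc_iomgr_cb_func on_handshake_done, void* user_data);
]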
diff --git a/test/core/transport/chttp2/BUILD b/test/core/transport/chttp2/BUILD
index 1ea7d0341d..6eff716b01 100644
--- a/test/core/transport/chttp2/BUILD
+++ b/test/core/transport/chttp2/BUILD
@@ -115,6 +115,21 @@ grpc_cc_test(
)
grpc_cc_test(
+ name = "settings_timeout_test",
+ srcs = ["settings_timeout_test.cc"],
+ language = "C++",
+ deps = [
+ "//:gpr",
+ "//:grpc",
+ "//test/core/util:gpr_test_util",
+ "//test/core/util:grpc_test_util",
+ ],
+ external_deps = [
+ "gtest",
+ ],
+)
+
+grpc_cc_test(
name = "varint_test",
srcs = ["varint_test.cc"],
language = "C++",
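[Annotation: with this rule in place, the new test can be run on its own with
e.g. `bazel test //test/core/transport/chttp2:settings_timeout_test` — assuming
a standard Bazel checkout; the label follows from the BUILD entry above.]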
diff --git a/test/core/transport/chttp2/settings_timeout_test.cc b/test/core/transport/chttp2/settings_timeout_test.cc
new file mode 100644
index 0000000000..670eae1f79
--- /dev/null
+++ b/test/core/transport/chttp2/settings_timeout_test.cc
@@ -0,0 +1,258 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include <memory>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/pollset.h"
+#include "src/core/lib/iomgr/pollset_set.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/slice/slice_internal.h"
+
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+
+namespace grpc_core {
+namespace test {
+namespace {
+
+// A gRPC server, running in its own thread.
+class ServerThread {
+ public:
+ explicit ServerThread(const char* address) : address_(address) {}
+
+ void Start() {
+ // Start server with 1-second handshake timeout.
+ grpc_arg arg;
+ arg.type = GRPC_ARG_INTEGER;
+ arg.key = const_cast<char*>(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS);
+ arg.value.integer = 1000;
+ grpc_channel_args args = {1, &arg};
+ server_ = grpc_server_create(&args, nullptr);
+ ASSERT_TRUE(grpc_server_add_insecure_http2_port(server_, address_));
+ cq_ = grpc_completion_queue_create_for_next(nullptr);
+ grpc_server_register_completion_queue(server_, cq_, nullptr);
+ grpc_server_start(server_);
+ thread_.reset(new std::thread(std::bind(&ServerThread::Serve, this)));
+ }
+
+ void Shutdown() {
+ grpc_completion_queue* shutdown_cq =
+ grpc_completion_queue_create_for_pluck(nullptr);
+ grpc_server_shutdown_and_notify(server_, shutdown_cq, nullptr);
+ GPR_ASSERT(grpc_completion_queue_pluck(shutdown_cq, nullptr,
+ grpc_timeout_seconds_to_deadline(1),
+ nullptr)
+ .type == GRPC_OP_COMPLETE);
+ grpc_completion_queue_destroy(shutdown_cq);
+ grpc_server_destroy(server_);
+ grpc_completion_queue_destroy(cq_);
+ thread_->join();
+ }
+
+ private:
+ void Serve() {
+ // The completion queue should not return anything other than shutdown.
+ grpc_event ev = grpc_completion_queue_next(
+ cq_, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
+ ASSERT_EQ(GRPC_QUEUE_SHUTDOWN, ev.type);
+ }
+
+ const char* address_; // Do not own.
+ grpc_server* server_ = nullptr;
+ grpc_completion_queue* cq_ = nullptr;
+ std::unique_ptr<std::thread> thread_;
+};
+
+// A TCP client that connects to the server, reads data until the server
+// closes, and then terminates.
+class Client {
+ public:
+ explicit Client(const char* server_address)
+ : server_address_(server_address) {}
+
+ void Connect() {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resolved_addresses* server_addresses = nullptr;
+ grpc_error* error =
+ grpc_blocking_resolve_address(server_address_, "80", &server_addresses);
+ ASSERT_EQ(GRPC_ERROR_NONE, error) << grpc_error_string(error);
+ ASSERT_GE(server_addresses->naddrs, 1UL);
+ pollset_ = (grpc_pollset*)gpr_zalloc(grpc_pollset_size());
+ grpc_pollset_init(pollset_, &mu_);
+ grpc_pollset_set* pollset_set = grpc_pollset_set_create();
+ grpc_pollset_set_add_pollset(&exec_ctx, pollset_set, pollset_);
+ EventState state;
+ grpc_tcp_client_connect(&exec_ctx, state.closure(), &endpoint_, pollset_set,
+ nullptr /* channel_args */, server_addresses->addrs,
+ 1000);
+ ASSERT_TRUE(PollUntilDone(
+ &exec_ctx, &state,
+ grpc_timespec_to_millis_round_up(gpr_inf_future(GPR_CLOCK_MONOTONIC))));
+ ASSERT_EQ(GRPC_ERROR_NONE, state.error());
+ grpc_pollset_set_destroy(&exec_ctx, pollset_set);
+ grpc_endpoint_add_to_pollset(&exec_ctx, endpoint_, pollset_);
+ grpc_resolved_addresses_destroy(server_addresses);
+ grpc_exec_ctx_finish(&exec_ctx);
+ }
+
+ // Reads until an error is returned.
+ // Returns true if an error was encountered before the deadline.
+ bool ReadUntilError() {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_slice_buffer read_buffer;
+ grpc_slice_buffer_init(&read_buffer);
+ bool retval = true;
+ // Use a deadline of 3 seconds, which is a lot more than we should
+ // need for a 1-second timeout, but this helps avoid flakes.
+ grpc_millis deadline = grpc_exec_ctx_now(&exec_ctx) + 3000;
+ while (true) {
+ EventState state;
+ grpc_endpoint_read(&exec_ctx, endpoint_, &read_buffer, state.closure());
+ if (!PollUntilDone(&exec_ctx, &state, deadline)) {
+ retval = false;
+ break;
+ }
+ if (state.error() != GRPC_ERROR_NONE) break;
+ gpr_log(GPR_INFO, "client read %" PRIuPTR " bytes", read_buffer.length);
+ grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, &read_buffer);
+ }
+ grpc_endpoint_shutdown(&exec_ctx, endpoint_,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown"));
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &read_buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
+ return retval;
+ }
+
+ void Shutdown() {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_endpoint_destroy(&exec_ctx, endpoint_);
+ grpc_pollset_shutdown(&exec_ctx, pollset_,
+ GRPC_CLOSURE_CREATE(&Client::PollsetDestroy, pollset_,
+ grpc_schedule_on_exec_ctx));
+ grpc_exec_ctx_finish(&exec_ctx);
+ }
+
+ private:
+ // State used to wait for an I/O event.
+ class EventState {
+ public:
+ EventState() {
+ GRPC_CLOSURE_INIT(&closure_, &EventState::OnEventDone, this,
+ grpc_schedule_on_exec_ctx);
+ }
+
+ ~EventState() { GRPC_ERROR_UNREF(error_); }
+
+ grpc_closure* closure() { return &closure_; }
+
+ bool done() const { return done_; }
+
+ // Caller does NOT take ownership of the error.
+ grpc_error* error() const { return error_; }
+
+ private:
+ static void OnEventDone(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ gpr_log(GPR_INFO, "OnEventDone(): %s", grpc_error_string(error));
+ EventState* state = (EventState*)arg;
+ state->error_ = GRPC_ERROR_REF(error);
+ state->done_ = true;
+ }
+
+ grpc_closure closure_;
+ bool done_ = false;
+ grpc_error* error_ = GRPC_ERROR_NONE;
+ };
+
+ // Returns true if done, or false if deadline exceeded.
+ bool PollUntilDone(grpc_exec_ctx* exec_ctx, EventState* state,
+ grpc_millis deadline) {
+ while (true) {
+ grpc_pollset_worker* worker = nullptr;
+ gpr_mu_lock(mu_);
+ GRPC_LOG_IF_ERROR("grpc_pollset_work",
+ grpc_pollset_work(exec_ctx, pollset_, &worker,
+ grpc_exec_ctx_now(exec_ctx) + 1000));
+ gpr_mu_unlock(mu_);
+ if (state != nullptr && state->done()) return true;
+ if (grpc_exec_ctx_now(exec_ctx) >= deadline) return false;
+ }
+ }
+
+ static void PollsetDestroy(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_pollset* pollset = (grpc_pollset*)arg;
+ grpc_pollset_destroy(exec_ctx, pollset);
+ gpr_free(pollset);
+ }
+
+ const char* server_address_; // Do not own.
+ grpc_endpoint* endpoint_;
+ gpr_mu* mu_;
+ grpc_pollset* pollset_;
+};
+
+TEST(SettingsTimeout, Basic) {
+ // Construct server address string.
+ const int server_port = grpc_pick_unused_port_or_die();
+ char* server_address_string;
+ gpr_asprintf(&server_address_string, "localhost:%d", server_port);
+ // Start server.
+ gpr_log(GPR_INFO, "starting server on %s", server_address_string);
+ ServerThread server_thread(server_address_string);
+ server_thread.Start();
+ // Create client and connect to server.
+ gpr_log(GPR_INFO, "starting client connect");
+ Client client(server_address_string);
+ client.Connect();
+ // Client read. Should fail due to server dropping connection.
+ gpr_log(GPR_INFO, "starting client read");
+ EXPECT_TRUE(client.ReadUntilError());
+ // Shut down client.
+ gpr_log(GPR_INFO, "shutting down client");
+ client.Shutdown();
+ // Shut down server.
+ gpr_log(GPR_INFO, "shutting down server");
+ server_thread.Shutdown();
+ // Clean up.
+ gpr_free(server_address_string);
+}
+
+} // namespace
+} // namespace test
+} // namespace grpc_core
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ grpc_test_init(argc, argv);
+ grpc_init();
+ int result = RUN_ALL_TESTS();
+ grpc_shutdown();
+ return result;
+}
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index c15ab88da1..bbf3da4663 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -353,11 +353,6 @@ class GrpclbEnd2endTest : public ::testing::Test {
"balancer", server_host_, balancers_.back().get()));
}
ResetStub();
- std::vector<AddressData> addresses;
- for (size_t i = 0; i < balancer_servers_.size(); ++i) {
- addresses.emplace_back(AddressData{balancer_servers_[i].port_, true, ""});
- }
- SetNextResolution(addresses);
}
void TearDown() override {
@@ -370,6 +365,14 @@ class GrpclbEnd2endTest : public ::testing::Test {
grpc_fake_resolver_response_generator_unref(response_generator_);
}
+ void SetNextResolutionAllBalancers() {
+ std::vector<AddressData> addresses;
+ for (size_t i = 0; i < balancer_servers_.size(); ++i) {
+ addresses.emplace_back(AddressData{balancer_servers_[i].port_, true, ""});
+ }
+ SetNextResolution(addresses);
+ }
+
void ResetStub(int fallback_timeout = 0) {
ChannelArguments args;
args.SetGrpclbFallbackTimeout(fallback_timeout);
@@ -581,6 +584,7 @@ class SingleBalancerTest : public GrpclbEnd2endTest {
};
TEST_F(SingleBalancerTest, Vanilla) {
+ SetNextResolutionAllBalancers();
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
@@ -608,6 +612,7 @@ TEST_F(SingleBalancerTest, Vanilla) {
}
TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
+ SetNextResolutionAllBalancers();
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
const int kCallDeadlineMs = 1000 * grpc_test_slowdown_factor();
@@ -645,6 +650,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
}
TEST_F(SingleBalancerTest, Fallback) {
+ SetNextResolutionAllBalancers();
const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
const size_t kNumBackendInResolution = backends_.size() / 2;
@@ -711,6 +717,7 @@ TEST_F(SingleBalancerTest, Fallback) {
}
TEST_F(SingleBalancerTest, FallbackUpdate) {
+ SetNextResolutionAllBalancers();
const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
const size_t kNumBackendInResolution = backends_.size() / 3;
@@ -818,6 +825,7 @@ TEST_F(SingleBalancerTest, FallbackUpdate) {
}
TEST_F(SingleBalancerTest, BackendsRestart) {
+ SetNextResolutionAllBalancers();
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
@@ -857,6 +865,7 @@ class UpdatesTest : public GrpclbEnd2endTest {
};
TEST_F(UpdatesTest, UpdateBalancers) {
+ SetNextResolutionAllBalancers();
const std::vector<int> first_backend{GetBackendPorts()[0]};
const std::vector<int> second_backend{GetBackendPorts()[1]};
ScheduleResponseForBalancer(
@@ -919,6 +928,7 @@ TEST_F(UpdatesTest, UpdateBalancers) {
// verify that the LB channel inside grpclb keeps the initial connection (which
// by definition is also present in the update).
TEST_F(UpdatesTest, UpdateBalancersRepeated) {
+ SetNextResolutionAllBalancers();
const std::vector<int> first_backend{GetBackendPorts()[0]};
const std::vector<int> second_backend{GetBackendPorts()[0]};
@@ -989,6 +999,9 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
}
TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
+ std::vector<AddressData> addresses;
+ addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
+ SetNextResolution(addresses);
const std::vector<int> first_backend{GetBackendPorts()[0]};
const std::vector<int> second_backend{GetBackendPorts()[1]};
@@ -1030,7 +1043,7 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
- std::vector<AddressData> addresses;
+ addresses.clear();
addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
SetNextResolution(addresses);
@@ -1055,8 +1068,14 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
balancers_[2]->NotifyDoneWithServerlists();
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
- EXPECT_EQ(1U, balancer_servers_[1].service_->request_count());
- EXPECT_EQ(1U, balancer_servers_[1].service_->response_count());
+ // The second balancer, published as part of the first update, may end up
+ // getting two requests (that is, 1 <= #req <= 2) if the LB call retry timer
+ // firing races with the arrival of the update containing the second
+ // balancer.
+ EXPECT_GE(balancer_servers_[1].service_->request_count(), 1U);
+ EXPECT_GE(balancer_servers_[1].service_->response_count(), 1U);
+ EXPECT_LE(balancer_servers_[1].service_->request_count(), 2U);
+ EXPECT_LE(balancer_servers_[1].service_->response_count(), 2U);
EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
// Check LB policy name for the channel.
@@ -1064,6 +1083,7 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
}
TEST_F(SingleBalancerTest, Drop) {
+ SetNextResolutionAllBalancers();
const size_t kNumRpcsPerAddress = 100;
const int num_of_drop_by_rate_limiting_addresses = 1;
const int num_of_drop_by_load_balancing_addresses = 2;
@@ -1107,6 +1127,7 @@ TEST_F(SingleBalancerTest, Drop) {
}
TEST_F(SingleBalancerTest, DropAllFirst) {
+ SetNextResolutionAllBalancers();
// All registered addresses are marked as "drop".
const int num_of_drop_by_rate_limiting_addresses = 1;
const int num_of_drop_by_load_balancing_addresses = 1;
@@ -1122,6 +1143,7 @@ TEST_F(SingleBalancerTest, DropAllFirst) {
}
TEST_F(SingleBalancerTest, DropAll) {
+ SetNextResolutionAllBalancers();
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
@@ -1152,6 +1174,7 @@ class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest {
};
TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
+ SetNextResolutionAllBalancers();
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
@@ -1186,6 +1209,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
}
TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
+ SetNextResolutionAllBalancers();
const size_t kNumRpcsPerAddress = 3;
const int num_of_drop_by_rate_limiting_addresses = 2;
const int num_of_drop_by_load_balancing_addresses = 1;
diff --git a/test/cpp/interop/interop_server.cc b/test/cpp/interop/interop_server.cc
index a24cdc7d2d..30bd8bfef8 100644
--- a/test/cpp/interop/interop_server.cc
+++ b/test/cpp/interop/interop_server.cc
@@ -317,9 +317,15 @@ class TestServiceImpl : public TestService::Service {
void grpc::testing::interop::RunServer(
std::shared_ptr<ServerCredentials> creds) {
- GPR_ASSERT(FLAGS_port != 0);
+ RunServer(creds, FLAGS_port, nullptr);
+}
+
+void grpc::testing::interop::RunServer(
+ std::shared_ptr<ServerCredentials> creds, const int port,
+ ServerStartedCondition* server_started_condition) {
+ GPR_ASSERT(port != 0);
std::ostringstream server_address;
- server_address << "0.0.0.0:" << FLAGS_port;
+ server_address << "0.0.0.0:" << port;
TestServiceImpl service;
SimpleRequest request;
@@ -333,6 +339,14 @@ void grpc::testing::interop::RunServer(
}
std::unique_ptr<Server> server(builder.BuildAndStart());
gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str());
+
+ // Signal that the server has started.
+ if (server_started_condition) {
+ std::unique_lock<std::mutex> lock(server_started_condition->mutex);
+ server_started_condition->server_started = true;
+ server_started_condition->condition.notify_all();
+ }
+
while (!gpr_atm_no_barrier_load(&g_got_sigint)) {
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(5, GPR_TIMESPAN)));
diff --git a/test/cpp/interop/server_helper.h b/test/cpp/interop/server_helper.h
index 6af003fe93..1bf7db1e14 100644
--- a/test/cpp/interop/server_helper.h
+++ b/test/cpp/interop/server_helper.h
@@ -19,6 +19,7 @@
#ifndef GRPC_TEST_CPP_INTEROP_SERVER_HELPER_H
#define GRPC_TEST_CPP_INTEROP_SERVER_HELPER_H
+#include <condition_variable>
#include <memory>
#include <grpc/compression.h>
@@ -50,8 +51,27 @@ class InteropServerContextInspector {
namespace interop {
extern gpr_atm g_got_sigint;
+
+struct ServerStartedCondition {
+ std::mutex mutex;
+ std::condition_variable condition;
+ bool server_started = false;
+};
+
+/// Run gRPC interop server using port FLAGS_port.
+///
+/// \param creds The credentials associated with the server.
void RunServer(std::shared_ptr<ServerCredentials> creds);
+/// Run gRPC interop server.
+///
+/// \param creds The credentials associated with the server.
+/// \param port Port to use for the server.
+/// \param server_started_condition (optional) Struct holding mutex, condition
+/// variable, and condition used to notify when the server has started.
+void RunServer(std::shared_ptr<ServerCredentials> creds, int port,
+ ServerStartedCondition* server_started_condition);
+
} // namespace interop
} // namespace testing
} // namespace grpc
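[Annotation: a caller of the new overload would typically start the interop
server on a background thread and block on the condition before issuing RPCs.
A hypothetical usage sketch — creds and port are supplied by the caller;
nothing below is part of this patch:

    grpc::testing::interop::ServerStartedCondition cond;
    std::thread server_thread([&] {
      grpc::testing::interop::RunServer(creds, port, &cond);
    });
    {
      // Block until RunServer signals that the server is listening.
      std::unique_lock<std::mutex> lock(cond.mutex);
      cond.condition.wait(lock, [&] { return cond.server_started; });
    }
    // ... server is up; run the client, then signal shutdown and join ...
]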
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
index b95ee0cc01..be4da4d0bd 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
@@ -137,7 +137,7 @@ class Fixture {
grpc_channel_args c_args = args.c_channel_args();
ep_ = new DummyEndpoint;
t_ = grpc_create_chttp2_transport(exec_ctx(), &c_args, ep_, client);
- grpc_chttp2_transport_start_reading(exec_ctx(), t_, nullptr);
+ grpc_chttp2_transport_start_reading(exec_ctx(), t_, nullptr, nullptr);
FlushExecCtx();
}
diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h
index 7f1aa48b56..7e20843875 100644
--- a/test/cpp/microbenchmarks/fullstack_fixtures.h
+++ b/test/cpp/microbenchmarks/fullstack_fixtures.h
@@ -174,7 +174,7 @@ class EndpointPairFixture : public BaseFixture {
const grpc_channel_args* server_args =
grpc_server_get_channel_args(server_->c_server());
server_transport_ = grpc_create_chttp2_transport(
- &exec_ctx, server_args, endpoints.server, 0 /* is_client */);
+ &exec_ctx, server_args, endpoints.server, false /* is_client */);
grpc_pollset** pollsets;
size_t num_pollsets = 0;
@@ -186,7 +186,7 @@ class EndpointPairFixture : public BaseFixture {
grpc_server_setup_transport(&exec_ctx, server_->c_server(),
server_transport_, nullptr, server_args);
- grpc_chttp2_transport_start_reading(&exec_ctx, server_transport_,
+ grpc_chttp2_transport_start_reading(&exec_ctx, server_transport_, nullptr,
nullptr);
}
@@ -197,13 +197,13 @@ class EndpointPairFixture : public BaseFixture {
fixture_configuration.ApplyCommonChannelArguments(&args);
grpc_channel_args c_args = args.c_channel_args();
- client_transport_ =
- grpc_create_chttp2_transport(&exec_ctx, &c_args, endpoints.client, 1);
+ client_transport_ = grpc_create_chttp2_transport(&exec_ctx, &c_args,
+ endpoints.client, true);
GPR_ASSERT(client_transport_);
grpc_channel* channel =
grpc_channel_create(&exec_ctx, "target", &c_args,
GRPC_CLIENT_DIRECT_CHANNEL, client_transport_);
- grpc_chttp2_transport_start_reading(&exec_ctx, client_transport_,
+ grpc_chttp2_transport_start_reading(&exec_ctx, client_transport_, nullptr,
nullptr);
channel_ = CreateChannelInternal("", channel);
diff --git a/test/cpp/performance/writes_per_rpc_test.cc b/test/cpp/performance/writes_per_rpc_test.cc
index 23fff2ea8b..1c6f44dd7d 100644
--- a/test/cpp/performance/writes_per_rpc_test.cc
+++ b/test/cpp/performance/writes_per_rpc_test.cc
@@ -89,7 +89,7 @@ class EndpointPairFixture {
const grpc_channel_args* server_args =
grpc_server_get_channel_args(server_->c_server());
grpc_transport* transport = grpc_create_chttp2_transport(
- &exec_ctx, server_args, endpoints.server, 0 /* is_client */);
+ &exec_ctx, server_args, endpoints.server, false /* is_client */);
grpc_pollset** pollsets;
size_t num_pollsets = 0;
@@ -101,7 +101,8 @@ class EndpointPairFixture {
grpc_server_setup_transport(&exec_ctx, server_->c_server(), transport,
nullptr, server_args);
- grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr,
+ nullptr);
}
/* create channel */
@@ -111,12 +112,13 @@ class EndpointPairFixture {
ApplyCommonChannelArguments(&args);
grpc_channel_args c_args = args.c_channel_args();
- grpc_transport* transport =
- grpc_create_chttp2_transport(&exec_ctx, &c_args, endpoints.client, 1);
+ grpc_transport* transport = grpc_create_chttp2_transport(
+ &exec_ctx, &c_args, endpoints.client, true);
GPR_ASSERT(transport);
grpc_channel* channel = grpc_channel_create(
&exec_ctx, "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
- grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr,
+ nullptr);
channel_ = CreateChannelInternal("", channel);
}
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 9f20b148eb..82a3f0042d 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -60,21 +60,20 @@ class SynchronousClient
SetupLoadTest(config, num_threads_);
}
- virtual ~SynchronousClient(){};
+ virtual ~SynchronousClient() {}
- virtual void InitThreadFuncImpl(size_t thread_idx) = 0;
+ virtual bool InitThreadFuncImpl(size_t thread_idx) = 0;
virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;
void ThreadFunc(size_t thread_idx, Thread* t) override {
- InitThreadFuncImpl(thread_idx);
+ if (!InitThreadFuncImpl(thread_idx)) {
+ return;
+ }
for (;;) {
// run the loop body
HistogramEntry entry;
const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx);
t->UpdateHistogram(&entry);
- if (!thread_still_ok) {
- gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
- }
if (!thread_still_ok || ThreadCompleted()) {
return;
}
@@ -109,9 +108,6 @@ class SynchronousClient
size_t num_threads_;
std::vector<SimpleResponse> responses_;
-
- private:
- void DestroyMultithreading() override final { EndThreads(); }
};
class SynchronousUnaryClient final : public SynchronousClient {
@@ -122,7 +118,7 @@ class SynchronousUnaryClient final : public SynchronousClient {
}
~SynchronousUnaryClient() {}
- void InitThreadFuncImpl(size_t thread_idx) override {}
+ bool InitThreadFuncImpl(size_t thread_idx) override { return true; }
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
if (!WaitToIssue(thread_idx)) {
@@ -140,6 +136,9 @@ class SynchronousUnaryClient final : public SynchronousClient {
entry->set_status(s.error_code());
return true;
}
+
+ private:
+ void DestroyMultithreading() override final { EndThreads(); }
};
template <class StreamType>
@@ -149,31 +148,30 @@ class SynchronousStreamingClient : public SynchronousClient {
: SynchronousClient(config),
context_(num_threads_),
stream_(num_threads_),
+ stream_mu_(num_threads_),
+ shutdown_(num_threads_),
messages_per_stream_(config.messages_per_stream()),
messages_issued_(num_threads_) {
StartThreads(num_threads_);
}
virtual ~SynchronousStreamingClient() {
- std::vector<std::thread> cleanup_threads;
- for (size_t i = 0; i < num_threads_; i++) {
- cleanup_threads.emplace_back([this, i]() {
- auto stream = &stream_[i];
- if (*stream) {
- // forcibly cancel the streams, then finish
- context_[i].TryCancel();
- (*stream)->Finish().IgnoreError();
- // don't log any error message on !ok since this was canceled
- }
- });
- }
- for (auto& th : cleanup_threads) {
- th.join();
- }
+ CleanupAllStreams([this](size_t thread_idx) {
+ // Don't log any kind of error since we may have canceled this
+ stream_[thread_idx]->Finish().IgnoreError();
+ });
}
protected:
std::vector<grpc::ClientContext> context_;
std::vector<std::unique_ptr<StreamType>> stream_;
+ // stream_mu_ is only needed when changing an element of stream_ or context_
+ std::vector<std::mutex> stream_mu_;
+  // use struct Bool, not bool: vector<bool> is packed and not concurrently writable
+ struct Bool {
+ bool val;
+ Bool() : val(false) {}
+ };
+ std::vector<Bool> shutdown_;
const int messages_per_stream_;
std::vector<int> messages_issued_;
@@ -182,27 +180,26 @@ class SynchronousStreamingClient : public SynchronousClient {
// don't set the value since the stream is failed and shouldn't be timed
entry->set_status(s.error_code());
if (!s.ok()) {
- gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx,
- s.error_message().c_str());
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s",
+ thread_idx, s.error_message().c_str());
+ }
}
+ // Lock the stream_mu_ now because the client context could change
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
context_[thread_idx].~ClientContext();
new (&context_[thread_idx]) ClientContext();
}
-};
-class SynchronousStreamingPingPongClient final
- : public SynchronousStreamingClient<
- grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
- public:
- SynchronousStreamingPingPongClient(const ClientConfig& config)
- : SynchronousStreamingClient(config) {}
- ~SynchronousStreamingPingPongClient() {
+ void CleanupAllStreams(std::function<void(size_t)> cleaner) {
std::vector<std::thread> cleanup_threads;
for (size_t i = 0; i < num_threads_; i++) {
- cleanup_threads.emplace_back([this, i]() {
- auto stream = &stream_[i];
- if (*stream) {
- (*stream)->WritesDone();
+ cleanup_threads.emplace_back([this, i, cleaner] {
+ std::lock_guard<std::mutex> l(stream_mu_[i]);
+ shutdown_[i].val = true;
+ if (stream_[i]) {
+ cleaner(i);
}
});
}
@@ -211,10 +208,36 @@ class SynchronousStreamingPingPongClient final
}
}
- void InitThreadFuncImpl(size_t thread_idx) override {
+ private:
+ void DestroyMultithreading() override final {
+ CleanupAllStreams(
+ [this](size_t thread_idx) { context_[thread_idx].TryCancel(); });
+ EndThreads();
+ }
+};
+
+class SynchronousStreamingPingPongClient final
+ : public SynchronousStreamingClient<
+ grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
+ public:
+ SynchronousStreamingPingPongClient(const ClientConfig& config)
+ : SynchronousStreamingClient(config) {}
+ ~SynchronousStreamingPingPongClient() {
+ CleanupAllStreams(
+ [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
+ }
+
+ private:
+ bool InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ } else {
+ return false;
+ }
messages_issued_[thread_idx] = 0;
+ return true;
}
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
@@ -239,7 +262,13 @@ class SynchronousStreamingPingPongClient final
stream_[thread_idx]->WritesDone();
FinishStream(entry, thread_idx);
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ } else {
+ stream_[thread_idx].reset();
+ return false;
+ }
messages_issued_[thread_idx] = 0;
return true;
}
@@ -251,25 +280,24 @@ class SynchronousStreamingFromClientClient final
SynchronousStreamingFromClientClient(const ClientConfig& config)
: SynchronousStreamingClient(config), last_issue_(num_threads_) {}
~SynchronousStreamingFromClientClient() {
- std::vector<std::thread> cleanup_threads;
- for (size_t i = 0; i < num_threads_; i++) {
- cleanup_threads.emplace_back([this, i]() {
- auto stream = &stream_[i];
- if (*stream) {
- (*stream)->WritesDone();
- }
- });
- }
- for (auto& th : cleanup_threads) {
- th.join();
- }
+ CleanupAllStreams(
+ [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
}
- void InitThreadFuncImpl(size_t thread_idx) override {
+ private:
+ std::vector<double> last_issue_;
+
+ bool InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
- &responses_[thread_idx]);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
+ &responses_[thread_idx]);
+ } else {
+ return false;
+ }
last_issue_[thread_idx] = UsageTimer::Now();
+ return true;
}
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
@@ -287,13 +315,16 @@ class SynchronousStreamingFromClientClient final
stream_[thread_idx]->WritesDone();
FinishStream(entry, thread_idx);
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
- &responses_[thread_idx]);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
+ &responses_[thread_idx]);
+ } else {
+ stream_[thread_idx].reset();
+ return false;
+ }
return true;
}
-
- private:
- std::vector<double> last_issue_;
};
class SynchronousStreamingFromServerClient final
@@ -301,12 +332,24 @@ class SynchronousStreamingFromServerClient final
public:
SynchronousStreamingFromServerClient(const ClientConfig& config)
: SynchronousStreamingClient(config), last_recv_(num_threads_) {}
- void InitThreadFuncImpl(size_t thread_idx) override {
+ ~SynchronousStreamingFromServerClient() {}
+
+ private:
+ std::vector<double> last_recv_;
+
+ bool InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] =
- stub->StreamingFromServer(&context_[thread_idx], request_);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] =
+ stub->StreamingFromServer(&context_[thread_idx], request_);
+ } else {
+ return false;
+ }
last_recv_[thread_idx] = UsageTimer::Now();
+ return true;
}
+
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0);
if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
@@ -317,13 +360,16 @@ class SynchronousStreamingFromServerClient final
}
FinishStream(entry, thread_idx);
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] =
- stub->StreamingFromServer(&context_[thread_idx], request_);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] =
+ stub->StreamingFromServer(&context_[thread_idx], request_);
+ } else {
+ stream_[thread_idx].reset();
+ return false;
+ }
return true;
}
-
- private:
- std::vector<double> last_recv_;
};
class SynchronousStreamingBothWaysClient final
@@ -333,24 +379,22 @@ class SynchronousStreamingBothWaysClient final
SynchronousStreamingBothWaysClient(const ClientConfig& config)
: SynchronousStreamingClient(config) {}
~SynchronousStreamingBothWaysClient() {
- std::vector<std::thread> cleanup_threads;
- for (size_t i = 0; i < num_threads_; i++) {
- cleanup_threads.emplace_back([this, i]() {
- auto stream = &stream_[i];
- if (*stream) {
- (*stream)->WritesDone();
- }
- });
- }
- for (auto& th : cleanup_threads) {
- th.join();
- }
+ CleanupAllStreams(
+ [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
}
- void InitThreadFuncImpl(size_t thread_idx) override {
+ private:
+ bool InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
+ std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+ if (!shutdown_[thread_idx].val) {
+ stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
+ } else {
+ return false;
+ }
+ return true;
}
+
bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
// TODO (vjpai): Do this
return true;
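[Annotation: the recurring pattern in all four streaming clients above is a
per-thread guard: any (re)creation of stream_[i] or context_[i] happens under
stream_mu_[i] with a check of shutdown_[i].val, and the cleanup path sets that
flag under the same mutex before touching the stream. Distilled to its core —
a sketch of the pattern, not new code:

    std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
    if (shutdown_[thread_idx].val) {
      return false;  // cleanup already ran; do not create a new stream
    }
    stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);

This closes the race where DestroyMultithreading's TryCancel could fire while
a worker thread was destroying and re-constructing its ClientContext.]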
diff --git a/test/distrib/cpp/run_distrib_test_cmake.bat b/test/distrib/cpp/run_distrib_test_cmake.bat
new file mode 100644
index 0000000000..047846b0f1
--- /dev/null
+++ b/test/distrib/cpp/run_distrib_test_cmake.bat
@@ -0,0 +1,75 @@
+@rem Copyright 2016 gRPC authors.
+@rem
+@rem Licensed under the Apache License, Version 2.0 (the "License");
+@rem you may not use this file except in compliance with the License.
+@rem You may obtain a copy of the License at
+@rem
+@rem http://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing, software
+@rem distributed under the License is distributed on an "AS IS" BASIS,
+@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@rem See the License for the specific language governing permissions and
+@rem limitations under the License.
+
+@rem change to the repo root
+cd /d %~dp0\..\..\..
+
+@rem TODO(jtattermusch): Kokoro has pre-installed protoc.exe in C:\Program Files\ProtoC and that directory
+@rem is on PATH. To avoid picking up that older protoc.exe, we change the PATH entry to something non-existent.
+set PATH=%PATH:ProtoC=DontPickupProtoC%
+
+@rem Install into ./testinstall, but use absolute path and forward slashes
+set INSTALL_DIR=%cd:\=/%/testinstall
+
+@rem Download OpenSSL-Win32 originally installed from https://slproweb.com/products/Win32OpenSSL.html
+powershell -Command "(New-Object Net.WebClient).DownloadFile('https://storage.googleapis.com/grpc-testing.appspot.com/OpenSSL-Win32-1_1_0g.zip', 'OpenSSL-Win32.zip')"
+powershell -Command "Add-Type -Assembly 'System.IO.Compression.FileSystem'; [System.IO.Compression.ZipFile]::ExtractToDirectory('OpenSSL-Win32.zip', '.');"
+
+@rem set absolute path to OpenSSL with forward slashes
+set OPENSSL_DIR=%cd:\=/%/OpenSSL-Win32
+
+cd third_party/zlib
+mkdir cmake
+cd cmake
+cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% ..
+cmake --build . --config Release --target install || goto :error
+cd ../../..
+
+cd third_party/protobuf/cmake
+mkdir build
+cd build
+cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% -Dprotobuf_MSVC_STATIC_RUNTIME=OFF -Dprotobuf_BUILD_TESTS=OFF ..
+cmake --build . --config Release --target install || goto :error
+cd ../../../..
+
+cd third_party/cares/cares
+mkdir cmake
+cd cmake
+cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% ..
+cmake --build . --config Release --target install || goto :error
+cd ../../../..
+
+@rem OpenSSL-Win32 and OpenSSL-Win64 can be downloaded from https://slproweb.com/products/Win32OpenSSL.html
+cd cmake
+mkdir build
+cd build
+cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% -DOPENSSL_ROOT_DIR=%OPENSSL_DIR% -DOPENSSL_INCLUDE_DIR=%OPENSSL_DIR%/include -DZLIB_LIBRARY=%INSTALL_DIR%/lib/zlibstatic.lib -DZLIB_INCLUDE_DIR=%INSTALL_DIR%/include -DgRPC_INSTALL=ON -DgRPC_BUILD_TESTS=OFF -DgRPC_PROTOBUF_PROVIDER=package -DgRPC_ZLIB_PROVIDER=package -DgRPC_CARES_PROVIDER=package -DgRPC_SSL_PROVIDER=package -DCMAKE_BUILD_TYPE=Release ../.. || goto :error
+cmake --build . --config Release --target install || goto :error
+cd ../..
+
+@rem Build helloworld example using cmake
+cd examples/cpp/helloworld
+mkdir cmake
+cd cmake
+mkdir build
+cd build
+cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% ../.. || goto :error
+cmake --build . --config Release || goto :error
+cd ../../../../..
+
+goto :EOF
+
+:error
+echo Failed!
+exit /b %errorlevel%
diff --git a/test/distrib/cpp/run_distrib_test_cmake.sh b/test/distrib/cpp/run_distrib_test_cmake.sh
index ead8cc10bc..a9c081c2f5 100755
--- a/test/distrib/cpp/run_distrib_test_cmake.sh
+++ b/test/distrib/cpp/run_distrib_test_cmake.sh
@@ -19,7 +19,6 @@ cd $(dirname $0)/../../..
echo "deb http://ftp.debian.org/debian jessie-backports main" | tee /etc/apt/sources.list.d/jessie-backports.list
apt-get update
-#apt-get install -t jessie-backports -y libc-ares-dev # we need specifically version 1.12
apt-get install -t jessie-backports -y libssl-dev
# Install c-ares