aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2017-12-05 17:49:11 -0800
committerGravatar Yash Tibrewal <yashkt@google.com>2017-12-05 17:49:11 -0800
commit73bb67d054ecb952f10649cc42c998ab7ea8facd (patch)
tree181ee31adc7a144d8dcffed5e5de990ab4662cef /test/core
parent65c45fbb4d576d99bcd1c00a13f430c8994fee38 (diff)
parent05cd3102b7b59bf5d71f66dc012be1f4ecdaad88 (diff)
Merge master into execctx
Diffstat (limited to 'test/core')
-rw-r--r--test/core/bad_client/bad_client.cc4
-rw-r--r--test/core/end2end/fixtures/h2_sockpair+trace.cc8
-rw-r--r--test/core/end2end/fixtures/h2_sockpair.cc8
-rw-r--r--test/core/end2end/fixtures/h2_sockpair_1byte.cc8
-rw-r--r--test/core/end2end/fixtures/http_proxy_fixture.cc104
-rw-r--r--test/core/end2end/fuzzers/api_fuzzer.cc4
-rw-r--r--test/core/end2end/fuzzers/client_fuzzer.cc4
-rw-r--r--test/core/end2end/fuzzers/server_fuzzer.cc4
-rw-r--r--test/core/iomgr/udp_server_test.cc16
-rw-r--r--test/core/security/ssl_server_fuzzer.cc5
-rw-r--r--test/core/transport/chttp2/BUILD15
-rw-r--r--test/core/transport/chttp2/settings_timeout_test.cc253
12 files changed, 372 insertions, 61 deletions
diff --git a/test/core/bad_client/bad_client.cc b/test/core/bad_client/bad_client.cc
index f63a32ea5b..ccf35444d0 100644
--- a/test/core/bad_client/bad_client.cc
+++ b/test/core/bad_client/bad_client.cc
@@ -114,9 +114,9 @@ void grpc_run_bad_client_test(
GRPC_BAD_CLIENT_REGISTERED_HOST,
GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, 0);
grpc_server_start(a.server);
- transport = grpc_create_chttp2_transport(nullptr, sfd.server, 0);
+ transport = grpc_create_chttp2_transport(nullptr, sfd.server, false);
server_setup_transport(&a, transport);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
/* Bind everything into the same pollset */
grpc_endpoint_add_to_pollset(sfd.client, grpc_cq_pollset(a.cq));
diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.cc b/test/core/end2end/fixtures/h2_sockpair+trace.cc
index 447bb91d0c..9807e929af 100644
--- a/test/core/end2end/fixtures/h2_sockpair+trace.cc
+++ b/test/core/end2end/fixtures/h2_sockpair+trace.cc
@@ -93,10 +93,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f,
sp_client_setup cs;
cs.client_args = client_args;
cs.f = f;
- transport = grpc_create_chttp2_transport(client_args, sfd->client, 1);
+ transport = grpc_create_chttp2_transport(client_args, sfd->client, true);
client_setup_transport(&cs, transport);
GPR_ASSERT(f->client);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
@@ -108,9 +108,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
f->server = grpc_server_create(server_args, nullptr);
grpc_server_register_completion_queue(f->server, f->cq, nullptr);
grpc_server_start(f->server);
- transport = grpc_create_chttp2_transport(server_args, sfd->server, 0);
+ transport = grpc_create_chttp2_transport(server_args, sfd->server, false);
server_setup_transport(f, transport);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) {
diff --git a/test/core/end2end/fixtures/h2_sockpair.cc b/test/core/end2end/fixtures/h2_sockpair.cc
index c1f8ee0b64..b68279fd71 100644
--- a/test/core/end2end/fixtures/h2_sockpair.cc
+++ b/test/core/end2end/fixtures/h2_sockpair.cc
@@ -87,10 +87,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f,
sp_client_setup cs;
cs.client_args = client_args;
cs.f = f;
- transport = grpc_create_chttp2_transport(client_args, sfd->client, 1);
+ transport = grpc_create_chttp2_transport(client_args, sfd->client, true);
client_setup_transport(&cs, transport);
GPR_ASSERT(f->client);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
@@ -102,9 +102,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
f->server = grpc_server_create(server_args, nullptr);
grpc_server_register_completion_queue(f->server, f->cq, nullptr);
grpc_server_start(f->server);
- transport = grpc_create_chttp2_transport(server_args, sfd->server, 0);
+ transport = grpc_create_chttp2_transport(server_args, sfd->server, false);
server_setup_transport(f, transport);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) {
diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.cc b/test/core/end2end/fixtures/h2_sockpair_1byte.cc
index 55b847bdae..350be138ca 100644
--- a/test/core/end2end/fixtures/h2_sockpair_1byte.cc
+++ b/test/core/end2end/fixtures/h2_sockpair_1byte.cc
@@ -98,10 +98,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f,
sp_client_setup cs;
cs.client_args = client_args;
cs.f = f;
- transport = grpc_create_chttp2_transport(client_args, sfd->client, 1);
+ transport = grpc_create_chttp2_transport(client_args, sfd->client, true);
client_setup_transport(&cs, transport);
GPR_ASSERT(f->client);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
@@ -113,9 +113,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
f->server = grpc_server_create(server_args, nullptr);
grpc_server_register_completion_queue(f->server, f->cq, nullptr);
grpc_server_start(f->server);
- transport = grpc_create_chttp2_transport(server_args, sfd->server, 0);
+ transport = grpc_create_chttp2_transport(server_args, sfd->server, false);
server_setup_transport(f, transport);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) {
diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc
index 539bbe4134..15c9dd4847 100644
--- a/test/core/end2end/fixtures/http_proxy_fixture.cc
+++ b/test/core/end2end/fixtures/http_proxy_fixture.cc
@@ -68,6 +68,9 @@ struct grpc_end2end_http_proxy {
// Connection handling
//
+// proxy_connection structure is only accessed in the closures which are all
+// scheduled under the same combiner lock. So there is is no need for a mutex to
+// protect this structure.
typedef struct proxy_connection {
grpc_end2end_http_proxy* proxy;
@@ -78,6 +81,8 @@ typedef struct proxy_connection {
grpc_pollset_set* pollset_set;
+ // NOTE: All the closures execute under proxy->combiner lock. Which means
+ // there will not be any data-races between the closures
grpc_closure on_read_request_done;
grpc_closure on_server_connect_done;
grpc_closure on_write_response_done;
@@ -86,6 +91,13 @@ typedef struct proxy_connection {
grpc_closure on_server_read_done;
grpc_closure on_server_write_done;
+ bool client_read_failed : 1;
+ bool client_write_failed : 1;
+ bool client_shutdown : 1;
+ bool server_read_failed : 1;
+ bool server_write_failed : 1;
+ bool server_shutdown : 1;
+
grpc_slice_buffer client_read_buffer;
grpc_slice_buffer client_deferred_write_buffer;
bool client_is_writing;
@@ -126,18 +138,50 @@ static void proxy_connection_unref(proxy_connection* conn, const char* reason) {
}
}
+enum failure_type {
+ SETUP_FAILED, // To be used before we start proxying.
+ CLIENT_READ_FAILED,
+ CLIENT_WRITE_FAILED,
+ SERVER_READ_FAILED,
+ SERVER_WRITE_FAILED,
+};
+
// Helper function to shut down the proxy connection.
-// Does NOT take ownership of a reference to error.
-static void proxy_connection_failed(proxy_connection* conn, bool is_client,
- const char* prefix, grpc_error* error) {
- const char* msg = grpc_error_string(error);
- gpr_log(GPR_INFO, "%s: %s", prefix, msg);
-
- grpc_endpoint_shutdown(conn->client_endpoint, GRPC_ERROR_REF(error));
- if (conn->server_endpoint != nullptr) {
+static void proxy_connection_failed(proxy_connection* conn,
+ failure_type failure, const char* prefix,
+ grpc_error* error) {
+ gpr_log(GPR_INFO, "%s: %s", prefix, grpc_error_string(error));
+ // Decide whether we should shut down the client and server.
+ bool shutdown_client = false;
+ bool shutdown_server = false;
+ if (failure == SETUP_FAILED) {
+ shutdown_client = true;
+ shutdown_server = true;
+ } else {
+ if ((failure == CLIENT_READ_FAILED && conn->client_write_failed) ||
+ (failure == CLIENT_WRITE_FAILED && conn->client_read_failed) ||
+ (failure == SERVER_READ_FAILED && !conn->client_is_writing)) {
+ shutdown_client = true;
+ }
+ if ((failure == SERVER_READ_FAILED && conn->server_write_failed) ||
+ (failure == SERVER_WRITE_FAILED && conn->server_read_failed) ||
+ (failure == CLIENT_READ_FAILED && !conn->server_is_writing)) {
+ shutdown_server = true;
+ }
+ }
+ // If we decided to shut down either one and have not yet done so, do so.
+ if (shutdown_client && !conn->client_shutdown) {
+ grpc_endpoint_shutdown(conn->client_endpoint, GRPC_ERROR_REF(error));
+ conn->client_shutdown = true;
+ }
+ if (shutdown_server && !conn->server_shutdown &&
+ (conn->server_endpoint != nullptr)) {
grpc_endpoint_shutdown(conn->server_endpoint, GRPC_ERROR_REF(error));
+ conn->server_shutdown = true;
}
+ // Unref the connection.
proxy_connection_unref(conn, "conn_failed");
+ GRPC_ERROR_UNREF(error);
}
// Callback for writing proxy data to the client.
@@ -145,8 +189,8 @@ static void on_client_write_done(void* arg, grpc_error* error) {
proxy_connection* conn = (proxy_connection*)arg;
conn->client_is_writing = false;
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(conn, true /* is_client */,
- "HTTP proxy client write", error);
+ proxy_connection_failed(conn, CLIENT_WRITE_FAILED,
+ "HTTP proxy client write", GRPC_ERROR_REF(error));
return;
}
// Clear write buffer (the data we just wrote).
@@ -170,8 +214,8 @@ static void on_server_write_done(void* arg, grpc_error* error) {
proxy_connection* conn = (proxy_connection*)arg;
conn->server_is_writing = false;
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(conn, false /* is_client */,
- "HTTP proxy server write", error);
+ proxy_connection_failed(conn, SERVER_WRITE_FAILED,
+ "HTTP proxy server write", GRPC_ERROR_REF(error));
return;
}
// Clear write buffer (the data we just wrote).
@@ -195,8 +239,8 @@ static void on_server_write_done(void* arg, grpc_error* error) {
static void on_client_read_done(void* arg, grpc_error* error) {
proxy_connection* conn = (proxy_connection*)arg;
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(conn, true /* is_client */,
- "HTTP proxy client read", error);
+ proxy_connection_failed(conn, CLIENT_READ_FAILED, "HTTP proxy client read",
+ GRPC_ERROR_REF(error));
return;
}
// If there is already a pending write (i.e., server_write_buffer is
@@ -226,8 +270,8 @@ static void on_client_read_done(void* arg, grpc_error* error) {
static void on_server_read_done(void* arg, grpc_error* error) {
proxy_connection* conn = (proxy_connection*)arg;
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(conn, false /* is_client */,
- "HTTP proxy server read", error);
+ proxy_connection_failed(conn, SERVER_READ_FAILED, "HTTP proxy server read",
+ GRPC_ERROR_REF(error));
return;
}
// If there is already a pending write (i.e., client_write_buffer is
@@ -257,8 +301,8 @@ static void on_write_response_done(void* arg, grpc_error* error) {
proxy_connection* conn = (proxy_connection*)arg;
conn->client_is_writing = false;
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(conn, true /* is_client */,
- "HTTP proxy write response", error);
+ proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy write response",
+ GRPC_ERROR_REF(error));
return;
}
// Clear write buffer.
@@ -285,8 +329,8 @@ static void on_server_connect_done(void* arg, grpc_error* error) {
// connection failed. However, for the purposes of this test code,
// it's fine to pretend this is a client-side error, which will
// cause the client connection to be dropped.
- proxy_connection_failed(conn, true /* is_client */,
- "HTTP proxy server connect", error);
+ proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy server connect",
+ GRPC_ERROR_REF(error));
return;
}
// We've established a connection, so send back a 200 response code to
@@ -331,8 +375,8 @@ static void on_read_request_done(void* arg, grpc_error* error) {
gpr_log(GPR_DEBUG, "on_read_request_done: %p %s", conn,
grpc_error_string(error));
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(conn, true /* is_client */,
- "HTTP proxy read request", error);
+ proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy read request",
+ GRPC_ERROR_REF(error));
return;
}
// Read request and feed it to the parser.
@@ -341,8 +385,8 @@ static void on_read_request_done(void* arg, grpc_error* error) {
error = grpc_http_parser_parse(
&conn->http_parser, conn->client_read_buffer.slices[i], nullptr);
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(conn, true /* is_client */,
- "HTTP proxy request parse", error);
+ proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy request parse",
+ GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
return;
}
@@ -362,8 +406,8 @@ static void on_read_request_done(void* arg, grpc_error* error) {
conn->http_request.method);
error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg);
- proxy_connection_failed(conn, true /* is_client */,
- "HTTP proxy read request", error);
+ proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy read request",
+ GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
return;
}
@@ -382,8 +426,8 @@ static void on_read_request_done(void* arg, grpc_error* error) {
if (!client_authenticated) {
const char* msg = "HTTP Connect could not verify authentication";
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(msg);
- proxy_connection_failed(conn, true /* is_client */,
- "HTTP proxy read request", error);
+ proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy read request",
+ GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
return;
}
@@ -393,8 +437,8 @@ static void on_read_request_done(void* arg, grpc_error* error) {
error = grpc_blocking_resolve_address(conn->http_request.path, "80",
&resolved_addresses);
if (error != GRPC_ERROR_NONE) {
- proxy_connection_failed(conn, true /* is_client */, "HTTP proxy DNS lookup",
- error);
+ proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy DNS lookup",
+ GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
return;
}
diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc
index 76e716281a..967a6d560f 100644
--- a/test/core/end2end/fuzzers/api_fuzzer.cc
+++ b/test/core/end2end/fuzzers/api_fuzzer.cc
@@ -468,9 +468,9 @@ static void do_connect(void* arg, grpc_error* error) {
*fc->ep = client;
grpc_transport* transport =
- grpc_create_chttp2_transport(nullptr, server, 0);
+ grpc_create_chttp2_transport(nullptr, server, false);
grpc_server_setup_transport(g_server, transport, nullptr, nullptr);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
GRPC_CLOSURE_SCHED(fc->closure, GRPC_ERROR_NONE);
} else {
diff --git a/test/core/end2end/fuzzers/client_fuzzer.cc b/test/core/end2end/fuzzers/client_fuzzer.cc
index 2d3fc7a074..c17d581d8b 100644
--- a/test/core/end2end/fuzzers/client_fuzzer.cc
+++ b/test/core/end2end/fuzzers/client_fuzzer.cc
@@ -55,8 +55,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
grpc_transport* transport =
- grpc_create_chttp2_transport(nullptr, mock_endpoint, 1);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_create_chttp2_transport(nullptr, mock_endpoint, true);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_channel* channel = grpc_channel_create(
"test-target", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, transport);
diff --git a/test/core/end2end/fuzzers/server_fuzzer.cc b/test/core/end2end/fuzzers/server_fuzzer.cc
index db25160e2b..61c55e0afd 100644
--- a/test/core/end2end/fuzzers/server_fuzzer.cc
+++ b/test/core/end2end/fuzzers/server_fuzzer.cc
@@ -61,9 +61,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
// grpc_server_register_method(server, "/reg", NULL, 0);
grpc_server_start(server);
grpc_transport* transport =
- grpc_create_chttp2_transport(nullptr, mock_endpoint, 0);
+ grpc_create_chttp2_transport(nullptr, mock_endpoint, false);
grpc_server_setup_transport(server, transport, nullptr, nullptr);
- grpc_chttp2_transport_start_reading(transport, nullptr);
+ grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_call* call1 = nullptr;
grpc_call_details call_details1;
diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc
index 7f662f0683..0deb534abd 100644
--- a/test/core/iomgr/udp_server_test.cc
+++ b/test/core/iomgr/udp_server_test.cc
@@ -50,7 +50,7 @@ static int g_number_of_writes = 0;
static int g_number_of_bytes_read = 0;
static int g_number_of_orphan_calls = 0;
-static void on_read(grpc_fd* emfd, void* user_data) {
+static bool on_read(grpc_fd* emfd, void* user_data) {
char read_buffer[512];
ssize_t byte_count;
@@ -64,9 +64,11 @@ static void on_read(grpc_fd* emfd, void* user_data) {
GPR_ASSERT(
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
+ return false;
}
-static void on_write(grpc_fd* emfd, void* user_data) {
+static void on_write(grpc_fd* emfd, void* user_data,
+ grpc_closure* notify_on_write_closure) {
gpr_mu_lock(g_mu);
g_number_of_writes++;
@@ -79,6 +81,7 @@ static void on_fd_orphaned(grpc_fd* emfd, grpc_closure* closure,
void* user_data) {
gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
grpc_fd_wrapped_fd(emfd));
+ GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
g_number_of_orphan_calls++;
}
@@ -222,7 +225,6 @@ static void test_receive(int number_of_clients) {
int clifd, svrfd;
grpc_udp_server* s = grpc_udp_server_create(nullptr);
int i;
- int number_of_reads_before;
grpc_millis deadline;
grpc_pollset* pollsets[1];
LOG_TEST("test_receive");
@@ -252,14 +254,14 @@ static void test_receive(int number_of_clients) {
deadline =
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
- number_of_reads_before = g_number_of_reads;
+ int number_of_bytes_read_before = g_number_of_bytes_read;
/* Create a socket, send a packet to the UDP server. */
clifd = socket(addr->ss_family, SOCK_DGRAM, 0);
GPR_ASSERT(clifd >= 0);
GPR_ASSERT(connect(clifd, (struct sockaddr*)addr,
(socklen_t)resolved_addr.len) == 0);
GPR_ASSERT(5 == write(clifd, "hello", 5));
- while (g_number_of_reads == number_of_reads_before &&
+ while (g_number_of_bytes_read < (number_of_bytes_read_before + 5) &&
deadline > grpc_core::ExecCtx::Get()->Now()) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
@@ -268,7 +270,6 @@ static void test_receive(int number_of_clients) {
grpc_core::ExecCtx::Get()->Flush();
gpr_mu_lock(g_mu);
}
- GPR_ASSERT(g_number_of_reads == number_of_reads_before + 1);
close(clifd);
}
GPR_ASSERT(g_number_of_bytes_read == 5 * number_of_clients);
@@ -280,9 +281,6 @@ static void test_receive(int number_of_clients) {
/* The server had a single FD, which is orphaned exactly once in *
* grpc_udp_server_destroy. */
GPR_ASSERT(g_number_of_orphan_calls == 1);
-
- /* The write callback should have fired a few times. */
- GPR_ASSERT(g_number_of_writes > 0);
}
static void destroy_pollset(void* p, grpc_error* error) {
diff --git a/test/core/security/ssl_server_fuzzer.cc b/test/core/security/ssl_server_fuzzer.cc
index 7056cfe581..6e30698562 100644
--- a/test/core/security/ssl_server_fuzzer.cc
+++ b/test/core/security/ssl_server_fuzzer.cc
@@ -93,8 +93,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_handshake_manager* handshake_mgr = grpc_handshake_manager_create();
grpc_server_security_connector_add_handshakers(sc, handshake_mgr);
grpc_handshake_manager_do_handshake(
- handshake_mgr, mock_endpoint, nullptr /* channel_args */, deadline,
- nullptr /* acceptor */, on_handshake_done, &state);
+ handshake_mgr, nullptr /* interested_parties */, mock_endpoint,
+ nullptr /* channel_args */, deadline, nullptr /* acceptor */,
+ on_handshake_done, &state);
grpc_core::ExecCtx::Get()->Flush();
// If the given string happens to be part of the correct client hello, the
diff --git a/test/core/transport/chttp2/BUILD b/test/core/transport/chttp2/BUILD
index 1ea7d0341d..6eff716b01 100644
--- a/test/core/transport/chttp2/BUILD
+++ b/test/core/transport/chttp2/BUILD
@@ -115,6 +115,21 @@ grpc_cc_test(
)
grpc_cc_test(
+ name = "settings_timeout_test",
+ srcs = ["settings_timeout_test.cc"],
+ language = "C++",
+ deps = [
+ "//:gpr",
+ "//:grpc",
+ "//test/core/util:gpr_test_util",
+ "//test/core/util:grpc_test_util",
+ ],
+ external_deps = [
+ "gtest",
+ ],
+)
+
+grpc_cc_test(
name = "varint_test",
srcs = ["varint_test.cc"],
language = "C++",
diff --git a/test/core/transport/chttp2/settings_timeout_test.cc b/test/core/transport/chttp2/settings_timeout_test.cc
new file mode 100644
index 0000000000..08473c72b6
--- /dev/null
+++ b/test/core/transport/chttp2/settings_timeout_test.cc
@@ -0,0 +1,253 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include <memory>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/pollset.h"
+#include "src/core/lib/iomgr/pollset_set.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/slice/slice_internal.h"
+
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+
+namespace grpc_core {
+namespace test {
+namespace {
+
+// A gRPC server, running in its own thread.
+class ServerThread {
+ public:
+ explicit ServerThread(const char* address) : address_(address) {}
+
+ void Start() {
+ // Start server with 1-second handshake timeout.
+ grpc_arg arg;
+ arg.type = GRPC_ARG_INTEGER;
+ arg.key = const_cast<char*>(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS);
+ arg.value.integer = 1000;
+ grpc_channel_args args = {1, &arg};
+ server_ = grpc_server_create(&args, nullptr);
+ ASSERT_TRUE(grpc_server_add_insecure_http2_port(server_, address_));
+ cq_ = grpc_completion_queue_create_for_next(nullptr);
+ grpc_server_register_completion_queue(server_, cq_, nullptr);
+ grpc_server_start(server_);
+ thread_.reset(new std::thread(std::bind(&ServerThread::Serve, this)));
+ }
+
+ void Shutdown() {
+ grpc_completion_queue* shutdown_cq =
+ grpc_completion_queue_create_for_pluck(nullptr);
+ grpc_server_shutdown_and_notify(server_, shutdown_cq, nullptr);
+ GPR_ASSERT(grpc_completion_queue_pluck(shutdown_cq, nullptr,
+ grpc_timeout_seconds_to_deadline(1),
+ nullptr)
+ .type == GRPC_OP_COMPLETE);
+ grpc_completion_queue_destroy(shutdown_cq);
+ grpc_server_destroy(server_);
+ grpc_completion_queue_destroy(cq_);
+ thread_->join();
+ }
+
+ private:
+ void Serve() {
+ // The completion queue should not return anything other than shutdown.
+ grpc_event ev = grpc_completion_queue_next(
+ cq_, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
+ ASSERT_EQ(GRPC_QUEUE_SHUTDOWN, ev.type);
+ }
+
+ const char* address_; // Do not own.
+ grpc_server* server_ = nullptr;
+ grpc_completion_queue* cq_ = nullptr;
+ std::unique_ptr<std::thread> thread_;
+};
+
+// A TCP client that connects to the server, reads data until the server
+// closes, and then terminates.
+class Client {
+ public:
+ explicit Client(const char* server_address)
+ : server_address_(server_address) {}
+
+ void Connect() {
+ grpc_core::ExecCtx exec_ctx;
+ grpc_resolved_addresses* server_addresses = nullptr;
+ grpc_error* error =
+ grpc_blocking_resolve_address(server_address_, "80", &server_addresses);
+ ASSERT_EQ(GRPC_ERROR_NONE, error) << grpc_error_string(error);
+ ASSERT_GE(server_addresses->naddrs, 1UL);
+ pollset_ = (grpc_pollset*)gpr_zalloc(grpc_pollset_size());
+ grpc_pollset_init(pollset_, &mu_);
+ grpc_pollset_set* pollset_set = grpc_pollset_set_create();
+ grpc_pollset_set_add_pollset(pollset_set, pollset_);
+ EventState state;
+ grpc_tcp_client_connect(state.closure(), &endpoint_, pollset_set,
+ nullptr /* channel_args */, server_addresses->addrs,
+ 1000);
+ ASSERT_TRUE(PollUntilDone(
+ &state,
+ grpc_timespec_to_millis_round_up(gpr_inf_future(GPR_CLOCK_MONOTONIC))));
+ ASSERT_EQ(GRPC_ERROR_NONE, state.error());
+ grpc_pollset_set_destroy(pollset_set);
+ grpc_endpoint_add_to_pollset(endpoint_, pollset_);
+ grpc_resolved_addresses_destroy(server_addresses);
+ }
+
+ // Reads until an error is returned.
+ // Returns true if an error was encountered before the deadline.
+ bool ReadUntilError() {
+ grpc_core::ExecCtx exec_ctx;
+ grpc_slice_buffer read_buffer;
+ grpc_slice_buffer_init(&read_buffer);
+ bool retval = true;
+ // Use a deadline of 3 seconds, which is a lot more than we should
+ // need for a 1-second timeout, but this helps avoid flakes.
+ grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 3000;
+ while (true) {
+ EventState state;
+ grpc_endpoint_read(endpoint_, &read_buffer, state.closure());
+ if (!PollUntilDone(&state, deadline)) {
+ retval = false;
+ break;
+ }
+ if (state.error() != GRPC_ERROR_NONE) break;
+ gpr_log(GPR_INFO, "client read %" PRIuPTR " bytes", read_buffer.length);
+ grpc_slice_buffer_reset_and_unref_internal(&read_buffer);
+ }
+ grpc_endpoint_shutdown(endpoint_,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown"));
+ grpc_slice_buffer_destroy_internal(&read_buffer);
+ return retval;
+ }
+
+ void Shutdown() {
+ grpc_core::ExecCtx exec_ctx;
+ grpc_endpoint_destroy(endpoint_);
+ grpc_pollset_shutdown(pollset_,
+ GRPC_CLOSURE_CREATE(&Client::PollsetDestroy, pollset_,
+ grpc_schedule_on_exec_ctx));
+ }
+
+ private:
+ // State used to wait for an I/O event.
+ class EventState {
+ public:
+ EventState() {
+ GRPC_CLOSURE_INIT(&closure_, &EventState::OnEventDone, this,
+ grpc_schedule_on_exec_ctx);
+ }
+
+ ~EventState() { GRPC_ERROR_UNREF(error_); }
+
+ grpc_closure* closure() { return &closure_; }
+
+ bool done() const { return done_; }
+
+ // Caller does NOT take ownership of the error.
+ grpc_error* error() const { return error_; }
+
+ private:
+ static void OnEventDone(void* arg, grpc_error* error) {
+ gpr_log(GPR_INFO, "OnEventDone(): %s", grpc_error_string(error));
+ EventState* state = (EventState*)arg;
+ state->error_ = GRPC_ERROR_REF(error);
+ state->done_ = true;
+ }
+
+ grpc_closure closure_;
+ bool done_ = false;
+ grpc_error* error_ = GRPC_ERROR_NONE;
+ };
+
+ // Returns true if done, or false if deadline exceeded.
+ bool PollUntilDone(EventState* state, grpc_millis deadline) {
+ while (true) {
+ grpc_pollset_worker* worker = nullptr;
+ gpr_mu_lock(mu_);
+ GRPC_LOG_IF_ERROR(
+ "grpc_pollset_work",
+ grpc_pollset_work(pollset_, &worker,
+ grpc_core::ExecCtx::Get()->Now() + 1000));
+ gpr_mu_unlock(mu_);
+ if (state != nullptr && state->done()) return true;
+ if (grpc_core::ExecCtx::Get()->Now() >= deadline) return false;
+ }
+ }
+
+ static void PollsetDestroy(void* arg, grpc_error* error) {
+ grpc_pollset* pollset = (grpc_pollset*)arg;
+ grpc_pollset_destroy(pollset);
+ gpr_free(pollset);
+ }
+
+ const char* server_address_; // Do not own.
+ grpc_endpoint* endpoint_;
+ gpr_mu* mu_;
+ grpc_pollset* pollset_;
+};
+
+TEST(SettingsTimeout, Basic) {
+ // Construct server address string.
+ const int server_port = grpc_pick_unused_port_or_die();
+ char* server_address_string;
+ gpr_asprintf(&server_address_string, "localhost:%d", server_port);
+ // Start server.
+ gpr_log(GPR_INFO, "starting server on %s", server_address_string);
+ ServerThread server_thread(server_address_string);
+ server_thread.Start();
+ // Create client and connect to server.
+ gpr_log(GPR_INFO, "starting client connect");
+ Client client(server_address_string);
+ client.Connect();
+ // Client read. Should fail due to server dropping connection.
+ gpr_log(GPR_INFO, "starting client read");
+ EXPECT_TRUE(client.ReadUntilError());
+ // Shut down client.
+ gpr_log(GPR_INFO, "shutting down client");
+ client.Shutdown();
+ // Shut down server.
+ gpr_log(GPR_INFO, "shutting down server");
+ server_thread.Shutdown();
+ // Clean up.
+ gpr_free(server_address_string);
+}
+
+} // namespace
+} // namespace test
+} // namespace grpc_core
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ grpc_test_init(argc, argv);
+ grpc_init();
+ int result = RUN_ALL_TESTS();
+ grpc_shutdown();
+ return result;
+}