diff options
Diffstat (limited to 'test')
21 files changed, 664 insertions, 168 deletions
diff --git a/test/core/bad_client/bad_client.cc b/test/core/bad_client/bad_client.cc index 443886751c..d8bb092e14 100644 --- a/test/core/bad_client/bad_client.cc +++ b/test/core/bad_client/bad_client.cc @@ -115,9 +115,10 @@ void grpc_run_bad_client_test( GRPC_BAD_CLIENT_REGISTERED_HOST, GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, 0); grpc_server_start(a.server); - transport = grpc_create_chttp2_transport(&exec_ctx, nullptr, sfd.server, 0); + transport = + grpc_create_chttp2_transport(&exec_ctx, nullptr, sfd.server, false); server_setup_transport(&a, transport); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_exec_ctx_finish(&exec_ctx); /* Bind everything into the same pollset */ diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.cc b/test/core/end2end/fixtures/h2_sockpair+trace.cc index fe4ab74843..9319c401dc 100644 --- a/test/core/end2end/fixtures/h2_sockpair+trace.cc +++ b/test/core/end2end/fixtures/h2_sockpair+trace.cc @@ -97,10 +97,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f, cs.client_args = client_args; cs.f = f; transport = - grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, 1); + grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, true); client_setup_transport(&exec_ctx, &cs, transport); GPR_ASSERT(f->client); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_exec_ctx_finish(&exec_ctx); } @@ -114,9 +114,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, grpc_server_register_completion_queue(f->server, f->cq, nullptr); grpc_server_start(f->server); transport = - grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, 0); + grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, false); server_setup_transport(f, transport); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/test/core/end2end/fixtures/h2_sockpair.cc b/test/core/end2end/fixtures/h2_sockpair.cc index 59a3a6db31..03566aada2 100644 --- a/test/core/end2end/fixtures/h2_sockpair.cc +++ b/test/core/end2end/fixtures/h2_sockpair.cc @@ -91,10 +91,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f, cs.client_args = client_args; cs.f = f; transport = - grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, 1); + grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, true); client_setup_transport(&exec_ctx, &cs, transport); GPR_ASSERT(f->client); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_exec_ctx_finish(&exec_ctx); } @@ -108,9 +108,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, grpc_server_register_completion_queue(f->server, f->cq, nullptr); grpc_server_start(f->server); transport = - grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, 0); + grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, false); server_setup_transport(f, transport); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.cc b/test/core/end2end/fixtures/h2_sockpair_1byte.cc index d935efbbac..9adba00204 100644 --- a/test/core/end2end/fixtures/h2_sockpair_1byte.cc +++ b/test/core/end2end/fixtures/h2_sockpair_1byte.cc @@ -102,10 +102,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f, cs.client_args = client_args; cs.f = f; transport = - grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, 1); + grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, true); client_setup_transport(&exec_ctx, &cs, transport); GPR_ASSERT(f->client); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_exec_ctx_finish(&exec_ctx); } @@ -119,9 +119,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f, grpc_server_register_completion_queue(f->server, f->cq, nullptr); grpc_server_start(f->server); transport = - grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, 0); + grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, false); server_setup_transport(f, transport); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index 3f7806942b..3904887026 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -68,6 +68,9 @@ struct grpc_end2end_http_proxy { // Connection handling // +// proxy_connection structure is only accessed in the closures which are all +// scheduled under the same combiner lock. So there is is no need for a mutex to +// protect this structure. typedef struct proxy_connection { grpc_end2end_http_proxy* proxy; @@ -78,6 +81,8 @@ typedef struct proxy_connection { grpc_pollset_set* pollset_set; + // NOTE: All the closures execute under proxy->combiner lock. Which means + // there will not be any data-races between the closures grpc_closure on_read_request_done; grpc_closure on_server_connect_done; grpc_closure on_write_response_done; @@ -86,6 +91,13 @@ typedef struct proxy_connection { grpc_closure on_server_read_done; grpc_closure on_server_write_done; + bool client_read_failed : 1; + bool client_write_failed : 1; + bool client_shutdown : 1; + bool server_read_failed : 1; + bool server_write_failed : 1; + bool server_shutdown : 1; + grpc_slice_buffer client_read_buffer; grpc_slice_buffer client_deferred_write_buffer; bool client_is_writing; @@ -129,21 +141,53 @@ static void proxy_connection_unref(grpc_exec_ctx* exec_ctx, } } +enum failure_type { + SETUP_FAILED, // To be used before we start proxying. + CLIENT_READ_FAILED, + CLIENT_WRITE_FAILED, + SERVER_READ_FAILED, + SERVER_WRITE_FAILED, +}; + // Helper function to shut down the proxy connection. -// Does NOT take ownership of a reference to error. static void proxy_connection_failed(grpc_exec_ctx* exec_ctx, - proxy_connection* conn, bool is_client, - const char* prefix, grpc_error* error) { - const char* msg = grpc_error_string(error); - gpr_log(GPR_INFO, "%s: %s", prefix, msg); - - grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint, - GRPC_ERROR_REF(error)); - if (conn->server_endpoint != nullptr) { + proxy_connection* conn, + failure_type failure, const char* prefix, + grpc_error* error) { + gpr_log(GPR_INFO, "%s: %s", prefix, grpc_error_string(error)); + // Decide whether we should shut down the client and server. + bool shutdown_client = false; + bool shutdown_server = false; + if (failure == SETUP_FAILED) { + shutdown_client = true; + shutdown_server = true; + } else { + if ((failure == CLIENT_READ_FAILED && conn->client_write_failed) || + (failure == CLIENT_WRITE_FAILED && conn->client_read_failed) || + (failure == SERVER_READ_FAILED && !conn->client_is_writing)) { + shutdown_client = true; + } + if ((failure == SERVER_READ_FAILED && conn->server_write_failed) || + (failure == SERVER_WRITE_FAILED && conn->server_read_failed) || + (failure == CLIENT_READ_FAILED && !conn->server_is_writing)) { + shutdown_server = true; + } + } + // If we decided to shut down either one and have not yet done so, do so. + if (shutdown_client && !conn->client_shutdown) { + grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint, + GRPC_ERROR_REF(error)); + conn->client_shutdown = true; + } + if (shutdown_server && !conn->server_shutdown && + (conn->server_endpoint != nullptr)) { grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint, GRPC_ERROR_REF(error)); + conn->server_shutdown = true; } + // Unref the connection. proxy_connection_unref(exec_ctx, conn, "conn_failed"); + GRPC_ERROR_UNREF(error); } // Callback for writing proxy data to the client. @@ -152,8 +196,8 @@ static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, proxy_connection* conn = (proxy_connection*)arg; conn->client_is_writing = false; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(exec_ctx, conn, true /* is_client */, - "HTTP proxy client write", error); + proxy_connection_failed(exec_ctx, conn, CLIENT_WRITE_FAILED, + "HTTP proxy client write", GRPC_ERROR_REF(error)); return; } // Clear write buffer (the data we just wrote). @@ -179,8 +223,8 @@ static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, proxy_connection* conn = (proxy_connection*)arg; conn->server_is_writing = false; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(exec_ctx, conn, false /* is_client */, - "HTTP proxy server write", error); + proxy_connection_failed(exec_ctx, conn, SERVER_WRITE_FAILED, + "HTTP proxy server write", GRPC_ERROR_REF(error)); return; } // Clear write buffer (the data we just wrote). @@ -206,8 +250,8 @@ static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(exec_ctx, conn, true /* is_client */, - "HTTP proxy client read", error); + proxy_connection_failed(exec_ctx, conn, CLIENT_READ_FAILED, + "HTTP proxy client read", GRPC_ERROR_REF(error)); return; } // If there is already a pending write (i.e., server_write_buffer is @@ -239,8 +283,8 @@ static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(exec_ctx, conn, false /* is_client */, - "HTTP proxy server read", error); + proxy_connection_failed(exec_ctx, conn, SERVER_READ_FAILED, + "HTTP proxy server read", GRPC_ERROR_REF(error)); return; } // If there is already a pending write (i.e., client_write_buffer is @@ -272,8 +316,8 @@ static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, proxy_connection* conn = (proxy_connection*)arg; conn->client_is_writing = false; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(exec_ctx, conn, true /* is_client */, - "HTTP proxy write response", error); + proxy_connection_failed(exec_ctx, conn, SETUP_FAILED, + "HTTP proxy write response", GRPC_ERROR_REF(error)); return; } // Clear write buffer. @@ -301,8 +345,8 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, // connection failed. However, for the purposes of this test code, // it's fine to pretend this is a client-side error, which will // cause the client connection to be dropped. - proxy_connection_failed(exec_ctx, conn, true /* is_client */, - "HTTP proxy server connect", error); + proxy_connection_failed(exec_ctx, conn, SETUP_FAILED, + "HTTP proxy server connect", GRPC_ERROR_REF(error)); return; } // We've established a connection, so send back a 200 response code to @@ -351,8 +395,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, gpr_log(GPR_DEBUG, "on_read_request_done: %p %s", conn, grpc_error_string(error)); if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(exec_ctx, conn, true /* is_client */, - "HTTP proxy read request", error); + proxy_connection_failed(exec_ctx, conn, SETUP_FAILED, + "HTTP proxy read request", GRPC_ERROR_REF(error)); return; } // Read request and feed it to the parser. @@ -361,8 +405,9 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, error = grpc_http_parser_parse( &conn->http_parser, conn->client_read_buffer.slices[i], nullptr); if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(exec_ctx, conn, true /* is_client */, - "HTTP proxy request parse", error); + proxy_connection_failed(exec_ctx, conn, SETUP_FAILED, + "HTTP proxy request parse", + GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); return; } @@ -382,8 +427,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, conn->http_request.method); error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); - proxy_connection_failed(exec_ctx, conn, true /* is_client */, - "HTTP proxy read request", error); + proxy_connection_failed(exec_ctx, conn, SETUP_FAILED, + "HTTP proxy read request", GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); return; } @@ -403,8 +448,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, if (!client_authenticated) { const char* msg = "HTTP Connect could not verify authentication"; error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(msg); - proxy_connection_failed(exec_ctx, conn, true /* is_client */, - "HTTP proxy read request", error); + proxy_connection_failed(exec_ctx, conn, SETUP_FAILED, + "HTTP proxy read request", GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); return; } @@ -414,8 +459,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, error = grpc_blocking_resolve_address(conn->http_request.path, "80", &resolved_addresses); if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(exec_ctx, conn, true /* is_client */, - "HTTP proxy DNS lookup", error); + proxy_connection_failed(exec_ctx, conn, SETUP_FAILED, + "HTTP proxy DNS lookup", GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); return; } diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index 0b49b89205..75117218e4 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -468,10 +468,10 @@ static void do_connect(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { *fc->ep = client; grpc_transport* transport = - grpc_create_chttp2_transport(exec_ctx, nullptr, server, 0); + grpc_create_chttp2_transport(exec_ctx, nullptr, server, false); grpc_server_setup_transport(exec_ctx, g_server, transport, nullptr, nullptr); - grpc_chttp2_transport_start_reading(exec_ctx, transport, nullptr); + grpc_chttp2_transport_start_reading(exec_ctx, transport, nullptr, nullptr); GRPC_CLOSURE_SCHED(exec_ctx, fc->closure, GRPC_ERROR_NONE); } else { diff --git a/test/core/end2end/fuzzers/client_fuzzer.cc b/test/core/end2end/fuzzers/client_fuzzer.cc index ebc8c2780d..5871f0f43e 100644 --- a/test/core/end2end/fuzzers/client_fuzzer.cc +++ b/test/core/end2end/fuzzers/client_fuzzer.cc @@ -54,8 +54,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); grpc_transport* transport = - grpc_create_chttp2_transport(&exec_ctx, nullptr, mock_endpoint, 1); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr); + grpc_create_chttp2_transport(&exec_ctx, nullptr, mock_endpoint, true); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_channel* channel = grpc_channel_create( &exec_ctx, "test-target", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, transport); diff --git a/test/core/end2end/fuzzers/server_fuzzer.cc b/test/core/end2end/fuzzers/server_fuzzer.cc index fb6477b579..67caf4e720 100644 --- a/test/core/end2end/fuzzers/server_fuzzer.cc +++ b/test/core/end2end/fuzzers/server_fuzzer.cc @@ -61,9 +61,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { // grpc_server_register_method(server, "/reg", NULL, 0); grpc_server_start(server); grpc_transport* transport = - grpc_create_chttp2_transport(&exec_ctx, nullptr, mock_endpoint, 0); + grpc_create_chttp2_transport(&exec_ctx, nullptr, mock_endpoint, false); grpc_server_setup_transport(&exec_ctx, server, transport, nullptr, nullptr); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_call* call1 = nullptr; grpc_call_details call_details1; diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc index 803f017106..6e17be9cd6 100644 --- a/test/core/iomgr/udp_server_test.cc +++ b/test/core/iomgr/udp_server_test.cc @@ -50,7 +50,7 @@ static int g_number_of_writes = 0; static int g_number_of_bytes_read = 0; static int g_number_of_orphan_calls = 0; -static void on_read(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) { +static bool on_read(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) { char read_buffer[512]; ssize_t byte_count; @@ -64,9 +64,11 @@ static void on_read(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) { GPR_ASSERT(GRPC_LOG_IF_ERROR( "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); gpr_mu_unlock(g_mu); + return false; } -static void on_write(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) { +static void on_write(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data, + grpc_closure* notify_on_write_closure) { gpr_mu_lock(g_mu); g_number_of_writes++; @@ -79,6 +81,7 @@ static void on_fd_orphaned(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, grpc_closure* closure, void* user_data) { gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d", grpc_fd_wrapped_fd(emfd)); + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE); g_number_of_orphan_calls++; } @@ -226,7 +229,6 @@ static void test_receive(int number_of_clients) { int clifd, svrfd; grpc_udp_server* s = grpc_udp_server_create(nullptr); int i; - int number_of_reads_before; grpc_millis deadline; grpc_pollset* pollsets[1]; LOG_TEST("test_receive"); @@ -256,14 +258,14 @@ static void test_receive(int number_of_clients) { deadline = grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10)); - number_of_reads_before = g_number_of_reads; + int number_of_bytes_read_before = g_number_of_bytes_read; /* Create a socket, send a packet to the UDP server. */ clifd = socket(addr->ss_family, SOCK_DGRAM, 0); GPR_ASSERT(clifd >= 0); GPR_ASSERT(connect(clifd, (struct sockaddr*)addr, (socklen_t)resolved_addr.len) == 0); GPR_ASSERT(5 == write(clifd, "hello", 5)); - while (g_number_of_reads == number_of_reads_before && + while (g_number_of_bytes_read < (number_of_bytes_read_before + 5) && deadline > grpc_exec_ctx_now(&exec_ctx)) { grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( @@ -273,7 +275,6 @@ static void test_receive(int number_of_clients) { grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(g_mu); } - GPR_ASSERT(g_number_of_reads == number_of_reads_before + 1); close(clifd); } GPR_ASSERT(g_number_of_bytes_read == 5 * number_of_clients); @@ -286,9 +287,6 @@ static void test_receive(int number_of_clients) { /* The server had a single FD, which is orphaned exactly once in * * grpc_udp_server_destroy. */ GPR_ASSERT(g_number_of_orphan_calls == 1); - - /* The write callback should have fired a few times. */ - GPR_ASSERT(g_number_of_writes > 0); } static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p, diff --git a/test/core/security/ssl_server_fuzzer.cc b/test/core/security/ssl_server_fuzzer.cc index bbb2f6013e..d83ebb18d2 100644 --- a/test/core/security/ssl_server_fuzzer.cc +++ b/test/core/security/ssl_server_fuzzer.cc @@ -92,8 +92,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_handshake_manager* handshake_mgr = grpc_handshake_manager_create(); grpc_server_security_connector_add_handshakers(&exec_ctx, sc, handshake_mgr); grpc_handshake_manager_do_handshake( - &exec_ctx, handshake_mgr, mock_endpoint, nullptr /* channel_args */, - deadline, nullptr /* acceptor */, on_handshake_done, &state); + &exec_ctx, handshake_mgr, nullptr /* interested_parties */, mock_endpoint, + nullptr /* channel_args */, deadline, nullptr /* acceptor */, + on_handshake_done, &state); grpc_exec_ctx_flush(&exec_ctx); // If the given string happens to be part of the correct client hello, the diff --git a/test/core/transport/chttp2/BUILD b/test/core/transport/chttp2/BUILD index 1ea7d0341d..6eff716b01 100644 --- a/test/core/transport/chttp2/BUILD +++ b/test/core/transport/chttp2/BUILD @@ -115,6 +115,21 @@ grpc_cc_test( ) grpc_cc_test( + name = "settings_timeout_test", + srcs = ["settings_timeout_test.cc"], + language = "C++", + deps = [ + "//:gpr", + "//:grpc", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + ], + external_deps = [ + "gtest", + ], +) + +grpc_cc_test( name = "varint_test", srcs = ["varint_test.cc"], language = "C++", diff --git a/test/core/transport/chttp2/settings_timeout_test.cc b/test/core/transport/chttp2/settings_timeout_test.cc new file mode 100644 index 0000000000..670eae1f79 --- /dev/null +++ b/test/core/transport/chttp2/settings_timeout_test.cc @@ -0,0 +1,258 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> + +#include <memory> +#include <thread> + +#include <gtest/gtest.h> + +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/pollset.h" +#include "src/core/lib/iomgr/pollset_set.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/slice/slice_internal.h" + +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" + +namespace grpc_core { +namespace test { +namespace { + +// A gRPC server, running in its own thread. +class ServerThread { + public: + explicit ServerThread(const char* address) : address_(address) {} + + void Start() { + // Start server with 1-second handshake timeout. + grpc_arg arg; + arg.type = GRPC_ARG_INTEGER; + arg.key = const_cast<char*>(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS); + arg.value.integer = 1000; + grpc_channel_args args = {1, &arg}; + server_ = grpc_server_create(&args, nullptr); + ASSERT_TRUE(grpc_server_add_insecure_http2_port(server_, address_)); + cq_ = grpc_completion_queue_create_for_next(nullptr); + grpc_server_register_completion_queue(server_, cq_, nullptr); + grpc_server_start(server_); + thread_.reset(new std::thread(std::bind(&ServerThread::Serve, this))); + } + + void Shutdown() { + grpc_completion_queue* shutdown_cq = + grpc_completion_queue_create_for_pluck(nullptr); + grpc_server_shutdown_and_notify(server_, shutdown_cq, nullptr); + GPR_ASSERT(grpc_completion_queue_pluck(shutdown_cq, nullptr, + grpc_timeout_seconds_to_deadline(1), + nullptr) + .type == GRPC_OP_COMPLETE); + grpc_completion_queue_destroy(shutdown_cq); + grpc_server_destroy(server_); + grpc_completion_queue_destroy(cq_); + thread_->join(); + } + + private: + void Serve() { + // The completion queue should not return anything other than shutdown. + grpc_event ev = grpc_completion_queue_next( + cq_, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr); + ASSERT_EQ(GRPC_QUEUE_SHUTDOWN, ev.type); + } + + const char* address_; // Do not own. + grpc_server* server_ = nullptr; + grpc_completion_queue* cq_ = nullptr; + std::unique_ptr<std::thread> thread_; +}; + +// A TCP client that connects to the server, reads data until the server +// closes, and then terminates. +class Client { + public: + explicit Client(const char* server_address) + : server_address_(server_address) {} + + void Connect() { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resolved_addresses* server_addresses = nullptr; + grpc_error* error = + grpc_blocking_resolve_address(server_address_, "80", &server_addresses); + ASSERT_EQ(GRPC_ERROR_NONE, error) << grpc_error_string(error); + ASSERT_GE(server_addresses->naddrs, 1UL); + pollset_ = (grpc_pollset*)gpr_zalloc(grpc_pollset_size()); + grpc_pollset_init(pollset_, &mu_); + grpc_pollset_set* pollset_set = grpc_pollset_set_create(); + grpc_pollset_set_add_pollset(&exec_ctx, pollset_set, pollset_); + EventState state; + grpc_tcp_client_connect(&exec_ctx, state.closure(), &endpoint_, pollset_set, + nullptr /* channel_args */, server_addresses->addrs, + 1000); + ASSERT_TRUE(PollUntilDone( + &exec_ctx, &state, + grpc_timespec_to_millis_round_up(gpr_inf_future(GPR_CLOCK_MONOTONIC)))); + ASSERT_EQ(GRPC_ERROR_NONE, state.error()); + grpc_pollset_set_destroy(&exec_ctx, pollset_set); + grpc_endpoint_add_to_pollset(&exec_ctx, endpoint_, pollset_); + grpc_resolved_addresses_destroy(server_addresses); + grpc_exec_ctx_finish(&exec_ctx); + } + + // Reads until an error is returned. + // Returns true if an error was encountered before the deadline. + bool ReadUntilError() { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_slice_buffer read_buffer; + grpc_slice_buffer_init(&read_buffer); + bool retval = true; + // Use a deadline of 3 seconds, which is a lot more than we should + // need for a 1-second timeout, but this helps avoid flakes. + grpc_millis deadline = grpc_exec_ctx_now(&exec_ctx) + 3000; + while (true) { + EventState state; + grpc_endpoint_read(&exec_ctx, endpoint_, &read_buffer, state.closure()); + if (!PollUntilDone(&exec_ctx, &state, deadline)) { + retval = false; + break; + } + if (state.error() != GRPC_ERROR_NONE) break; + gpr_log(GPR_INFO, "client read %" PRIuPTR " bytes", read_buffer.length); + grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, &read_buffer); + } + grpc_endpoint_shutdown(&exec_ctx, endpoint_, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown")); + grpc_slice_buffer_destroy_internal(&exec_ctx, &read_buffer); + grpc_exec_ctx_finish(&exec_ctx); + return retval; + } + + void Shutdown() { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_endpoint_destroy(&exec_ctx, endpoint_); + grpc_pollset_shutdown(&exec_ctx, pollset_, + GRPC_CLOSURE_CREATE(&Client::PollsetDestroy, pollset_, + grpc_schedule_on_exec_ctx)); + grpc_exec_ctx_finish(&exec_ctx); + } + + private: + // State used to wait for an I/O event. + class EventState { + public: + EventState() { + GRPC_CLOSURE_INIT(&closure_, &EventState::OnEventDone, this, + grpc_schedule_on_exec_ctx); + } + + ~EventState() { GRPC_ERROR_UNREF(error_); } + + grpc_closure* closure() { return &closure_; } + + bool done() const { return done_; } + + // Caller does NOT take ownership of the error. + grpc_error* error() const { return error_; } + + private: + static void OnEventDone(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + gpr_log(GPR_INFO, "OnEventDone(): %s", grpc_error_string(error)); + EventState* state = (EventState*)arg; + state->error_ = GRPC_ERROR_REF(error); + state->done_ = true; + } + + grpc_closure closure_; + bool done_ = false; + grpc_error* error_ = GRPC_ERROR_NONE; + }; + + // Returns true if done, or false if deadline exceeded. + bool PollUntilDone(grpc_exec_ctx* exec_ctx, EventState* state, + grpc_millis deadline) { + while (true) { + grpc_pollset_worker* worker = nullptr; + gpr_mu_lock(mu_); + GRPC_LOG_IF_ERROR("grpc_pollset_work", + grpc_pollset_work(exec_ctx, pollset_, &worker, + grpc_exec_ctx_now(exec_ctx) + 1000)); + gpr_mu_unlock(mu_); + if (state != nullptr && state->done()) return true; + if (grpc_exec_ctx_now(exec_ctx) >= deadline) return false; + } + } + + static void PollsetDestroy(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_pollset* pollset = (grpc_pollset*)arg; + grpc_pollset_destroy(exec_ctx, pollset); + gpr_free(pollset); + } + + const char* server_address_; // Do not own. + grpc_endpoint* endpoint_; + gpr_mu* mu_; + grpc_pollset* pollset_; +}; + +TEST(SettingsTimeout, Basic) { + // Construct server address string. + const int server_port = grpc_pick_unused_port_or_die(); + char* server_address_string; + gpr_asprintf(&server_address_string, "localhost:%d", server_port); + // Start server. + gpr_log(GPR_INFO, "starting server on %s", server_address_string); + ServerThread server_thread(server_address_string); + server_thread.Start(); + // Create client and connect to server. + gpr_log(GPR_INFO, "starting client connect"); + Client client(server_address_string); + client.Connect(); + // Client read. Should fail due to server dropping connection. + gpr_log(GPR_INFO, "starting client read"); + EXPECT_TRUE(client.ReadUntilError()); + // Shut down client. + gpr_log(GPR_INFO, "shutting down client"); + client.Shutdown(); + // Shut down server. + gpr_log(GPR_INFO, "shutting down server"); + server_thread.Shutdown(); + // Clean up. + gpr_free(server_address_string); +} + +} // namespace +} // namespace test +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc_test_init(argc, argv); + grpc_init(); + int result = RUN_ALL_TESTS(); + grpc_shutdown(); + return result; +} diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index c15ab88da1..bbf3da4663 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -353,11 +353,6 @@ class GrpclbEnd2endTest : public ::testing::Test { "balancer", server_host_, balancers_.back().get())); } ResetStub(); - std::vector<AddressData> addresses; - for (size_t i = 0; i < balancer_servers_.size(); ++i) { - addresses.emplace_back(AddressData{balancer_servers_[i].port_, true, ""}); - } - SetNextResolution(addresses); } void TearDown() override { @@ -370,6 +365,14 @@ class GrpclbEnd2endTest : public ::testing::Test { grpc_fake_resolver_response_generator_unref(response_generator_); } + void SetNextResolutionAllBalancers() { + std::vector<AddressData> addresses; + for (size_t i = 0; i < balancer_servers_.size(); ++i) { + addresses.emplace_back(AddressData{balancer_servers_[i].port_, true, ""}); + } + SetNextResolution(addresses); + } + void ResetStub(int fallback_timeout = 0) { ChannelArguments args; args.SetGrpclbFallbackTimeout(fallback_timeout); @@ -581,6 +584,7 @@ class SingleBalancerTest : public GrpclbEnd2endTest { }; TEST_F(SingleBalancerTest, Vanilla) { + SetNextResolutionAllBalancers(); const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), @@ -608,6 +612,7 @@ TEST_F(SingleBalancerTest, Vanilla) { } TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { + SetNextResolutionAllBalancers(); const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); const int kCallDeadlineMs = 1000 * grpc_test_slowdown_factor(); @@ -645,6 +650,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { } TEST_F(SingleBalancerTest, Fallback) { + SetNextResolutionAllBalancers(); const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor(); const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); const size_t kNumBackendInResolution = backends_.size() / 2; @@ -711,6 +717,7 @@ TEST_F(SingleBalancerTest, Fallback) { } TEST_F(SingleBalancerTest, FallbackUpdate) { + SetNextResolutionAllBalancers(); const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor(); const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); const size_t kNumBackendInResolution = backends_.size() / 3; @@ -818,6 +825,7 @@ TEST_F(SingleBalancerTest, FallbackUpdate) { } TEST_F(SingleBalancerTest, BackendsRestart) { + SetNextResolutionAllBalancers(); const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), @@ -857,6 +865,7 @@ class UpdatesTest : public GrpclbEnd2endTest { }; TEST_F(UpdatesTest, UpdateBalancers) { + SetNextResolutionAllBalancers(); const std::vector<int> first_backend{GetBackendPorts()[0]}; const std::vector<int> second_backend{GetBackendPorts()[1]}; ScheduleResponseForBalancer( @@ -919,6 +928,7 @@ TEST_F(UpdatesTest, UpdateBalancers) { // verify that the LB channel inside grpclb keeps the initial connection (which // by definition is also present in the update). TEST_F(UpdatesTest, UpdateBalancersRepeated) { + SetNextResolutionAllBalancers(); const std::vector<int> first_backend{GetBackendPorts()[0]}; const std::vector<int> second_backend{GetBackendPorts()[0]}; @@ -989,6 +999,9 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { } TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { + std::vector<AddressData> addresses; + addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""}); + SetNextResolution(addresses); const std::vector<int> first_backend{GetBackendPorts()[0]}; const std::vector<int> second_backend{GetBackendPorts()[1]}; @@ -1030,7 +1043,7 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); - std::vector<AddressData> addresses; + addresses.clear(); addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""}); gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 =========="); SetNextResolution(addresses); @@ -1055,8 +1068,14 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { balancers_[2]->NotifyDoneWithServerlists(); EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); - EXPECT_EQ(1U, balancer_servers_[1].service_->request_count()); - EXPECT_EQ(1U, balancer_servers_[1].service_->response_count()); + // The second balancer, published as part of the first update, may end up + // getting two requests (that is, 1 <= #req <= 2) if the LB call retry timer + // firing races with the arrival of the update containing the second + // balancer. + EXPECT_GE(balancer_servers_[1].service_->request_count(), 1U); + EXPECT_GE(balancer_servers_[1].service_->response_count(), 1U); + EXPECT_LE(balancer_servers_[1].service_->request_count(), 2U); + EXPECT_LE(balancer_servers_[1].service_->response_count(), 2U); EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); // Check LB policy name for the channel. @@ -1064,6 +1083,7 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { } TEST_F(SingleBalancerTest, Drop) { + SetNextResolutionAllBalancers(); const size_t kNumRpcsPerAddress = 100; const int num_of_drop_by_rate_limiting_addresses = 1; const int num_of_drop_by_load_balancing_addresses = 2; @@ -1107,6 +1127,7 @@ TEST_F(SingleBalancerTest, Drop) { } TEST_F(SingleBalancerTest, DropAllFirst) { + SetNextResolutionAllBalancers(); // All registered addresses are marked as "drop". const int num_of_drop_by_rate_limiting_addresses = 1; const int num_of_drop_by_load_balancing_addresses = 1; @@ -1122,6 +1143,7 @@ TEST_F(SingleBalancerTest, DropAllFirst) { } TEST_F(SingleBalancerTest, DropAll) { + SetNextResolutionAllBalancers(); ScheduleResponseForBalancer( 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), 0); @@ -1152,6 +1174,7 @@ class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest { }; TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { + SetNextResolutionAllBalancers(); const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), @@ -1186,6 +1209,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { } TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { + SetNextResolutionAllBalancers(); const size_t kNumRpcsPerAddress = 3; const int num_of_drop_by_rate_limiting_addresses = 2; const int num_of_drop_by_load_balancing_addresses = 1; diff --git a/test/cpp/interop/interop_server.cc b/test/cpp/interop/interop_server.cc index a24cdc7d2d..30bd8bfef8 100644 --- a/test/cpp/interop/interop_server.cc +++ b/test/cpp/interop/interop_server.cc @@ -317,9 +317,15 @@ class TestServiceImpl : public TestService::Service { void grpc::testing::interop::RunServer( std::shared_ptr<ServerCredentials> creds) { - GPR_ASSERT(FLAGS_port != 0); + RunServer(creds, FLAGS_port, nullptr); +} + +void grpc::testing::interop::RunServer( + std::shared_ptr<ServerCredentials> creds, const int port, + ServerStartedCondition* server_started_condition) { + GPR_ASSERT(port != 0); std::ostringstream server_address; - server_address << "0.0.0.0:" << FLAGS_port; + server_address << "0.0.0.0:" << port; TestServiceImpl service; SimpleRequest request; @@ -333,6 +339,14 @@ void grpc::testing::interop::RunServer( } std::unique_ptr<Server> server(builder.BuildAndStart()); gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str()); + + // Signal that the server has started. + if (server_started_condition) { + std::unique_lock<std::mutex> lock(server_started_condition->mutex); + server_started_condition->server_started = true; + server_started_condition->condition.notify_all(); + } + while (!gpr_atm_no_barrier_load(&g_got_sigint)) { gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(5, GPR_TIMESPAN))); diff --git a/test/cpp/interop/server_helper.h b/test/cpp/interop/server_helper.h index 6af003fe93..1bf7db1e14 100644 --- a/test/cpp/interop/server_helper.h +++ b/test/cpp/interop/server_helper.h @@ -19,6 +19,7 @@ #ifndef GRPC_TEST_CPP_INTEROP_SERVER_HELPER_H #define GRPC_TEST_CPP_INTEROP_SERVER_HELPER_H +#include <condition_variable> #include <memory> #include <grpc/compression.h> @@ -50,8 +51,27 @@ class InteropServerContextInspector { namespace interop { extern gpr_atm g_got_sigint; + +struct ServerStartedCondition { + std::mutex mutex; + std::condition_variable condition; + bool server_started = false; +}; + +/// Run gRPC interop server using port FLAGS_port. +/// +/// \param creds The credentials associated with the server. void RunServer(std::shared_ptr<ServerCredentials> creds); +/// Run gRPC interop server. +/// +/// \param creds The credentials associated with the server. +/// \param port Port to use for the server. +/// \param server_started_condition (optional) Struct holding mutex, condition +/// variable, and condition used to notify when the server has started. +void RunServer(std::shared_ptr<ServerCredentials> creds, int port, + ServerStartedCondition* server_started_condition); + } // namespace interop } // namespace testing } // namespace grpc diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index b95ee0cc01..be4da4d0bd 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -137,7 +137,7 @@ class Fixture { grpc_channel_args c_args = args.c_channel_args(); ep_ = new DummyEndpoint; t_ = grpc_create_chttp2_transport(exec_ctx(), &c_args, ep_, client); - grpc_chttp2_transport_start_reading(exec_ctx(), t_, nullptr); + grpc_chttp2_transport_start_reading(exec_ctx(), t_, nullptr, nullptr); FlushExecCtx(); } diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h index 7f1aa48b56..7e20843875 100644 --- a/test/cpp/microbenchmarks/fullstack_fixtures.h +++ b/test/cpp/microbenchmarks/fullstack_fixtures.h @@ -174,7 +174,7 @@ class EndpointPairFixture : public BaseFixture { const grpc_channel_args* server_args = grpc_server_get_channel_args(server_->c_server()); server_transport_ = grpc_create_chttp2_transport( - &exec_ctx, server_args, endpoints.server, 0 /* is_client */); + &exec_ctx, server_args, endpoints.server, false /* is_client */); grpc_pollset** pollsets; size_t num_pollsets = 0; @@ -186,7 +186,7 @@ class EndpointPairFixture : public BaseFixture { grpc_server_setup_transport(&exec_ctx, server_->c_server(), server_transport_, nullptr, server_args); - grpc_chttp2_transport_start_reading(&exec_ctx, server_transport_, + grpc_chttp2_transport_start_reading(&exec_ctx, server_transport_, nullptr, nullptr); } @@ -197,13 +197,13 @@ class EndpointPairFixture : public BaseFixture { fixture_configuration.ApplyCommonChannelArguments(&args); grpc_channel_args c_args = args.c_channel_args(); - client_transport_ = - grpc_create_chttp2_transport(&exec_ctx, &c_args, endpoints.client, 1); + client_transport_ = grpc_create_chttp2_transport(&exec_ctx, &c_args, + endpoints.client, true); GPR_ASSERT(client_transport_); grpc_channel* channel = grpc_channel_create(&exec_ctx, "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport_); - grpc_chttp2_transport_start_reading(&exec_ctx, client_transport_, + grpc_chttp2_transport_start_reading(&exec_ctx, client_transport_, nullptr, nullptr); channel_ = CreateChannelInternal("", channel); diff --git a/test/cpp/performance/writes_per_rpc_test.cc b/test/cpp/performance/writes_per_rpc_test.cc index 23fff2ea8b..1c6f44dd7d 100644 --- a/test/cpp/performance/writes_per_rpc_test.cc +++ b/test/cpp/performance/writes_per_rpc_test.cc @@ -89,7 +89,7 @@ class EndpointPairFixture { const grpc_channel_args* server_args = grpc_server_get_channel_args(server_->c_server()); grpc_transport* transport = grpc_create_chttp2_transport( - &exec_ctx, server_args, endpoints.server, 0 /* is_client */); + &exec_ctx, server_args, endpoints.server, false /* is_client */); grpc_pollset** pollsets; size_t num_pollsets = 0; @@ -101,7 +101,8 @@ class EndpointPairFixture { grpc_server_setup_transport(&exec_ctx, server_->c_server(), transport, nullptr, server_args); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, + nullptr); } /* create channel */ @@ -111,12 +112,13 @@ class EndpointPairFixture { ApplyCommonChannelArguments(&args); grpc_channel_args c_args = args.c_channel_args(); - grpc_transport* transport = - grpc_create_chttp2_transport(&exec_ctx, &c_args, endpoints.client, 1); + grpc_transport* transport = grpc_create_chttp2_transport( + &exec_ctx, &c_args, endpoints.client, true); GPR_ASSERT(transport); grpc_channel* channel = grpc_channel_create( &exec_ctx, "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, transport); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, + nullptr); channel_ = CreateChannelInternal("", channel); } diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 9f20b148eb..82a3f0042d 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -60,21 +60,20 @@ class SynchronousClient SetupLoadTest(config, num_threads_); } - virtual ~SynchronousClient(){}; + virtual ~SynchronousClient() {} - virtual void InitThreadFuncImpl(size_t thread_idx) = 0; + virtual bool InitThreadFuncImpl(size_t thread_idx) = 0; virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0; void ThreadFunc(size_t thread_idx, Thread* t) override { - InitThreadFuncImpl(thread_idx); + if (!InitThreadFuncImpl(thread_idx)) { + return; + } for (;;) { // run the loop body HistogramEntry entry; const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx); t->UpdateHistogram(&entry); - if (!thread_still_ok) { - gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); - } if (!thread_still_ok || ThreadCompleted()) { return; } @@ -109,9 +108,6 @@ class SynchronousClient size_t num_threads_; std::vector<SimpleResponse> responses_; - - private: - void DestroyMultithreading() override final { EndThreads(); } }; class SynchronousUnaryClient final : public SynchronousClient { @@ -122,7 +118,7 @@ class SynchronousUnaryClient final : public SynchronousClient { } ~SynchronousUnaryClient() {} - void InitThreadFuncImpl(size_t thread_idx) override {} + bool InitThreadFuncImpl(size_t thread_idx) override { return true; } bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { @@ -140,6 +136,9 @@ class SynchronousUnaryClient final : public SynchronousClient { entry->set_status(s.error_code()); return true; } + + private: + void DestroyMultithreading() override final { EndThreads(); } }; template <class StreamType> @@ -149,31 +148,30 @@ class SynchronousStreamingClient : public SynchronousClient { : SynchronousClient(config), context_(num_threads_), stream_(num_threads_), + stream_mu_(num_threads_), + shutdown_(num_threads_), messages_per_stream_(config.messages_per_stream()), messages_issued_(num_threads_) { StartThreads(num_threads_); } virtual ~SynchronousStreamingClient() { - std::vector<std::thread> cleanup_threads; - for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - // forcibly cancel the streams, then finish - context_[i].TryCancel(); - (*stream)->Finish().IgnoreError(); - // don't log any error message on !ok since this was canceled - } - }); - } - for (auto& th : cleanup_threads) { - th.join(); - } + CleanupAllStreams([this](size_t thread_idx) { + // Don't log any kind of error since we may have canceled this + stream_[thread_idx]->Finish().IgnoreError(); + }); } protected: std::vector<grpc::ClientContext> context_; std::vector<std::unique_ptr<StreamType>> stream_; + // stream_mu_ is only needed when changing an element of stream_ or context_ + std::vector<std::mutex> stream_mu_; + // use struct Bool rather than bool because vector<bool> is not concurrent + struct Bool { + bool val; + Bool() : val(false) {} + }; + std::vector<Bool> shutdown_; const int messages_per_stream_; std::vector<int> messages_issued_; @@ -182,27 +180,26 @@ class SynchronousStreamingClient : public SynchronousClient { // don't set the value since the stream is failed and shouldn't be timed entry->set_status(s.error_code()); if (!s.ok()) { - gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx, - s.error_message().c_str()); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", + thread_idx, s.error_message().c_str()); + } } + // Lock the stream_mu_ now because the client context could change + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); context_[thread_idx].~ClientContext(); new (&context_[thread_idx]) ClientContext(); } -}; -class SynchronousStreamingPingPongClient final - : public SynchronousStreamingClient< - grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> { - public: - SynchronousStreamingPingPongClient(const ClientConfig& config) - : SynchronousStreamingClient(config) {} - ~SynchronousStreamingPingPongClient() { + void CleanupAllStreams(std::function<void(size_t)> cleaner) { std::vector<std::thread> cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - (*stream)->WritesDone(); + cleanup_threads.emplace_back([this, i, cleaner] { + std::lock_guard<std::mutex> l(stream_mu_[i]); + shutdown_[i].val = true; + if (stream_[i]) { + cleaner(i); } }); } @@ -211,10 +208,36 @@ class SynchronousStreamingPingPongClient final } } - void InitThreadFuncImpl(size_t thread_idx) override { + private: + void DestroyMultithreading() override final { + CleanupAllStreams( + [this](size_t thread_idx) { context_[thread_idx].TryCancel(); }); + EndThreads(); + } +}; + +class SynchronousStreamingPingPongClient final + : public SynchronousStreamingClient< + grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> { + public: + SynchronousStreamingPingPongClient(const ClientConfig& config) + : SynchronousStreamingClient(config) {} + ~SynchronousStreamingPingPongClient() { + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); + } + + private: + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + } else { + return false; + } messages_issued_[thread_idx] = 0; + return true; } bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { @@ -239,7 +262,13 @@ class SynchronousStreamingPingPongClient final stream_[thread_idx]->WritesDone(); FinishStream(entry, thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + } else { + stream_[thread_idx].reset(); + return false; + } messages_issued_[thread_idx] = 0; return true; } @@ -251,25 +280,24 @@ class SynchronousStreamingFromClientClient final SynchronousStreamingFromClientClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_issue_(num_threads_) {} ~SynchronousStreamingFromClientClient() { - std::vector<std::thread> cleanup_threads; - for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - (*stream)->WritesDone(); - } - }); - } - for (auto& th : cleanup_threads) { - th.join(); - } + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); } - void InitThreadFuncImpl(size_t thread_idx) override { + private: + std::vector<double> last_issue_; + + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], - &responses_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], + &responses_[thread_idx]); + } else { + return false; + } last_issue_[thread_idx] = UsageTimer::Now(); + return true; } bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { @@ -287,13 +315,16 @@ class SynchronousStreamingFromClientClient final stream_[thread_idx]->WritesDone(); FinishStream(entry, thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], - &responses_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], + &responses_[thread_idx]); + } else { + stream_[thread_idx].reset(); + return false; + } return true; } - - private: - std::vector<double> last_issue_; }; class SynchronousStreamingFromServerClient final @@ -301,12 +332,24 @@ class SynchronousStreamingFromServerClient final public: SynchronousStreamingFromServerClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_recv_(num_threads_) {} - void InitThreadFuncImpl(size_t thread_idx) override { + ~SynchronousStreamingFromServerClient() {} + + private: + std::vector<double> last_recv_; + + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = - stub->StreamingFromServer(&context_[thread_idx], request_); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = + stub->StreamingFromServer(&context_[thread_idx], request_); + } else { + return false; + } last_recv_[thread_idx] = UsageTimer::Now(); + return true; } + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0); if (stream_[thread_idx]->Read(&responses_[thread_idx])) { @@ -317,13 +360,16 @@ class SynchronousStreamingFromServerClient final } FinishStream(entry, thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = - stub->StreamingFromServer(&context_[thread_idx], request_); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = + stub->StreamingFromServer(&context_[thread_idx], request_); + } else { + stream_[thread_idx].reset(); + return false; + } return true; } - - private: - std::vector<double> last_recv_; }; class SynchronousStreamingBothWaysClient final @@ -333,24 +379,22 @@ class SynchronousStreamingBothWaysClient final SynchronousStreamingBothWaysClient(const ClientConfig& config) : SynchronousStreamingClient(config) {} ~SynchronousStreamingBothWaysClient() { - std::vector<std::thread> cleanup_threads; - for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - (*stream)->WritesDone(); - } - }); - } - for (auto& th : cleanup_threads) { - th.join(); - } + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); } - void InitThreadFuncImpl(size_t thread_idx) override { + private: + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); + } else { + return false; + } + return true; } + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { // TODO (vjpai): Do this return true; diff --git a/test/distrib/cpp/run_distrib_test_cmake.bat b/test/distrib/cpp/run_distrib_test_cmake.bat new file mode 100644 index 0000000000..047846b0f1 --- /dev/null +++ b/test/distrib/cpp/run_distrib_test_cmake.bat @@ -0,0 +1,75 @@ +@rem Copyright 2016 gRPC authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem http://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. + +@rem enter this directory +cd /d %~dp0\..\..\.. + +@rem TODO(jtattermusch): Kokoro has pre-installed protoc.exe in C:\Program Files\ProtoC and that directory +@rem is on PATH. To avoid picking up the older version protoc.exe, we change the path to something non-existent. +set PATH=%PATH:ProtoC=DontPickupProtoC% + +@rem Install into ./testinstall, but use absolute path and foward slashes +set INSTALL_DIR=%cd:\=/%/testinstall + +@rem Download OpenSSL-Win32 originally installed from https://slproweb.com/products/Win32OpenSSL.html +powershell -Command "(New-Object Net.WebClient).DownloadFile('https://storage.googleapis.com/grpc-testing.appspot.com/OpenSSL-Win32-1_1_0g.zip', 'OpenSSL-Win32.zip')" +powershell -Command "Add-Type -Assembly 'System.IO.Compression.FileSystem'; [System.IO.Compression.ZipFile]::ExtractToDirectory('OpenSSL-Win32.zip', '.');" + +@rem set absolute path to OpenSSL with forward slashes +set OPENSSL_DIR=%cd:\=/%/OpenSSL-Win32 + +cd third_party/zlib +mkdir cmake +cd cmake +cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% .. +cmake --build . --config Release --target install || goto :error +cd ../../.. + +cd third_party/protobuf/cmake +mkdir build +cd build +cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% -Dprotobuf_MSVC_STATIC_RUNTIME=OFF -Dprotobuf_BUILD_TESTS=OFF .. +cmake --build . --config Release --target install || goto :error +cd ../../../.. + +cd third_party/cares/cares +mkdir cmake +cd cmake +cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% .. +cmake --build . --config Release --target install || goto :error +cd ../../../.. + +@rem OpenSSL-Win32 and OpenSSL-Win64 can be downloaded from https://slproweb.com/products/Win32OpenSSL.html +cd cmake +mkdir build +cd build +cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% -DOPENSSL_ROOT_DIR=%OPENSSL_DIR% -DOPENSSL_INCLUDE_DIR=%OPENSSL_DIR%/include -DZLIB_LIBRARY=%INSTALL_DIR%/lib/zlibstatic.lib -DZLIB_INCLUDE_DIR=%INSTALL_DIR%/include -DgRPC_INSTALL=ON -DgRPC_BUILD_TESTS=OFF -DgRPC_PROTOBUF_PROVIDER=package -DgRPC_ZLIB_PROVIDER=package -DgRPC_CARES_PROVIDER=package -DgRPC_SSL_PROVIDER=package -DCMAKE_BUILD_TYPE=Release ../.. || goto :error +cmake --build . --config Release --target install || goto :error +cd ../.. + +# Build helloworld example using cmake +cd examples/cpp/helloworld +mkdir cmake +cd cmake +mkdir build +cd build +cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% ../.. || goto :error +cmake --build . --config Release || goto :error +cd ../../../../.. + +goto :EOF + +:error +echo Failed! +exit /b %errorlevel% diff --git a/test/distrib/cpp/run_distrib_test_cmake.sh b/test/distrib/cpp/run_distrib_test_cmake.sh index ead8cc10bc..a9c081c2f5 100755 --- a/test/distrib/cpp/run_distrib_test_cmake.sh +++ b/test/distrib/cpp/run_distrib_test_cmake.sh @@ -19,7 +19,6 @@ cd $(dirname $0)/../../.. echo "deb http://ftp.debian.org/debian jessie-backports main" | tee /etc/apt/sources.list.d/jessie-backports.list apt-get update -#apt-get install -t jessie-backports -y libc-ares-dev # we need specifically version 1.12 apt-get install -t jessie-backports -y libssl-dev # Install c-ares |