diff options
author | Bill Feng <yfen@google.com> | 2018-08-27 10:32:50 -0700 |
---|---|---|
committer | Bill Feng <yfen@google.com> | 2018-08-27 10:32:50 -0700 |
commit | 5fb6603ad5c6f419159efa851c92ebb291d015c0 (patch) | |
tree | 5683a4607b00d3f38cca807a5ceae95a36959591 /test | |
parent | 2fa4d430a6f0937ac829b28bb28671c4811cc833 (diff) | |
parent | ca12a87af1371a37b168f1158bbb9088c9aee06a (diff) |
Merge remote-tracking branch 'upstream/master' into feature/qps-bazel-test
Diffstat (limited to 'test')
-rw-r--r-- | test/core/bad_client/bad_client.cc | 2 | ||||
-rw-r--r-- | test/core/end2end/bad_server_response_test.cc | 2 | ||||
-rw-r--r-- | test/core/end2end/fixtures/http_proxy_fixture.cc | 10 | ||||
-rw-r--r-- | test/core/iomgr/BUILD | 13 | ||||
-rw-r--r-- | test/core/iomgr/buffer_list_test.cc | 111 | ||||
-rw-r--r-- | test/core/iomgr/endpoint_tests.cc | 7 | ||||
-rw-r--r-- | test/core/iomgr/tcp_posix_test.cc | 109 | ||||
-rw-r--r-- | test/core/tsi/ssl_transport_security_test.cc | 6 | ||||
-rw-r--r-- | test/core/util/mock_endpoint.cc | 2 | ||||
-rw-r--r-- | test/core/util/passthru_endpoint.cc | 2 | ||||
-rw-r--r-- | test/core/util/trickle_endpoint.cc | 5 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/bm_chttp2_transport.cc | 2 | ||||
-rw-r--r-- | test/cpp/qps/client.h | 45 | ||||
-rw-r--r-- | test/cpp/qps/driver.cc | 6 | ||||
-rw-r--r-- | test/cpp/qps/driver.h | 3 | ||||
-rw-r--r-- | test/cpp/qps/histogram.h | 5 | ||||
-rw-r--r-- | test/cpp/qps/inproc_sync_unary_ping_pong_test.cc | 2 | ||||
-rw-r--r-- | test/cpp/qps/qps_json_driver.cc | 19 | ||||
-rw-r--r-- | test/cpp/qps/qps_openloop_test.cc | 2 | ||||
-rw-r--r-- | test/cpp/qps/secure_sync_unary_ping_pong_test.cc | 2 |
20 files changed, 106 insertions, 249 deletions
diff --git a/test/core/bad_client/bad_client.cc b/test/core/bad_client/bad_client.cc index ade23133c5..c03ebcf409 100644 --- a/test/core/bad_client/bad_client.cc +++ b/test/core/bad_client/bad_client.cc @@ -115,7 +115,7 @@ void grpc_run_client_side_validator(grpc_bad_client_arg* arg, uint32_t flags, grpc_schedule_on_exec_ctx); /* Write data */ - grpc_endpoint_write(sfd->client, &outgoing, &done_write_closure, nullptr); + grpc_endpoint_write(sfd->client, &outgoing, &done_write_closure); grpc_core::ExecCtx::Get()->Flush(); /* Await completion, unless the request is large and write may not finish diff --git a/test/core/end2end/bad_server_response_test.cc b/test/core/end2end/bad_server_response_test.cc index f7396a1684..3d133cfc18 100644 --- a/test/core/end2end/bad_server_response_test.cc +++ b/test/core/end2end/bad_server_response_test.cc @@ -104,7 +104,7 @@ static void handle_write() { grpc_slice_buffer_reset_and_unref(&state.outgoing_buffer); grpc_slice_buffer_add(&state.outgoing_buffer, slice); - grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write, nullptr); + grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write); } static void handle_read(void* arg, grpc_error* error) { diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index ea9c000efb..f02fa9d998 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -201,7 +201,7 @@ static void on_client_write_done(void* arg, grpc_error* error) { &conn->client_write_buffer); conn->client_is_writing = true; grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, - &conn->on_client_write_done, nullptr); + &conn->on_client_write_done); } else { // No more writes. Unref the connection. proxy_connection_unref(conn, "write_done"); @@ -226,7 +226,7 @@ static void on_server_write_done(void* arg, grpc_error* error) { &conn->server_write_buffer); conn->server_is_writing = true; grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer, - &conn->on_server_write_done, nullptr); + &conn->on_server_write_done); } else { // No more writes. Unref the connection. proxy_connection_unref(conn, "server_write"); @@ -257,7 +257,7 @@ static void on_client_read_done(void* arg, grpc_error* error) { proxy_connection_ref(conn, "client_read"); conn->server_is_writing = true; grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer, - &conn->on_server_write_done, nullptr); + &conn->on_server_write_done); } // Read more data. grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, @@ -288,7 +288,7 @@ static void on_server_read_done(void* arg, grpc_error* error) { proxy_connection_ref(conn, "server_read"); conn->client_is_writing = true; grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, - &conn->on_client_write_done, nullptr); + &conn->on_client_write_done); } // Read more data. grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer, @@ -340,7 +340,7 @@ static void on_server_connect_done(void* arg, grpc_error* error) { grpc_slice_buffer_add(&conn->client_write_buffer, slice); conn->client_is_writing = true; grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, - &conn->on_write_response_done, nullptr); + &conn->on_write_response_done); } /** diff --git a/test/core/iomgr/BUILD b/test/core/iomgr/BUILD index 675d9e6278..002671a5fa 100644 --- a/test/core/iomgr/BUILD +++ b/test/core/iomgr/BUILD @@ -247,19 +247,6 @@ grpc_cc_test( ) grpc_cc_test( - name = "buffer_list_test", - srcs = ["buffer_list_test.cc"], - language = "C++", - deps = [ - "//:gpr", - "//:grpc", - "//test/core/util:gpr_test_util", - "//test/core/util:grpc_test_util", - ], -) - - -grpc_cc_test( name = "tcp_server_posix_test", srcs = ["tcp_server_posix_test.cc"], language = "C++", diff --git a/test/core/iomgr/buffer_list_test.cc b/test/core/iomgr/buffer_list_test.cc deleted file mode 100644 index f1773580bd..0000000000 --- a/test/core/iomgr/buffer_list_test.cc +++ /dev/null @@ -1,111 +0,0 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "src/core/lib/iomgr/port.h" - -#include "src/core/lib/iomgr/buffer_list.h" - -#include <grpc/grpc.h> - -#include "test/core/util/test_config.h" - -#ifdef GRPC_LINUX_ERRQUEUE - -static void TestShutdownFlushesListVerifier(void* arg, - grpc_core::Timestamps* ts, - grpc_error* error) { - GPR_ASSERT(error == GRPC_ERROR_NONE); - GPR_ASSERT(arg != nullptr); - gpr_atm* done = reinterpret_cast<gpr_atm*>(arg); - gpr_atm_rel_store(done, static_cast<gpr_atm>(1)); -} - -/** Tests that all TracedBuffer elements in the list are flushed out on - * shutdown. - * Also tests that arg is passed correctly. - */ -static void TestShutdownFlushesList() { - grpc_core::grpc_tcp_set_write_timestamps_callback( - TestShutdownFlushesListVerifier); - grpc_core::TracedBuffer* list = nullptr; -#define NUM_ELEM 5 - gpr_atm verifier_called[NUM_ELEM]; - for (auto i = 0; i < NUM_ELEM; i++) { - gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0)); - grpc_core::TracedBuffer::AddNewEntry( - &list, i, static_cast<void*>(&verifier_called[i])); - } - grpc_core::TracedBuffer::Shutdown(&list, GRPC_ERROR_NONE); - GPR_ASSERT(list == nullptr); - for (auto i = 0; i < NUM_ELEM; i++) { - GPR_ASSERT(gpr_atm_acq_load(&verifier_called[i]) == - static_cast<gpr_atm>(1)); - } -} - -static void TestVerifierCalledOnAckVerifier(void* arg, - grpc_core::Timestamps* ts, - grpc_error* error) { - GPR_ASSERT(error == GRPC_ERROR_NONE); - GPR_ASSERT(arg != nullptr); - GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME); - GPR_ASSERT(ts->acked_time.tv_sec == 123); - GPR_ASSERT(ts->acked_time.tv_nsec == 456); - gpr_atm* done = reinterpret_cast<gpr_atm*>(arg); - gpr_atm_rel_store(done, static_cast<gpr_atm>(1)); -} - -/** Tests that the timestamp verifier is called on an ACK timestamp. - */ -static void TestVerifierCalledOnAck() { - struct sock_extended_err serr; - serr.ee_data = 213; - serr.ee_info = SCM_TSTAMP_ACK; - struct scm_timestamping tss; - tss.ts[0].tv_sec = 123; - tss.ts[0].tv_nsec = 456; - grpc_core::grpc_tcp_set_write_timestamps_callback( - TestVerifierCalledOnAckVerifier); - grpc_core::TracedBuffer* list = nullptr; - gpr_atm verifier_called; - gpr_atm_rel_store(&verifier_called, static_cast<gpr_atm>(0)); - grpc_core::TracedBuffer::AddNewEntry(&list, 213, &verifier_called); - grpc_core::TracedBuffer::ProcessTimestamp(&list, &serr, &tss); - GPR_ASSERT(gpr_atm_acq_load(&verifier_called) == static_cast<gpr_atm>(1)); - GPR_ASSERT(list == nullptr); - grpc_core::TracedBuffer::Shutdown(&list, GRPC_ERROR_NONE); -} - -static void TestTcpBufferList() { - TestVerifierCalledOnAck(); - TestShutdownFlushesList(); -} - -int main(int argc, char** argv) { - grpc_test_init(argc, argv); - grpc_init(); - TestTcpBufferList(); - grpc_shutdown(); - return 0; -} - -#else /* GRPC_LINUX_ERRQUEUE */ - -int main(int argc, char** argv) { return 0; } - -#endif /* GRPC_LINUX_ERRQUEUE */ diff --git a/test/core/iomgr/endpoint_tests.cc b/test/core/iomgr/endpoint_tests.cc index a9e8ba86c5..8db8ac5ed6 100644 --- a/test/core/iomgr/endpoint_tests.cc +++ b/test/core/iomgr/endpoint_tests.cc @@ -150,8 +150,8 @@ static void read_and_write_test_write_handler(void* data, grpc_error* error) { &state->current_write_data); grpc_slice_buffer_reset_and_unref(&state->outgoing); grpc_slice_buffer_addn(&state->outgoing, slices, nslices); - grpc_endpoint_write(state->write_ep, &state->outgoing, &state->done_write, - nullptr); + grpc_endpoint_write(state->write_ep, &state->outgoing, + &state->done_write); gpr_free(slices); return; } @@ -294,8 +294,7 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) { grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a")); grpc_endpoint_write(f.client_ep, &slice_buffer, GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count, - grpc_schedule_on_exec_ctx), - nullptr); + grpc_schedule_on_exec_ctx)); wait_for_fail_count(&fail_count, 3); grpc_endpoint_shutdown(f.client_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown")); diff --git a/test/core/iomgr/tcp_posix_test.cc b/test/core/iomgr/tcp_posix_test.cc index 6447cc234d..3e87831e44 100644 --- a/test/core/iomgr/tcp_posix_test.cc +++ b/test/core/iomgr/tcp_posix_test.cc @@ -36,9 +36,6 @@ #include <grpc/support/time.h> #include "src/core/lib/gpr/useful.h" -#include "src/core/lib/iomgr/buffer_list.h" -#include "src/core/lib/iomgr/ev_posix.h" -#include "src/core/lib/iomgr/sockaddr_posix.h" #include "src/core/lib/slice/slice_internal.h" #include "test/core/iomgr/endpoint_tests.h" #include "test/core/util/test_config.h" @@ -71,43 +68,6 @@ static void create_sockets(int sv[2]) { GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); } -static void create_inet_sockets(int sv[2]) { - /* Prepare listening socket */ - struct sockaddr_in addr; - memset(&addr, 0, sizeof(struct sockaddr_in)); - addr.sin_family = AF_INET; - int sock = socket(AF_INET, SOCK_STREAM, 0); - GPR_ASSERT(sock); - GPR_ASSERT(bind(sock, (sockaddr*)&addr, sizeof(sockaddr_in)) == 0); - listen(sock, 1); - - /* Prepare client socket and connect to server */ - socklen_t len = sizeof(sockaddr_in); - GPR_ASSERT(getsockname(sock, (sockaddr*)&addr, &len) == 0); - - int client = socket(AF_INET, SOCK_STREAM, 0); - GPR_ASSERT(client); - int ret; - do { - ret = connect(client, (sockaddr*)&addr, sizeof(sockaddr_in)); - } while (ret == -1 && errno == EINTR); - - /* Accept client connection */ - len = sizeof(socklen_t); - int server; - do { - server = accept(sock, (sockaddr*)&addr, (socklen_t*)&len); - } while (server == -1 && errno == EINTR); - GPR_ASSERT(server != -1); - - sv[0] = server; - sv[1] = client; - int flags = fcntl(sv[0], F_GETFL, 0); - GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); - flags = fcntl(sv[1], F_GETFL, 0); - GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); -} - static ssize_t fill_socket(int fd) { ssize_t write_bytes; ssize_t total_bytes = 0; @@ -329,10 +289,11 @@ static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size, static void write_done(void* user_data /* write_socket_state */, grpc_error* error) { - GPR_ASSERT(error == GRPC_ERROR_NONE); struct write_socket_state* state = static_cast<struct write_socket_state*>(user_data); + gpr_log(GPR_INFO, "Write done callback called"); gpr_mu_lock(g_mu); + gpr_log(GPR_INFO, "Signalling write done"); state->write_done = 1; GPR_ASSERT( GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); @@ -379,24 +340,10 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { gpr_free(buf); } -/* Verifier for timestamps callback for write_test */ -void timestamps_verifier(void* arg, grpc_core::Timestamps* ts, - grpc_error* error) { - GPR_ASSERT(error == GRPC_ERROR_NONE); - GPR_ASSERT(arg != nullptr); - GPR_ASSERT(ts->sendmsg_time.clock_type == GPR_CLOCK_REALTIME); - GPR_ASSERT(ts->scheduled_time.clock_type == GPR_CLOCK_REALTIME); - GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME); - gpr_atm* done_timestamps = (gpr_atm*)arg; - gpr_atm_rel_store(done_timestamps, static_cast<gpr_atm>(1)); -} - /* Write to a socket using the grpc_tcp API, then drain it directly. Note that if the write does not complete immediately we need to drain the - socket in parallel with the read. If collect_timestamps is true, it will - try to get timestamps for the write. */ -static void write_test(size_t num_bytes, size_t slice_size, - bool collect_timestamps) { + socket in parallel with the read. */ +static void write_test(size_t num_bytes, size_t slice_size) { int sv[2]; grpc_endpoint* ep; struct write_socket_state state; @@ -409,27 +356,19 @@ static void write_test(size_t num_bytes, size_t slice_size, grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20)); grpc_core::ExecCtx exec_ctx; - if (collect_timestamps && !grpc_event_engine_can_track_errors()) { - return; - } - gpr_log(GPR_INFO, "Start write test with %" PRIuPTR " bytes, slice size %" PRIuPTR, num_bytes, slice_size); - if (collect_timestamps) { - create_inet_sockets(sv); - } else { - create_sockets(sv); - } + create_sockets(sv); grpc_arg a[1]; a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE); a[0].type = GRPC_ARG_INTEGER, a[0].value.integer = static_cast<int>(slice_size); grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", collect_timestamps), - &args, "test"); + ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", false), &args, + "test"); grpc_endpoint_add_to_pollset(ep, g_pollset); state.ep = ep; @@ -442,26 +381,18 @@ static void write_test(size_t num_bytes, size_t slice_size, GRPC_CLOSURE_INIT(&write_done_closure, write_done, &state, grpc_schedule_on_exec_ctx); - gpr_atm done_timestamps; - gpr_atm_rel_store(&done_timestamps, static_cast<gpr_atm>(0)); - grpc_endpoint_write(ep, &outgoing, &write_done_closure, - grpc_event_engine_can_track_errors() && collect_timestamps - ? (void*)&done_timestamps - : nullptr); + grpc_endpoint_write(ep, &outgoing, &write_done_closure); drain_socket_blocking(sv[0], num_bytes, num_bytes); - exec_ctx.Flush(); gpr_mu_lock(g_mu); for (;;) { grpc_pollset_worker* worker = nullptr; - if (state.write_done && - (!(grpc_event_engine_can_track_errors() && collect_timestamps) || - gpr_atm_acq_load(&done_timestamps) == static_cast<gpr_atm>(1))) { + if (state.write_done) { break; } GPR_ASSERT(GRPC_LOG_IF_ERROR( "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); gpr_mu_unlock(g_mu); - exec_ctx.Flush(); + gpr_mu_lock(g_mu); } gpr_mu_unlock(g_mu); @@ -566,21 +497,14 @@ void run_tests(void) { large_read_test(8192); large_read_test(1); - write_test(100, 8192, false); - write_test(100, 1, false); - write_test(100000, 8192, false); - write_test(100000, 1, false); - write_test(100000, 137, false); - - write_test(100, 8192, true); - write_test(100, 1, true); - write_test(100000, 8192, true); - write_test(100000, 1, true); - write_test(100, 137, true); + write_test(100, 8192); + write_test(100, 1); + write_test(100000, 8192); + write_test(100000, 1); + write_test(100000, 137); for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) { - write_test(40320, i, false); - write_test(40320, i, true); + write_test(40320, i); } release_fd_test(100, 8192); @@ -625,7 +549,6 @@ int main(int argc, char** argv) { grpc_closure destroyed; grpc_test_init(argc, argv); grpc_init(); - grpc_core::grpc_tcp_set_write_timestamps_callback(timestamps_verifier); { grpc_core::ExecCtx exec_ctx; g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size())); diff --git a/test/core/tsi/ssl_transport_security_test.cc b/test/core/tsi/ssl_transport_security_test.cc index b477904d60..baffad6ea3 100644 --- a/test/core/tsi/ssl_transport_security_test.cc +++ b/test/core/tsi/ssl_transport_security_test.cc @@ -208,9 +208,11 @@ static void check_session_reusage(ssl_tsi_test_fixture* ssl_fixture, tsi_peer_get_property_by_name(peer, TSI_SSL_SESSION_REUSED_PEER_PROPERTY); GPR_ASSERT(session_reused != nullptr); if (ssl_fixture->session_reused) { - GPR_ASSERT(strcmp(session_reused->value.data, "true") == 0); + GPR_ASSERT(strncmp(session_reused->value.data, "true", + session_reused->value.length) == 0); } else { - GPR_ASSERT(strcmp(session_reused->value.data, "false") == 0); + GPR_ASSERT(strncmp(session_reused->value.data, "false", + session_reused->value.length) == 0); } } diff --git a/test/core/util/mock_endpoint.cc b/test/core/util/mock_endpoint.cc index ef6fd62b51..1156cd5fc5 100644 --- a/test/core/util/mock_endpoint.cc +++ b/test/core/util/mock_endpoint.cc @@ -55,7 +55,7 @@ static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices, } static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb, void* arg) { + grpc_closure* cb) { mock_endpoint* m = reinterpret_cast<mock_endpoint*>(ep); for (size_t i = 0; i < slices->count; i++) { m->on_write(slices->slices[i]); diff --git a/test/core/util/passthru_endpoint.cc b/test/core/util/passthru_endpoint.cc index 3cc8ad6fe1..5958216747 100644 --- a/test/core/util/passthru_endpoint.cc +++ b/test/core/util/passthru_endpoint.cc @@ -76,7 +76,7 @@ static half* other_half(half* h) { } static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb, void* arg) { + grpc_closure* cb) { half* m = other_half(reinterpret_cast<half*>(ep)); gpr_mu_lock(&m->parent->mu); grpc_error* error = GRPC_ERROR_NONE; diff --git a/test/core/util/trickle_endpoint.cc b/test/core/util/trickle_endpoint.cc index 62ed72a629..f2efb049b4 100644 --- a/test/core/util/trickle_endpoint.cc +++ b/test/core/util/trickle_endpoint.cc @@ -62,7 +62,7 @@ static void maybe_call_write_cb_locked(trickle_endpoint* te) { } static void te_write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb, void* arg) { + grpc_closure* cb) { trickle_endpoint* te = reinterpret_cast<trickle_endpoint*>(ep); gpr_mu_lock(&te->mu); GPR_ASSERT(te->write_cb == nullptr); @@ -186,8 +186,7 @@ size_t grpc_trickle_endpoint_trickle(grpc_endpoint* ep) { te->last_write = now; grpc_endpoint_write( te->wrapped, &te->writing_buffer, - GRPC_CLOSURE_CREATE(te_finish_write, te, grpc_schedule_on_exec_ctx), - nullptr); + GRPC_CLOSURE_CREATE(te_finish_write, te, grpc_schedule_on_exec_ctx)); maybe_call_write_cb_locked(te); } } diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index 189923a841..1e9bd273aa 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -96,7 +96,7 @@ class DummyEndpoint : public grpc_endpoint { } static void write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb, void* arg) { + grpc_closure* cb) { GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE); } diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 9d7469c9b5..0b4b2ff0a9 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -180,6 +180,19 @@ class Client { timer_result = timer_->Mark(); } + // Print the median latency per interval for one thread. + // If the number of warmup seconds is x, then the first x + 1 numbers in the + // vector are from the warmup period and should be discarded. + if (median_latency_collection_interval_seconds_ > 0) { + std::vector<double> medians_per_interval = + threads_[0]->GetMedianPerIntervalList(); + gpr_log(GPR_INFO, "Num threads: %ld", threads_.size()); + gpr_log(GPR_INFO, "Number of medians: %ld", medians_per_interval.size()); + for (size_t j = 0; j < medians_per_interval.size(); j++) { + gpr_log(GPR_INFO, "%f", medians_per_interval[j]); + } + } + grpc_stats_data core_stats; grpc_stats_collect(&core_stats); @@ -210,6 +223,12 @@ class Client { } } + // Returns the interval (in seconds) between collecting latency medians. If 0, + // no periodic median latencies will be collected. + double GetLatencyCollectionIntervalInSeconds() { + return median_latency_collection_interval_seconds_; + } + virtual int GetPollCount() { // For sync client. return 0; @@ -218,6 +237,7 @@ class Client { protected: bool closed_loop_; gpr_atm thread_pool_done_; + double median_latency_collection_interval_seconds_; // In seconds void StartThreads(size_t num_threads) { gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false)); @@ -299,10 +319,27 @@ class Client { MergeStatusHistogram(statuses_, s); } + std::vector<double> GetMedianPerIntervalList() { + return medians_each_interval_list_; + } + void UpdateHistogram(HistogramEntry* entry) { std::lock_guard<std::mutex> g(mu_); if (entry->value_used()) { histogram_.Add(entry->value()); + if (client_->GetLatencyCollectionIntervalInSeconds() > 0) { + histogram_per_interval_.Add(entry->value()); + double now = UsageTimer::Now(); + if ((now - interval_start_time_) >= + client_->GetLatencyCollectionIntervalInSeconds()) { + // Record the median latency of requests from the last interval. + // Divide by 1e3 to get microseconds. + medians_each_interval_list_.push_back( + histogram_per_interval_.Percentile(50) / 1e3); + histogram_per_interval_.Reset(); + interval_start_time_ = now; + } + } } if (entry->status_used()) { statuses_[entry->status()]++; @@ -334,6 +371,11 @@ class Client { Client* client_; const size_t idx_; std::thread impl_; + // The following are used only if + // median_latency_collection_interval_seconds_ is greater than 0 + Histogram histogram_per_interval_; + std::vector<double> medians_each_interval_list_; + double interval_start_time_; }; bool ThreadCompleted() { @@ -392,7 +434,8 @@ class ClientImpl : public Client { for (auto& t : connecting_threads) { t->join(); } - + median_latency_collection_interval_seconds_ = + config.median_latency_collection_interval_millis() / 1e3; ClientRequestCreator<RequestType> create_req(&request_, config.payload_config()); } diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index cabbd51843..11cfb4aa05 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -198,7 +198,8 @@ std::unique_ptr<ScenarioResult> RunScenario( const ServerConfig& initial_server_config, size_t num_servers, int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count, const grpc::string& qps_server_target_override, - const grpc::string& credential_type, bool run_inproc) { + const grpc::string& credential_type, bool run_inproc, + int32_t median_latency_collection_interval_millis) { if (run_inproc) { g_inproc_servers = new std::vector<grpc::testing::Server*>; } @@ -317,6 +318,9 @@ std::unique_ptr<ScenarioResult> RunScenario( } } + client_config.set_median_latency_collection_interval_millis( + median_latency_collection_interval_millis); + // Targets are all set by now result_client_config = client_config; // Start clients diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h index fede4d8045..cda89f7ddf 100644 --- a/test/cpp/qps/driver.h +++ b/test/cpp/qps/driver.h @@ -32,7 +32,8 @@ std::unique_ptr<ScenarioResult> RunScenario( const grpc::testing::ServerConfig& server_config, size_t num_servers, int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count, const grpc::string& qps_server_target_override, - const grpc::string& credential_type, bool run_inproc); + const grpc::string& credential_type, bool run_inproc, + int32_t median_latency_collection_interval_millis); bool RunQuit(const grpc::string& credential_type); } // namespace testing diff --git a/test/cpp/qps/histogram.h b/test/cpp/qps/histogram.h index ba72b5b332..6275128f34 100644 --- a/test/cpp/qps/histogram.h +++ b/test/cpp/qps/histogram.h @@ -34,6 +34,11 @@ class Histogram { ~Histogram() { if (impl_) grpc_histogram_destroy(impl_); } + void Reset() { + if (impl_) grpc_histogram_destroy(impl_); + impl_ = grpc_histogram_create(default_resolution(), default_max_possible()); + } + Histogram(Histogram&& other) : impl_(other.impl_) { other.impl_ = nullptr; } void Merge(const Histogram& h) { grpc_histogram_merge(impl_, h.impl_); } diff --git a/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc b/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc index f2e977d48b..56d1730252 100644 --- a/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc +++ b/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc @@ -48,7 +48,7 @@ static void RunSynchronousUnaryPingPong() { const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "", - kInsecureCredentialsType, true); + kInsecureCredentialsType, true, 0); GetReporter()->ReportQPS(*result); GetReporter()->ReportLatency(*result); diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc index 0ff692255c..eaa0dd992c 100644 --- a/test/cpp/qps/qps_json_driver.cc +++ b/test/cpp/qps/qps_json_driver.cc @@ -66,6 +66,11 @@ DEFINE_string(json_file_out, "", "File to write the JSON output to."); DEFINE_string(credential_type, grpc::testing::kInsecureCredentialsType, "Credential type for communication with workers"); DEFINE_bool(run_inproc, false, "Perform an in-process transport test"); +DEFINE_int32( + median_latency_collection_interval_millis, 0, + "Specifies the period between gathering latency medians in " + "milliseconds. The medians will be logged out on the client at the " + "end of the benchmark run. If 0, this periodic collection is disabled."); namespace grpc { namespace testing { @@ -73,13 +78,13 @@ namespace testing { static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario, bool* success) { std::cerr << "RUNNING SCENARIO: " << scenario.name() << "\n"; - auto result = - RunScenario(scenario.client_config(), scenario.num_clients(), - scenario.server_config(), scenario.num_servers(), - scenario.warmup_seconds(), scenario.benchmark_seconds(), - !FLAGS_run_inproc ? scenario.spawn_local_worker_count() : -2, - FLAGS_qps_server_target_override, FLAGS_credential_type, - FLAGS_run_inproc); + auto result = RunScenario( + scenario.client_config(), scenario.num_clients(), + scenario.server_config(), scenario.num_servers(), + scenario.warmup_seconds(), scenario.benchmark_seconds(), + !FLAGS_run_inproc ? scenario.spawn_local_worker_count() : -2, + FLAGS_qps_server_target_override, FLAGS_credential_type, FLAGS_run_inproc, + FLAGS_median_latency_collection_interval_millis); // Amend the result with scenario config. Eventually we should adjust // RunScenario contract so we don't need to touch the result here. diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc index df929b9811..6044f4265a 100644 --- a/test/cpp/qps/qps_openloop_test.cc +++ b/test/cpp/qps/qps_openloop_test.cc @@ -52,7 +52,7 @@ static void RunQPS() { const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "", - kInsecureCredentialsType, false); + kInsecureCredentialsType, false, 0); GetReporter()->ReportQPSPerCore(*result); GetReporter()->ReportLatency(*result); diff --git a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc index bb415e9d63..a559c82cc8 100644 --- a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc +++ b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc @@ -55,7 +55,7 @@ static void RunSynchronousUnaryPingPong() { const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "", - kInsecureCredentialsType, false); + kInsecureCredentialsType, false, 0); GetReporter()->ReportQPS(*result); GetReporter()->ReportLatency(*result); |