aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar Bill Feng <yfen@google.com>2018-08-27 10:32:50 -0700
committerGravatar Bill Feng <yfen@google.com>2018-08-27 10:32:50 -0700
commit5fb6603ad5c6f419159efa851c92ebb291d015c0 (patch)
tree5683a4607b00d3f38cca807a5ceae95a36959591 /test
parent2fa4d430a6f0937ac829b28bb28671c4811cc833 (diff)
parentca12a87af1371a37b168f1158bbb9088c9aee06a (diff)
Merge remote-tracking branch 'upstream/master' into feature/qps-bazel-test
Diffstat (limited to 'test')
-rw-r--r--test/core/bad_client/bad_client.cc2
-rw-r--r--test/core/end2end/bad_server_response_test.cc2
-rw-r--r--test/core/end2end/fixtures/http_proxy_fixture.cc10
-rw-r--r--test/core/iomgr/BUILD13
-rw-r--r--test/core/iomgr/buffer_list_test.cc111
-rw-r--r--test/core/iomgr/endpoint_tests.cc7
-rw-r--r--test/core/iomgr/tcp_posix_test.cc109
-rw-r--r--test/core/tsi/ssl_transport_security_test.cc6
-rw-r--r--test/core/util/mock_endpoint.cc2
-rw-r--r--test/core/util/passthru_endpoint.cc2
-rw-r--r--test/core/util/trickle_endpoint.cc5
-rw-r--r--test/cpp/microbenchmarks/bm_chttp2_transport.cc2
-rw-r--r--test/cpp/qps/client.h45
-rw-r--r--test/cpp/qps/driver.cc6
-rw-r--r--test/cpp/qps/driver.h3
-rw-r--r--test/cpp/qps/histogram.h5
-rw-r--r--test/cpp/qps/inproc_sync_unary_ping_pong_test.cc2
-rw-r--r--test/cpp/qps/qps_json_driver.cc19
-rw-r--r--test/cpp/qps/qps_openloop_test.cc2
-rw-r--r--test/cpp/qps/secure_sync_unary_ping_pong_test.cc2
20 files changed, 106 insertions, 249 deletions
diff --git a/test/core/bad_client/bad_client.cc b/test/core/bad_client/bad_client.cc
index ade23133c5..c03ebcf409 100644
--- a/test/core/bad_client/bad_client.cc
+++ b/test/core/bad_client/bad_client.cc
@@ -115,7 +115,7 @@ void grpc_run_client_side_validator(grpc_bad_client_arg* arg, uint32_t flags,
grpc_schedule_on_exec_ctx);
/* Write data */
- grpc_endpoint_write(sfd->client, &outgoing, &done_write_closure, nullptr);
+ grpc_endpoint_write(sfd->client, &outgoing, &done_write_closure);
grpc_core::ExecCtx::Get()->Flush();
/* Await completion, unless the request is large and write may not finish
diff --git a/test/core/end2end/bad_server_response_test.cc b/test/core/end2end/bad_server_response_test.cc
index f7396a1684..3d133cfc18 100644
--- a/test/core/end2end/bad_server_response_test.cc
+++ b/test/core/end2end/bad_server_response_test.cc
@@ -104,7 +104,7 @@ static void handle_write() {
grpc_slice_buffer_reset_and_unref(&state.outgoing_buffer);
grpc_slice_buffer_add(&state.outgoing_buffer, slice);
- grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write, nullptr);
+ grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write);
}
static void handle_read(void* arg, grpc_error* error) {
diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc
index ea9c000efb..f02fa9d998 100644
--- a/test/core/end2end/fixtures/http_proxy_fixture.cc
+++ b/test/core/end2end/fixtures/http_proxy_fixture.cc
@@ -201,7 +201,7 @@ static void on_client_write_done(void* arg, grpc_error* error) {
&conn->client_write_buffer);
conn->client_is_writing = true;
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
- &conn->on_client_write_done, nullptr);
+ &conn->on_client_write_done);
} else {
// No more writes. Unref the connection.
proxy_connection_unref(conn, "write_done");
@@ -226,7 +226,7 @@ static void on_server_write_done(void* arg, grpc_error* error) {
&conn->server_write_buffer);
conn->server_is_writing = true;
grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer,
- &conn->on_server_write_done, nullptr);
+ &conn->on_server_write_done);
} else {
// No more writes. Unref the connection.
proxy_connection_unref(conn, "server_write");
@@ -257,7 +257,7 @@ static void on_client_read_done(void* arg, grpc_error* error) {
proxy_connection_ref(conn, "client_read");
conn->server_is_writing = true;
grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer,
- &conn->on_server_write_done, nullptr);
+ &conn->on_server_write_done);
}
// Read more data.
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
@@ -288,7 +288,7 @@ static void on_server_read_done(void* arg, grpc_error* error) {
proxy_connection_ref(conn, "server_read");
conn->client_is_writing = true;
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
- &conn->on_client_write_done, nullptr);
+ &conn->on_client_write_done);
}
// Read more data.
grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer,
@@ -340,7 +340,7 @@ static void on_server_connect_done(void* arg, grpc_error* error) {
grpc_slice_buffer_add(&conn->client_write_buffer, slice);
conn->client_is_writing = true;
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
- &conn->on_write_response_done, nullptr);
+ &conn->on_write_response_done);
}
/**
diff --git a/test/core/iomgr/BUILD b/test/core/iomgr/BUILD
index 675d9e6278..002671a5fa 100644
--- a/test/core/iomgr/BUILD
+++ b/test/core/iomgr/BUILD
@@ -247,19 +247,6 @@ grpc_cc_test(
)
grpc_cc_test(
- name = "buffer_list_test",
- srcs = ["buffer_list_test.cc"],
- language = "C++",
- deps = [
- "//:gpr",
- "//:grpc",
- "//test/core/util:gpr_test_util",
- "//test/core/util:grpc_test_util",
- ],
-)
-
-
-grpc_cc_test(
name = "tcp_server_posix_test",
srcs = ["tcp_server_posix_test.cc"],
language = "C++",
diff --git a/test/core/iomgr/buffer_list_test.cc b/test/core/iomgr/buffer_list_test.cc
deleted file mode 100644
index f1773580bd..0000000000
--- a/test/core/iomgr/buffer_list_test.cc
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "src/core/lib/iomgr/port.h"
-
-#include "src/core/lib/iomgr/buffer_list.h"
-
-#include <grpc/grpc.h>
-
-#include "test/core/util/test_config.h"
-
-#ifdef GRPC_LINUX_ERRQUEUE
-
-static void TestShutdownFlushesListVerifier(void* arg,
- grpc_core::Timestamps* ts,
- grpc_error* error) {
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- GPR_ASSERT(arg != nullptr);
- gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
- gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
-}
-
-/** Tests that all TracedBuffer elements in the list are flushed out on
- * shutdown.
- * Also tests that arg is passed correctly.
- */
-static void TestShutdownFlushesList() {
- grpc_core::grpc_tcp_set_write_timestamps_callback(
- TestShutdownFlushesListVerifier);
- grpc_core::TracedBuffer* list = nullptr;
-#define NUM_ELEM 5
- gpr_atm verifier_called[NUM_ELEM];
- for (auto i = 0; i < NUM_ELEM; i++) {
- gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0));
- grpc_core::TracedBuffer::AddNewEntry(
- &list, i, static_cast<void*>(&verifier_called[i]));
- }
- grpc_core::TracedBuffer::Shutdown(&list, GRPC_ERROR_NONE);
- GPR_ASSERT(list == nullptr);
- for (auto i = 0; i < NUM_ELEM; i++) {
- GPR_ASSERT(gpr_atm_acq_load(&verifier_called[i]) ==
- static_cast<gpr_atm>(1));
- }
-}
-
-static void TestVerifierCalledOnAckVerifier(void* arg,
- grpc_core::Timestamps* ts,
- grpc_error* error) {
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- GPR_ASSERT(arg != nullptr);
- GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME);
- GPR_ASSERT(ts->acked_time.tv_sec == 123);
- GPR_ASSERT(ts->acked_time.tv_nsec == 456);
- gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
- gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
-}
-
-/** Tests that the timestamp verifier is called on an ACK timestamp.
- */
-static void TestVerifierCalledOnAck() {
- struct sock_extended_err serr;
- serr.ee_data = 213;
- serr.ee_info = SCM_TSTAMP_ACK;
- struct scm_timestamping tss;
- tss.ts[0].tv_sec = 123;
- tss.ts[0].tv_nsec = 456;
- grpc_core::grpc_tcp_set_write_timestamps_callback(
- TestVerifierCalledOnAckVerifier);
- grpc_core::TracedBuffer* list = nullptr;
- gpr_atm verifier_called;
- gpr_atm_rel_store(&verifier_called, static_cast<gpr_atm>(0));
- grpc_core::TracedBuffer::AddNewEntry(&list, 213, &verifier_called);
- grpc_core::TracedBuffer::ProcessTimestamp(&list, &serr, &tss);
- GPR_ASSERT(gpr_atm_acq_load(&verifier_called) == static_cast<gpr_atm>(1));
- GPR_ASSERT(list == nullptr);
- grpc_core::TracedBuffer::Shutdown(&list, GRPC_ERROR_NONE);
-}
-
-static void TestTcpBufferList() {
- TestVerifierCalledOnAck();
- TestShutdownFlushesList();
-}
-
-int main(int argc, char** argv) {
- grpc_test_init(argc, argv);
- grpc_init();
- TestTcpBufferList();
- grpc_shutdown();
- return 0;
-}
-
-#else /* GRPC_LINUX_ERRQUEUE */
-
-int main(int argc, char** argv) { return 0; }
-
-#endif /* GRPC_LINUX_ERRQUEUE */
diff --git a/test/core/iomgr/endpoint_tests.cc b/test/core/iomgr/endpoint_tests.cc
index a9e8ba86c5..8db8ac5ed6 100644
--- a/test/core/iomgr/endpoint_tests.cc
+++ b/test/core/iomgr/endpoint_tests.cc
@@ -150,8 +150,8 @@ static void read_and_write_test_write_handler(void* data, grpc_error* error) {
&state->current_write_data);
grpc_slice_buffer_reset_and_unref(&state->outgoing);
grpc_slice_buffer_addn(&state->outgoing, slices, nslices);
- grpc_endpoint_write(state->write_ep, &state->outgoing, &state->done_write,
- nullptr);
+ grpc_endpoint_write(state->write_ep, &state->outgoing,
+ &state->done_write);
gpr_free(slices);
return;
}
@@ -294,8 +294,7 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) {
grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a"));
grpc_endpoint_write(f.client_ep, &slice_buffer,
GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
- grpc_schedule_on_exec_ctx),
- nullptr);
+ grpc_schedule_on_exec_ctx));
wait_for_fail_count(&fail_count, 3);
grpc_endpoint_shutdown(f.client_ep,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
diff --git a/test/core/iomgr/tcp_posix_test.cc b/test/core/iomgr/tcp_posix_test.cc
index 6447cc234d..3e87831e44 100644
--- a/test/core/iomgr/tcp_posix_test.cc
+++ b/test/core/iomgr/tcp_posix_test.cc
@@ -36,9 +36,6 @@
#include <grpc/support/time.h>
#include "src/core/lib/gpr/useful.h"
-#include "src/core/lib/iomgr/buffer_list.h"
-#include "src/core/lib/iomgr/ev_posix.h"
-#include "src/core/lib/iomgr/sockaddr_posix.h"
#include "src/core/lib/slice/slice_internal.h"
#include "test/core/iomgr/endpoint_tests.h"
#include "test/core/util/test_config.h"
@@ -71,43 +68,6 @@ static void create_sockets(int sv[2]) {
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
}
-static void create_inet_sockets(int sv[2]) {
- /* Prepare listening socket */
- struct sockaddr_in addr;
- memset(&addr, 0, sizeof(struct sockaddr_in));
- addr.sin_family = AF_INET;
- int sock = socket(AF_INET, SOCK_STREAM, 0);
- GPR_ASSERT(sock);
- GPR_ASSERT(bind(sock, (sockaddr*)&addr, sizeof(sockaddr_in)) == 0);
- listen(sock, 1);
-
- /* Prepare client socket and connect to server */
- socklen_t len = sizeof(sockaddr_in);
- GPR_ASSERT(getsockname(sock, (sockaddr*)&addr, &len) == 0);
-
- int client = socket(AF_INET, SOCK_STREAM, 0);
- GPR_ASSERT(client);
- int ret;
- do {
- ret = connect(client, (sockaddr*)&addr, sizeof(sockaddr_in));
- } while (ret == -1 && errno == EINTR);
-
- /* Accept client connection */
- len = sizeof(socklen_t);
- int server;
- do {
- server = accept(sock, (sockaddr*)&addr, (socklen_t*)&len);
- } while (server == -1 && errno == EINTR);
- GPR_ASSERT(server != -1);
-
- sv[0] = server;
- sv[1] = client;
- int flags = fcntl(sv[0], F_GETFL, 0);
- GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
- flags = fcntl(sv[1], F_GETFL, 0);
- GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
-}
-
static ssize_t fill_socket(int fd) {
ssize_t write_bytes;
ssize_t total_bytes = 0;
@@ -329,10 +289,11 @@ static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
static void write_done(void* user_data /* write_socket_state */,
grpc_error* error) {
- GPR_ASSERT(error == GRPC_ERROR_NONE);
struct write_socket_state* state =
static_cast<struct write_socket_state*>(user_data);
+ gpr_log(GPR_INFO, "Write done callback called");
gpr_mu_lock(g_mu);
+ gpr_log(GPR_INFO, "Signalling write done");
state->write_done = 1;
GPR_ASSERT(
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
@@ -379,24 +340,10 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
gpr_free(buf);
}
-/* Verifier for timestamps callback for write_test */
-void timestamps_verifier(void* arg, grpc_core::Timestamps* ts,
- grpc_error* error) {
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- GPR_ASSERT(arg != nullptr);
- GPR_ASSERT(ts->sendmsg_time.clock_type == GPR_CLOCK_REALTIME);
- GPR_ASSERT(ts->scheduled_time.clock_type == GPR_CLOCK_REALTIME);
- GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME);
- gpr_atm* done_timestamps = (gpr_atm*)arg;
- gpr_atm_rel_store(done_timestamps, static_cast<gpr_atm>(1));
-}
-
/* Write to a socket using the grpc_tcp API, then drain it directly.
Note that if the write does not complete immediately we need to drain the
- socket in parallel with the read. If collect_timestamps is true, it will
- try to get timestamps for the write. */
-static void write_test(size_t num_bytes, size_t slice_size,
- bool collect_timestamps) {
+ socket in parallel with the read. */
+static void write_test(size_t num_bytes, size_t slice_size) {
int sv[2];
grpc_endpoint* ep;
struct write_socket_state state;
@@ -409,27 +356,19 @@ static void write_test(size_t num_bytes, size_t slice_size,
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
grpc_core::ExecCtx exec_ctx;
- if (collect_timestamps && !grpc_event_engine_can_track_errors()) {
- return;
- }
-
gpr_log(GPR_INFO,
"Start write test with %" PRIuPTR " bytes, slice size %" PRIuPTR,
num_bytes, slice_size);
- if (collect_timestamps) {
- create_inet_sockets(sv);
- } else {
- create_sockets(sv);
- }
+ create_sockets(sv);
grpc_arg a[1];
a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
a[0].type = GRPC_ARG_INTEGER,
a[0].value.integer = static_cast<int>(slice_size);
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", collect_timestamps),
- &args, "test");
+ ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", false), &args,
+ "test");
grpc_endpoint_add_to_pollset(ep, g_pollset);
state.ep = ep;
@@ -442,26 +381,18 @@ static void write_test(size_t num_bytes, size_t slice_size,
GRPC_CLOSURE_INIT(&write_done_closure, write_done, &state,
grpc_schedule_on_exec_ctx);
- gpr_atm done_timestamps;
- gpr_atm_rel_store(&done_timestamps, static_cast<gpr_atm>(0));
- grpc_endpoint_write(ep, &outgoing, &write_done_closure,
- grpc_event_engine_can_track_errors() && collect_timestamps
- ? (void*)&done_timestamps
- : nullptr);
+ grpc_endpoint_write(ep, &outgoing, &write_done_closure);
drain_socket_blocking(sv[0], num_bytes, num_bytes);
- exec_ctx.Flush();
gpr_mu_lock(g_mu);
for (;;) {
grpc_pollset_worker* worker = nullptr;
- if (state.write_done &&
- (!(grpc_event_engine_can_track_errors() && collect_timestamps) ||
- gpr_atm_acq_load(&done_timestamps) == static_cast<gpr_atm>(1))) {
+ if (state.write_done) {
break;
}
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
gpr_mu_unlock(g_mu);
- exec_ctx.Flush();
+
gpr_mu_lock(g_mu);
}
gpr_mu_unlock(g_mu);
@@ -566,21 +497,14 @@ void run_tests(void) {
large_read_test(8192);
large_read_test(1);
- write_test(100, 8192, false);
- write_test(100, 1, false);
- write_test(100000, 8192, false);
- write_test(100000, 1, false);
- write_test(100000, 137, false);
-
- write_test(100, 8192, true);
- write_test(100, 1, true);
- write_test(100000, 8192, true);
- write_test(100000, 1, true);
- write_test(100, 137, true);
+ write_test(100, 8192);
+ write_test(100, 1);
+ write_test(100000, 8192);
+ write_test(100000, 1);
+ write_test(100000, 137);
for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
- write_test(40320, i, false);
- write_test(40320, i, true);
+ write_test(40320, i);
}
release_fd_test(100, 8192);
@@ -625,7 +549,6 @@ int main(int argc, char** argv) {
grpc_closure destroyed;
grpc_test_init(argc, argv);
grpc_init();
- grpc_core::grpc_tcp_set_write_timestamps_callback(timestamps_verifier);
{
grpc_core::ExecCtx exec_ctx;
g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
diff --git a/test/core/tsi/ssl_transport_security_test.cc b/test/core/tsi/ssl_transport_security_test.cc
index b477904d60..baffad6ea3 100644
--- a/test/core/tsi/ssl_transport_security_test.cc
+++ b/test/core/tsi/ssl_transport_security_test.cc
@@ -208,9 +208,11 @@ static void check_session_reusage(ssl_tsi_test_fixture* ssl_fixture,
tsi_peer_get_property_by_name(peer, TSI_SSL_SESSION_REUSED_PEER_PROPERTY);
GPR_ASSERT(session_reused != nullptr);
if (ssl_fixture->session_reused) {
- GPR_ASSERT(strcmp(session_reused->value.data, "true") == 0);
+ GPR_ASSERT(strncmp(session_reused->value.data, "true",
+ session_reused->value.length) == 0);
} else {
- GPR_ASSERT(strcmp(session_reused->value.data, "false") == 0);
+ GPR_ASSERT(strncmp(session_reused->value.data, "false",
+ session_reused->value.length) == 0);
}
}
diff --git a/test/core/util/mock_endpoint.cc b/test/core/util/mock_endpoint.cc
index ef6fd62b51..1156cd5fc5 100644
--- a/test/core/util/mock_endpoint.cc
+++ b/test/core/util/mock_endpoint.cc
@@ -55,7 +55,7 @@ static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb, void* arg) {
+ grpc_closure* cb) {
mock_endpoint* m = reinterpret_cast<mock_endpoint*>(ep);
for (size_t i = 0; i < slices->count; i++) {
m->on_write(slices->slices[i]);
diff --git a/test/core/util/passthru_endpoint.cc b/test/core/util/passthru_endpoint.cc
index 3cc8ad6fe1..5958216747 100644
--- a/test/core/util/passthru_endpoint.cc
+++ b/test/core/util/passthru_endpoint.cc
@@ -76,7 +76,7 @@ static half* other_half(half* h) {
}
static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb, void* arg) {
+ grpc_closure* cb) {
half* m = other_half(reinterpret_cast<half*>(ep));
gpr_mu_lock(&m->parent->mu);
grpc_error* error = GRPC_ERROR_NONE;
diff --git a/test/core/util/trickle_endpoint.cc b/test/core/util/trickle_endpoint.cc
index 62ed72a629..f2efb049b4 100644
--- a/test/core/util/trickle_endpoint.cc
+++ b/test/core/util/trickle_endpoint.cc
@@ -62,7 +62,7 @@ static void maybe_call_write_cb_locked(trickle_endpoint* te) {
}
static void te_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb, void* arg) {
+ grpc_closure* cb) {
trickle_endpoint* te = reinterpret_cast<trickle_endpoint*>(ep);
gpr_mu_lock(&te->mu);
GPR_ASSERT(te->write_cb == nullptr);
@@ -186,8 +186,7 @@ size_t grpc_trickle_endpoint_trickle(grpc_endpoint* ep) {
te->last_write = now;
grpc_endpoint_write(
te->wrapped, &te->writing_buffer,
- GRPC_CLOSURE_CREATE(te_finish_write, te, grpc_schedule_on_exec_ctx),
- nullptr);
+ GRPC_CLOSURE_CREATE(te_finish_write, te, grpc_schedule_on_exec_ctx));
maybe_call_write_cb_locked(te);
}
}
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
index 189923a841..1e9bd273aa 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
@@ -96,7 +96,7 @@ class DummyEndpoint : public grpc_endpoint {
}
static void write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb, void* arg) {
+ grpc_closure* cb) {
GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
}
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 9d7469c9b5..0b4b2ff0a9 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -180,6 +180,19 @@ class Client {
timer_result = timer_->Mark();
}
+ // Print the median latency per interval for one thread.
+ // If the number of warmup seconds is x, then the first x + 1 numbers in the
+ // vector are from the warmup period and should be discarded.
+ if (median_latency_collection_interval_seconds_ > 0) {
+ std::vector<double> medians_per_interval =
+ threads_[0]->GetMedianPerIntervalList();
+ gpr_log(GPR_INFO, "Num threads: %ld", threads_.size());
+ gpr_log(GPR_INFO, "Number of medians: %ld", medians_per_interval.size());
+ for (size_t j = 0; j < medians_per_interval.size(); j++) {
+ gpr_log(GPR_INFO, "%f", medians_per_interval[j]);
+ }
+ }
+
grpc_stats_data core_stats;
grpc_stats_collect(&core_stats);
@@ -210,6 +223,12 @@ class Client {
}
}
+ // Returns the interval (in seconds) between collecting latency medians. If 0,
+ // no periodic median latencies will be collected.
+ double GetLatencyCollectionIntervalInSeconds() {
+ return median_latency_collection_interval_seconds_;
+ }
+
virtual int GetPollCount() {
// For sync client.
return 0;
@@ -218,6 +237,7 @@ class Client {
protected:
bool closed_loop_;
gpr_atm thread_pool_done_;
+ double median_latency_collection_interval_seconds_; // In seconds
void StartThreads(size_t num_threads) {
gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
@@ -299,10 +319,27 @@ class Client {
MergeStatusHistogram(statuses_, s);
}
+ std::vector<double> GetMedianPerIntervalList() {
+ return medians_each_interval_list_;
+ }
+
void UpdateHistogram(HistogramEntry* entry) {
std::lock_guard<std::mutex> g(mu_);
if (entry->value_used()) {
histogram_.Add(entry->value());
+ if (client_->GetLatencyCollectionIntervalInSeconds() > 0) {
+ histogram_per_interval_.Add(entry->value());
+ double now = UsageTimer::Now();
+ if ((now - interval_start_time_) >=
+ client_->GetLatencyCollectionIntervalInSeconds()) {
+ // Record the median latency of requests from the last interval.
+ // Divide by 1e3 to get microseconds.
+ medians_each_interval_list_.push_back(
+ histogram_per_interval_.Percentile(50) / 1e3);
+ histogram_per_interval_.Reset();
+ interval_start_time_ = now;
+ }
+ }
}
if (entry->status_used()) {
statuses_[entry->status()]++;
@@ -334,6 +371,11 @@ class Client {
Client* client_;
const size_t idx_;
std::thread impl_;
+ // The following are used only if
+ // median_latency_collection_interval_seconds_ is greater than 0
+ Histogram histogram_per_interval_;
+ std::vector<double> medians_each_interval_list_;
+ double interval_start_time_;
};
bool ThreadCompleted() {
@@ -392,7 +434,8 @@ class ClientImpl : public Client {
for (auto& t : connecting_threads) {
t->join();
}
-
+ median_latency_collection_interval_seconds_ =
+ config.median_latency_collection_interval_millis() / 1e3;
ClientRequestCreator<RequestType> create_req(&request_,
config.payload_config());
}
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index cabbd51843..11cfb4aa05 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -198,7 +198,8 @@ std::unique_ptr<ScenarioResult> RunScenario(
const ServerConfig& initial_server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
const grpc::string& qps_server_target_override,
- const grpc::string& credential_type, bool run_inproc) {
+ const grpc::string& credential_type, bool run_inproc,
+ int32_t median_latency_collection_interval_millis) {
if (run_inproc) {
g_inproc_servers = new std::vector<grpc::testing::Server*>;
}
@@ -317,6 +318,9 @@ std::unique_ptr<ScenarioResult> RunScenario(
}
}
+ client_config.set_median_latency_collection_interval_millis(
+ median_latency_collection_interval_millis);
+
// Targets are all set by now
result_client_config = client_config;
// Start clients
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
index fede4d8045..cda89f7ddf 100644
--- a/test/cpp/qps/driver.h
+++ b/test/cpp/qps/driver.h
@@ -32,7 +32,8 @@ std::unique_ptr<ScenarioResult> RunScenario(
const grpc::testing::ServerConfig& server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
const grpc::string& qps_server_target_override,
- const grpc::string& credential_type, bool run_inproc);
+ const grpc::string& credential_type, bool run_inproc,
+ int32_t median_latency_collection_interval_millis);
bool RunQuit(const grpc::string& credential_type);
} // namespace testing
diff --git a/test/cpp/qps/histogram.h b/test/cpp/qps/histogram.h
index ba72b5b332..6275128f34 100644
--- a/test/cpp/qps/histogram.h
+++ b/test/cpp/qps/histogram.h
@@ -34,6 +34,11 @@ class Histogram {
~Histogram() {
if (impl_) grpc_histogram_destroy(impl_);
}
+ void Reset() {
+ if (impl_) grpc_histogram_destroy(impl_);
+ impl_ = grpc_histogram_create(default_resolution(), default_max_possible());
+ }
+
Histogram(Histogram&& other) : impl_(other.impl_) { other.impl_ = nullptr; }
void Merge(const Histogram& h) { grpc_histogram_merge(impl_, h.impl_); }
diff --git a/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc b/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc
index f2e977d48b..56d1730252 100644
--- a/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc
+++ b/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc
@@ -48,7 +48,7 @@ static void RunSynchronousUnaryPingPong() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "",
- kInsecureCredentialsType, true);
+ kInsecureCredentialsType, true, 0);
GetReporter()->ReportQPS(*result);
GetReporter()->ReportLatency(*result);
diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc
index 0ff692255c..eaa0dd992c 100644
--- a/test/cpp/qps/qps_json_driver.cc
+++ b/test/cpp/qps/qps_json_driver.cc
@@ -66,6 +66,11 @@ DEFINE_string(json_file_out, "", "File to write the JSON output to.");
DEFINE_string(credential_type, grpc::testing::kInsecureCredentialsType,
"Credential type for communication with workers");
DEFINE_bool(run_inproc, false, "Perform an in-process transport test");
+DEFINE_int32(
+ median_latency_collection_interval_millis, 0,
+ "Specifies the period between gathering latency medians in "
+ "milliseconds. The medians will be logged out on the client at the "
+ "end of the benchmark run. If 0, this periodic collection is disabled.");
namespace grpc {
namespace testing {
@@ -73,13 +78,13 @@ namespace testing {
static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario,
bool* success) {
std::cerr << "RUNNING SCENARIO: " << scenario.name() << "\n";
- auto result =
- RunScenario(scenario.client_config(), scenario.num_clients(),
- scenario.server_config(), scenario.num_servers(),
- scenario.warmup_seconds(), scenario.benchmark_seconds(),
- !FLAGS_run_inproc ? scenario.spawn_local_worker_count() : -2,
- FLAGS_qps_server_target_override, FLAGS_credential_type,
- FLAGS_run_inproc);
+ auto result = RunScenario(
+ scenario.client_config(), scenario.num_clients(),
+ scenario.server_config(), scenario.num_servers(),
+ scenario.warmup_seconds(), scenario.benchmark_seconds(),
+ !FLAGS_run_inproc ? scenario.spawn_local_worker_count() : -2,
+ FLAGS_qps_server_target_override, FLAGS_credential_type, FLAGS_run_inproc,
+ FLAGS_median_latency_collection_interval_millis);
// Amend the result with scenario config. Eventually we should adjust
// RunScenario contract so we don't need to touch the result here.
diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc
index df929b9811..6044f4265a 100644
--- a/test/cpp/qps/qps_openloop_test.cc
+++ b/test/cpp/qps/qps_openloop_test.cc
@@ -52,7 +52,7 @@ static void RunQPS() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "",
- kInsecureCredentialsType, false);
+ kInsecureCredentialsType, false, 0);
GetReporter()->ReportQPSPerCore(*result);
GetReporter()->ReportLatency(*result);
diff --git a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
index bb415e9d63..a559c82cc8 100644
--- a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
+++ b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
@@ -55,7 +55,7 @@ static void RunSynchronousUnaryPingPong() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "",
- kInsecureCredentialsType, false);
+ kInsecureCredentialsType, false, 0);
GetReporter()->ReportQPS(*result);
GetReporter()->ReportLatency(*result);