Diffstat (limited to 'test')
-rw-r--r--  test/core/bad_client/bad_client.c                       8
-rw-r--r--  test/core/end2end/bad_server_response_test.c            6
-rw-r--r--  test/core/end2end/fixtures/proxy.c                     36
-rw-r--r--  test/core/end2end/tests/connectivity.c                  2
-rw-r--r--  test/core/end2end/tests/filter_causes_close.c           6
-rw-r--r--  test/core/end2end/tests/filter_latency.c                2
-rw-r--r--  test/core/end2end/tests/payload.c                       2
-rw-r--r--  test/core/end2end/tests/resource_quota_server.c        23
-rw-r--r--  test/core/end2end/tests/shutdown_finishes_tags.c        2
-rw-r--r--  test/core/end2end/tests/stream_compression_payload.c    2
-rw-r--r--  test/core/iomgr/endpoint_tests.c                        8
-rw-r--r--  test/core/iomgr/tcp_posix_test.c                       12
-rw-r--r--  test/core/security/secure_endpoint_test.c               6
-rw-r--r--  test/core/util/memory_counters.c                        8
-rw-r--r--  test/core/util/mock_endpoint.c                          2
-rw-r--r--  test/core/util/passthru_endpoint.c                      2
-rw-r--r--  test/core/util/port.c                                   3
-rw-r--r--  test/core/util/port_server_client.c                    10
-rw-r--r--  test/core/util/slice_splitter.c                         9
-rw-r--r--  test/core/util/trickle_endpoint.c                       4
-rw-r--r--  test/cpp/end2end/async_end2end_test.cc                  6
-rw-r--r--  test/cpp/end2end/client_lb_end2end_test.cc             51
-rw-r--r--  test/cpp/end2end/end2end_test.cc                       16
-rw-r--r--  test/cpp/end2end/grpclb_end2end_test.cc               310
-rw-r--r--  test/cpp/microbenchmarks/bm_chttp2_transport.cc       166
-rw-r--r--  test/cpp/microbenchmarks/bm_fullstack_trickle.cc        5
26 files changed, 335 insertions(+), 372 deletions(-)
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c
index c3964ca84b..383d1240cb 100644
--- a/test/core/bad_client/bad_client.c
+++ b/test/core/bad_client/bad_client.c
@@ -45,18 +45,18 @@ typedef struct {
} thd_args;
static void thd_func(void *arg) {
- thd_args *a = arg;
+ thd_args *a = (thd_args *)arg;
a->validator(a->server, a->cq, a->registered_method);
gpr_event_set(&a->done_thd, (void *)1);
}
static void done_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- thd_args *a = arg;
+ thd_args *a = (thd_args *)arg;
gpr_event_set(&a->done_write, (void *)1);
}
static void server_setup_transport(void *ts, grpc_transport *transport) {
- thd_args *a = ts;
+ thd_args *a = (thd_args *)ts;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_server_setup_transport(&exec_ctx, a->server, transport, NULL,
grpc_server_get_channel_args(a->server));
@@ -64,7 +64,7 @@ static void server_setup_transport(void *ts, grpc_transport *transport) {
}
static void read_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- gpr_event *read_done = arg;
+ gpr_event *read_done = (gpr_event *)arg;
gpr_event_set(read_done, (void *)1);
}
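The casts in this file set the pattern for most of this diff: C allows implicit conversion from void *, C++ does not, so test sources that must also build under a C++ compiler spell the cast out. A minimal sketch (struct simplified from the thd_args above):

    #include <stdlib.h>

    typedef struct {
      int value; /* stands in for the server/cq/validator fields */
    } thd_args;

    static void thd_func(void *arg) {
      /* thd_args *a = arg;             valid C, rejected by a C++ compiler */
      thd_args *a = (thd_args *)arg; /* accepted by both C and C++ */
      (void)a->value;
    }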
diff --git a/test/core/end2end/bad_server_response_test.c b/test/core/end2end/bad_server_response_test.c
index 5f89058c45..eeabc769d3 100644
--- a/test/core/end2end/bad_server_response_test.c
+++ b/test/core/end2end/bad_server_response_test.c
@@ -136,7 +136,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor) {
gpr_free(acceptor);
- test_tcp_server *server = arg;
+ test_tcp_server *server = (test_tcp_server *)arg;
GRPC_CLOSURE_INIT(&on_read, handle_read, NULL, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_write, done_write, NULL, grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&state.temp_incoming_buffer);
@@ -237,7 +237,7 @@ typedef struct {
} poll_args;
static void actually_poll_server(void *arg) {
- poll_args *pa = arg;
+ poll_args *pa = (poll_args *)arg;
gpr_timespec deadline = n_sec_deadline(10);
while (true) {
bool done = gpr_atm_acq_load(&state.done_atm) != 0;
@@ -259,7 +259,7 @@ static void poll_server_until_read_done(test_tcp_server *server,
gpr_atm_rel_store(&state.done_atm, 0);
state.write_done = 0;
gpr_thd_id id;
- poll_args *pa = gpr_malloc(sizeof(*pa));
+ poll_args *pa = (poll_args *)gpr_malloc(sizeof(*pa));
pa->server = server;
pa->signal_when_done = signal_when_done;
gpr_thd_new(&id, actually_poll_server, pa, NULL);
diff --git a/test/core/end2end/fixtures/proxy.c b/test/core/end2end/fixtures/proxy.c
index d457aeefe8..9ad862728f 100644
--- a/test/core/end2end/fixtures/proxy.c
+++ b/test/core/end2end/fixtures/proxy.c
@@ -80,7 +80,7 @@ grpc_end2end_proxy *grpc_end2end_proxy_create(const grpc_end2end_proxy_def *def,
int proxy_port = grpc_pick_unused_port_or_die();
int server_port = grpc_pick_unused_port_or_die();
- grpc_end2end_proxy *proxy = gpr_malloc(sizeof(*proxy));
+ grpc_end2end_proxy *proxy = (grpc_end2end_proxy *)gpr_malloc(sizeof(*proxy));
memset(proxy, 0, sizeof(*proxy));
gpr_join_host_port(&proxy->proxy_port, "localhost", proxy_port);
@@ -106,14 +106,14 @@ grpc_end2end_proxy *grpc_end2end_proxy_create(const grpc_end2end_proxy_def *def,
}
static closure *new_closure(void (*func)(void *arg, int success), void *arg) {
- closure *cl = gpr_malloc(sizeof(*cl));
+ closure *cl = (closure *)gpr_malloc(sizeof(*cl));
cl->func = func;
cl->arg = arg;
return cl;
}
static void shutdown_complete(void *arg, int success) {
- grpc_end2end_proxy *proxy = arg;
+ grpc_end2end_proxy *proxy = (grpc_end2end_proxy *)arg;
proxy->shutdown = 1;
grpc_completion_queue_shutdown(proxy->cq);
}
@@ -146,12 +146,12 @@ static void unrefpc(proxy_call *pc, const char *reason) {
static void refpc(proxy_call *pc, const char *reason) { gpr_ref(&pc->refs); }
static void on_c2p_sent_initial_metadata(void *arg, int success) {
- proxy_call *pc = arg;
+ proxy_call *pc = (proxy_call *)arg;
unrefpc(pc, "on_c2p_sent_initial_metadata");
}
static void on_p2s_recv_initial_metadata(void *arg, int success) {
- proxy_call *pc = arg;
+ proxy_call *pc = (proxy_call *)arg;
grpc_op op;
grpc_call_error err;
@@ -172,14 +172,14 @@ static void on_p2s_recv_initial_metadata(void *arg, int success) {
}
static void on_p2s_sent_initial_metadata(void *arg, int success) {
- proxy_call *pc = arg;
+ proxy_call *pc = (proxy_call *)arg;
unrefpc(pc, "on_p2s_sent_initial_metadata");
}
static void on_c2p_recv_msg(void *arg, int success);
static void on_p2s_sent_message(void *arg, int success) {
- proxy_call *pc = arg;
+ proxy_call *pc = (proxy_call *)arg;
grpc_op op;
grpc_call_error err;
@@ -199,12 +199,12 @@ static void on_p2s_sent_message(void *arg, int success) {
}
static void on_p2s_sent_close(void *arg, int success) {
- proxy_call *pc = arg;
+ proxy_call *pc = (proxy_call *)arg;
unrefpc(pc, "on_p2s_sent_close");
}
static void on_c2p_recv_msg(void *arg, int success) {
- proxy_call *pc = arg;
+ proxy_call *pc = (proxy_call *)arg;
grpc_op op;
grpc_call_error err;
@@ -235,7 +235,7 @@ static void on_c2p_recv_msg(void *arg, int success) {
static void on_p2s_recv_msg(void *arg, int success);
static void on_c2p_sent_message(void *arg, int success) {
- proxy_call *pc = arg;
+ proxy_call *pc = (proxy_call *)arg;
grpc_op op;
grpc_call_error err;
@@ -255,7 +255,7 @@ static void on_c2p_sent_message(void *arg, int success) {
}
static void on_p2s_recv_msg(void *arg, int success) {
- proxy_call *pc = arg;
+ proxy_call *pc = (proxy_call *)arg;
grpc_op op;
grpc_call_error err;
@@ -275,12 +275,12 @@ static void on_p2s_recv_msg(void *arg, int success) {
}
static void on_c2p_sent_status(void *arg, int success) {
- proxy_call *pc = arg;
+ proxy_call *pc = (proxy_call *)arg;
unrefpc(pc, "on_c2p_sent_status");
}
static void on_p2s_status(void *arg, int success) {
- proxy_call *pc = arg;
+ proxy_call *pc = (proxy_call *)arg;
grpc_op op;
grpc_call_error err;
@@ -305,18 +305,18 @@ static void on_p2s_status(void *arg, int success) {
}
static void on_c2p_closed(void *arg, int success) {
- proxy_call *pc = arg;
+ proxy_call *pc = (proxy_call *)arg;
unrefpc(pc, "on_c2p_closed");
}
static void on_new_call(void *arg, int success) {
- grpc_end2end_proxy *proxy = arg;
+ grpc_end2end_proxy *proxy = (grpc_end2end_proxy *)arg;
grpc_call_error err;
if (success) {
grpc_op op;
memset(&op, 0, sizeof(op));
- proxy_call *pc = gpr_malloc(sizeof(*pc));
+ proxy_call *pc = (proxy_call *)gpr_malloc(sizeof(*pc));
memset(pc, 0, sizeof(*pc));
pc->proxy = proxy;
GPR_SWAP(grpc_metadata_array, pc->c2p_initial_metadata,
@@ -404,7 +404,7 @@ static void request_call(grpc_end2end_proxy *proxy) {
}
static void thread_main(void *arg) {
- grpc_end2end_proxy *proxy = arg;
+ grpc_end2end_proxy *proxy = (grpc_end2end_proxy *)arg;
closure *cl;
for (;;) {
grpc_event ev = grpc_completion_queue_next(
@@ -416,7 +416,7 @@ static void thread_main(void *arg) {
case GRPC_QUEUE_SHUTDOWN:
return;
case GRPC_OP_COMPLETE:
- cl = ev.tag;
+ cl = (closure *)ev.tag;
cl->func(cl->arg, ev.success);
gpr_free(cl);
break;
diff --git a/test/core/end2end/tests/connectivity.c b/test/core/end2end/tests/connectivity.c
index a5af46e09e..610243ee3a 100644
--- a/test/core/end2end/tests/connectivity.c
+++ b/test/core/end2end/tests/connectivity.c
@@ -34,7 +34,7 @@ typedef struct {
} child_events;
static void child_thread(void *arg) {
- child_events *ce = arg;
+ child_events *ce = (child_events *)arg;
grpc_event ev;
gpr_event_set(&ce->started, (void *)1);
gpr_log(GPR_DEBUG, "verifying");
diff --git a/test/core/end2end/tests/filter_causes_close.c b/test/core/end2end/tests/filter_causes_close.c
index 5a8c96d121..ee7aeb3f33 100644
--- a/test/core/end2end/tests/filter_causes_close.c
+++ b/test/core/end2end/tests/filter_causes_close.c
@@ -195,8 +195,8 @@ typedef struct { uint8_t unused; } channel_data;
static void recv_im_ready(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- grpc_call_element *elem = arg;
- call_data *calld = elem->call_data;
+ grpc_call_element *elem = (grpc_call_element *)arg;
+ call_data *calld = (call_data *)elem->call_data;
GRPC_CLOSURE_RUN(
exec_ctx, calld->recv_im_ready,
grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
@@ -208,7 +208,7 @@ static void recv_im_ready(grpc_exec_ctx *exec_ctx, void *arg,
static void start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
- call_data *calld = elem->call_data;
+ call_data *calld = (call_data *)elem->call_data;
if (op->recv_initial_metadata) {
calld->recv_im_ready =
op->payload->recv_initial_metadata.recv_initial_metadata_ready;
diff --git a/test/core/end2end/tests/filter_latency.c b/test/core/end2end/tests/filter_latency.c
index 8918c3b2f6..c24934f05d 100644
--- a/test/core/end2end/tests/filter_latency.c
+++ b/test/core/end2end/tests/filter_latency.c
@@ -312,7 +312,7 @@ static const grpc_channel_filter test_server_filter = {
static bool maybe_add_filter(grpc_exec_ctx *exec_ctx,
grpc_channel_stack_builder *builder, void *arg) {
- grpc_channel_filter *filter = arg;
+ grpc_channel_filter *filter = (grpc_channel_filter *)arg;
if (g_enable_filter) {
// Want to add the filter as close to the end as possible, to make
// sure that all of the filters work well together. However, we
diff --git a/test/core/end2end/tests/payload.c b/test/core/end2end/tests/payload.c
index 19fb4d9769..d98eed68e0 100644
--- a/test/core/end2end/tests/payload.c
+++ b/test/core/end2end/tests/payload.c
@@ -91,7 +91,7 @@ static grpc_slice generate_random_slice() {
static const char chars[] = "abcdefghijklmnopqrstuvwxyz1234567890";
char *output;
const size_t output_size = 1024 * 1024;
- output = gpr_malloc(output_size);
+ output = (char *)gpr_malloc(output_size);
for (i = 0; i < output_size - 1; ++i) {
output[i] = chars[rand() % (int)(sizeof(chars) - 1)];
}
diff --git a/test/core/end2end/tests/resource_quota_server.c b/test/core/end2end/tests/resource_quota_server.c
index 34a6a80a31..0316920762 100644
--- a/test/core/end2end/tests/resource_quota_server.c
+++ b/test/core/end2end/tests/resource_quota_server.c
@@ -91,7 +91,7 @@ static grpc_slice generate_random_slice() {
static const char chars[] = "abcdefghijklmnopqrstuvwxyz1234567890";
char *output;
const size_t output_size = 1024 * 1024;
- output = gpr_malloc(output_size);
+ output = (char *)gpr_malloc(output_size);
for (i = 0; i < output_size - 1; ++i) {
output[i] = chars[rand() % (int)(sizeof(chars) - 1)];
}
@@ -111,10 +111,10 @@ void resource_quota_server(grpc_end2end_test_config config) {
grpc_resource_quota_resize(resource_quota, 5 * 1024 * 1024);
#define NUM_CALLS 100
-#define CLIENT_BASE_TAG 1000
-#define SERVER_START_BASE_TAG 2000
-#define SERVER_RECV_BASE_TAG 3000
-#define SERVER_END_BASE_TAG 4000
+#define CLIENT_BASE_TAG 0x1000
+#define SERVER_START_BASE_TAG 0x2000
+#define SERVER_RECV_BASE_TAG 0x3000
+#define SERVER_END_BASE_TAG 0x4000
grpc_arg arg;
arg.key = GRPC_ARG_RESOURCE_QUOTA;
@@ -131,8 +131,10 @@ void resource_quota_server(grpc_end2end_test_config config) {
* will be verified on completion. */
grpc_slice request_payload_slice = generate_random_slice();
- grpc_call **client_calls = malloc(sizeof(grpc_call *) * NUM_CALLS);
- grpc_call **server_calls = malloc(sizeof(grpc_call *) * NUM_CALLS);
+ grpc_call **client_calls =
+ (grpc_call **)malloc(sizeof(grpc_call *) * NUM_CALLS);
+ grpc_call **server_calls =
+ (grpc_call **)malloc(sizeof(grpc_call *) * NUM_CALLS);
grpc_metadata_array *initial_metadata_recv =
malloc(sizeof(grpc_metadata_array) * NUM_CALLS);
grpc_metadata_array *trailing_metadata_recv =
@@ -141,13 +143,14 @@ void resource_quota_server(grpc_end2end_test_config config) {
malloc(sizeof(grpc_metadata_array) * NUM_CALLS);
grpc_call_details *call_details =
malloc(sizeof(grpc_call_details) * NUM_CALLS);
- grpc_status_code *status = malloc(sizeof(grpc_status_code) * NUM_CALLS);
- grpc_slice *details = malloc(sizeof(grpc_slice) * NUM_CALLS);
+ grpc_status_code *status =
+ (grpc_status_code *)malloc(sizeof(grpc_status_code) * NUM_CALLS);
+ grpc_slice *details = (grpc_slice *)malloc(sizeof(grpc_slice) * NUM_CALLS);
grpc_byte_buffer **request_payload =
malloc(sizeof(grpc_byte_buffer *) * NUM_CALLS);
grpc_byte_buffer **request_payload_recv =
malloc(sizeof(grpc_byte_buffer *) * NUM_CALLS);
- int *was_cancelled = malloc(sizeof(int) * NUM_CALLS);
+ int *was_cancelled = (int *)malloc(sizeof(int) * NUM_CALLS);
grpc_call_error error;
int pending_client_calls = 0;
int pending_server_start_calls = 0;
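The tag bases above move from decimal (1000, 2000, ...) to hex (0x1000, 0x2000, ...). A hypothetical sketch of the payoff (tag_for is not in the source): completion-queue tags are void * and typically print in hex, so hex-spaced bases keep category and call index legible.

    #include <stdint.h>

    #define SERVER_RECV_BASE_TAG 0x3000

    static void *tag_for(int base_tag, int call_idx) {
      return (void *)(intptr_t)(base_tag + call_idx);
    }
    /* tag_for(SERVER_RECV_BASE_TAG, 7) == (void *)0x3007: the category
       (0x3***) and the call index (***7) read at a glance in a debugger
       or log, which decimal-based tags would not make obvious. */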
diff --git a/test/core/end2end/tests/shutdown_finishes_tags.c b/test/core/end2end/tests/shutdown_finishes_tags.c
index f9b8e4c955..7914cc95ba 100644
--- a/test/core/end2end/tests/shutdown_finishes_tags.c
+++ b/test/core/end2end/tests/shutdown_finishes_tags.c
@@ -78,7 +78,7 @@ static void test_early_server_shutdown_finishes_tags(
grpc_end2end_test_fixture f = begin_test(
config, "test_early_server_shutdown_finishes_tags", NULL, NULL);
cq_verifier *cqv = cq_verifier_create(f.cq);
- grpc_call *s = (void *)1;
+ grpc_call *s = (grpc_call *)(uintptr_t)1;
grpc_call_details call_details;
grpc_metadata_array request_metadata_recv;
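The sentinel change above is another C++-compatibility fix: routing the integer through uintptr_t makes the int-to-pointer conversion explicit in both languages, where a bare (void *)1 assigned to a typed pointer is rejected by C++. A minimal sketch (type name hypothetical):

    #include <stdint.h>

    typedef struct opaque_handle opaque_handle; /* stands in for grpc_call */

    /* A sentinel that is only ever compared against, never dereferenced. */
    static opaque_handle *const kDummyHandle = (opaque_handle *)(uintptr_t)1;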
diff --git a/test/core/end2end/tests/stream_compression_payload.c b/test/core/end2end/tests/stream_compression_payload.c
index 5135df81ed..e47d2aa93c 100644
--- a/test/core/end2end/tests/stream_compression_payload.c
+++ b/test/core/end2end/tests/stream_compression_payload.c
@@ -95,7 +95,7 @@ static grpc_slice generate_random_slice() {
static const char chars[] = "abcdefghijklmnopqrstuvwxyz1234567890";
char *output;
const size_t output_size = 1024 * 1024;
- output = gpr_malloc(output_size);
+ output = (char *)gpr_malloc(output_size);
for (i = 0; i < output_size - 1; ++i) {
output[i] = chars[rand() % (int)(sizeof(chars) - 1)];
}
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c
index 353d79b11d..f8570edde7 100644
--- a/test/core/iomgr/endpoint_tests.c
+++ b/test/core/iomgr/endpoint_tests.c
@@ -77,7 +77,7 @@ static void end_test(grpc_endpoint_test_config config) { config.clean_up(); }
static grpc_slice *allocate_blocks(size_t num_bytes, size_t slice_size,
size_t *num_blocks, uint8_t *current_data) {
size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1 : 0);
- grpc_slice *slices = gpr_malloc(sizeof(grpc_slice) * nslices);
+ grpc_slice *slices = (grpc_slice *)gpr_malloc(sizeof(grpc_slice) * nslices);
size_t num_bytes_left = num_bytes;
size_t i;
size_t j;
@@ -117,7 +117,8 @@ struct read_and_write_test_state {
static void read_and_write_test_read_handler(grpc_exec_ctx *exec_ctx,
void *data, grpc_error *error) {
- struct read_and_write_test_state *state = data;
+ struct read_and_write_test_state *state =
+ (struct read_and_write_test_state *)data;
state->bytes_read += count_slices(
state->incoming.slices, state->incoming.count, &state->current_read_data);
@@ -136,7 +137,8 @@ static void read_and_write_test_read_handler(grpc_exec_ctx *exec_ctx,
static void read_and_write_test_write_handler(grpc_exec_ctx *exec_ctx,
void *data, grpc_error *error) {
- struct read_and_write_test_state *state = data;
+ struct read_and_write_test_state *state =
+ (struct read_and_write_test_state *)data;
grpc_slice *slices = NULL;
size_t nslices;
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index 8cfc1bcab1..cfb3cf897c 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -89,7 +89,7 @@ static ssize_t fill_socket(int fd) {
static size_t fill_socket_partial(int fd, size_t bytes) {
ssize_t write_bytes;
size_t total_bytes = 0;
- unsigned char *buf = gpr_malloc(bytes);
+ unsigned char *buf = (unsigned char *)gpr_malloc(bytes);
unsigned i;
for (i = 0; i < bytes; ++i) {
buf[i] = (uint8_t)(i % 256);
@@ -268,7 +268,7 @@ struct write_socket_state {
static grpc_slice *allocate_blocks(size_t num_bytes, size_t slice_size,
size_t *num_blocks, uint8_t *current_data) {
size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1u : 0u);
- grpc_slice *slices = gpr_malloc(sizeof(grpc_slice) * nslices);
+ grpc_slice *slices = (grpc_slice *)gpr_malloc(sizeof(grpc_slice) * nslices);
size_t num_bytes_left = num_bytes;
unsigned i, j;
unsigned char *buf;
@@ -302,7 +302,7 @@ static void write_done(grpc_exec_ctx *exec_ctx,
}
void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
- unsigned char *buf = gpr_malloc(read_size);
+ unsigned char *buf = (unsigned char *)gpr_malloc(read_size);
ssize_t bytes_read;
size_t bytes_left = num_bytes;
int flags;
@@ -405,7 +405,7 @@ static void write_test(size_t num_bytes, size_t slice_size) {
}
void on_fd_released(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *errors) {
- int *done = arg;
+ int *done = (int *)arg;
*done = 1;
GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(exec_ctx, g_pollset, NULL)));
@@ -549,7 +549,7 @@ static grpc_endpoint_test_config configs[] = {
static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p,
grpc_error *error) {
- grpc_pollset_destroy(exec_ctx, p);
+ grpc_pollset_destroy(exec_ctx, (grpc_pollset *)p);
}
int main(int argc, char **argv) {
@@ -557,7 +557,7 @@ int main(int argc, char **argv) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_test_init(argc, argv);
grpc_init();
- g_pollset = gpr_zalloc(grpc_pollset_size());
+ g_pollset = (grpc_pollset *)gpr_zalloc(grpc_pollset_size());
grpc_pollset_init(g_pollset, &g_mu);
grpc_endpoint_tests(configs[0], g_pollset, g_mu);
run_tests();
diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c
index e9f2c76738..839a05fa9b 100644
--- a/test/core/security/secure_endpoint_test.c
+++ b/test/core/security/secure_endpoint_test.c
@@ -70,7 +70,7 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair(
size_t still_pending_size;
size_t total_buffer_size = 8192;
size_t buffer_size = total_buffer_size;
- uint8_t *encrypted_buffer = gpr_malloc(buffer_size);
+ uint8_t *encrypted_buffer = (uint8_t *)gpr_malloc(buffer_size);
uint8_t *cur = encrypted_buffer;
grpc_slice encrypted_leftover;
for (i = 0; i < leftover_nslices; i++) {
@@ -202,7 +202,7 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) {
static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p,
grpc_error *error) {
- grpc_pollset_destroy(exec_ctx, p);
+ grpc_pollset_destroy(exec_ctx, (grpc_pollset *)p);
}
int main(int argc, char **argv) {
@@ -211,7 +211,7 @@ int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_init();
- g_pollset = gpr_zalloc(grpc_pollset_size());
+ g_pollset = (grpc_pollset *)gpr_zalloc(grpc_pollset_size());
grpc_pollset_init(g_pollset, &g_mu);
grpc_endpoint_tests(configs[0], g_pollset, g_mu);
grpc_endpoint_tests(configs[1], g_pollset, g_mu);
diff --git a/test/core/util/memory_counters.c b/test/core/util/memory_counters.c
index b898057034..9fb3ffe095 100644
--- a/test/core/util/memory_counters.c
+++ b/test/core/util/memory_counters.c
@@ -48,13 +48,13 @@ static void *guard_malloc(size_t size) {
NO_BARRIER_FETCH_ADD(&g_memory_counters.total_size_relative, (gpr_atm)size);
NO_BARRIER_FETCH_ADD(&g_memory_counters.total_allocs_absolute, (gpr_atm)1);
NO_BARRIER_FETCH_ADD(&g_memory_counters.total_allocs_relative, (gpr_atm)1);
- ptr = g_old_allocs.malloc_fn(size + sizeof(size));
+ ptr = (size_t *)g_old_allocs.malloc_fn(size + sizeof(size));
*ptr++ = size;
return ptr;
}
static void *guard_realloc(void *vptr, size_t size) {
- size_t *ptr = vptr;
+ size_t *ptr = (size_t *)vptr;
if (vptr == NULL) {
return guard_malloc(size);
}
@@ -67,13 +67,13 @@ static void *guard_realloc(void *vptr, size_t size) {
NO_BARRIER_FETCH_ADD(&g_memory_counters.total_size_relative, -(gpr_atm)*ptr);
NO_BARRIER_FETCH_ADD(&g_memory_counters.total_size_relative, (gpr_atm)size);
NO_BARRIER_FETCH_ADD(&g_memory_counters.total_allocs_absolute, (gpr_atm)1);
- ptr = g_old_allocs.realloc_fn(ptr, size + sizeof(size));
+ ptr = (size_t *)g_old_allocs.realloc_fn(ptr, size + sizeof(size));
*ptr++ = size;
return ptr;
}
static void guard_free(void *vptr) {
- size_t *ptr = vptr;
+ size_t *ptr = (size_t *)vptr;
if (!vptr) return;
--ptr;
NO_BARRIER_FETCH_ADD(&g_memory_counters.total_size_relative, -(gpr_atm)*ptr);
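For reference, a standalone sketch of the size-prefix pattern that guard_malloc/guard_realloc/guard_free implement above: the allocation size is stashed in a size_t header just before the pointer handed to the caller, so the free path can account for exactly what was allocated.

    #include <stdlib.h>

    static void *sized_malloc(size_t size) {
      size_t *ptr = (size_t *)malloc(size + sizeof(size_t));
      if (ptr == NULL) return NULL;
      *ptr++ = size; /* record the size, then step past the header */
      return ptr;
    }

    static void sized_free(void *vptr) {
      if (vptr == NULL) return;
      size_t *ptr = (size_t *)vptr - 1; /* step back to the header */
      /* *ptr is the original request size, available for accounting;
         note a sizeof(size_t) header can under-align 16-byte types. */
      free(ptr);
    }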
diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c
index 40cf0a2652..bd386b2148 100644
--- a/test/core/util/mock_endpoint.c
+++ b/test/core/util/mock_endpoint.c
@@ -110,7 +110,7 @@ static const grpc_endpoint_vtable vtable = {
grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(grpc_slice slice),
grpc_resource_quota *resource_quota) {
- grpc_mock_endpoint *m = gpr_malloc(sizeof(*m));
+ grpc_mock_endpoint *m = (grpc_mock_endpoint *)gpr_malloc(sizeof(*m));
m->base.vtable = &vtable;
char *name;
gpr_asprintf(&name, "mock_endpoint_%" PRIxPTR, (intptr_t)m);
diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c
index eef1f163f0..38a47584d5 100644
--- a/test/core/util/passthru_endpoint.c
+++ b/test/core/util/passthru_endpoint.c
@@ -183,7 +183,7 @@ void grpc_passthru_endpoint_create(grpc_endpoint **client,
grpc_endpoint **server,
grpc_resource_quota *resource_quota,
grpc_passthru_endpoint_stats *stats) {
- passthru_endpoint *m = gpr_malloc(sizeof(*m));
+ passthru_endpoint *m = (passthru_endpoint *)gpr_malloc(sizeof(*m));
m->halves = 2;
m->shutdown = 0;
m->stats = stats == NULL ? &m->dummy_stats : stats;
diff --git a/test/core/util/port.c b/test/core/util/port.c
index b1fc722858..61f2e5018f 100644
--- a/test/core/util/port.c
+++ b/test/core/util/port.c
@@ -75,7 +75,8 @@ static void chose_port(int port) {
atexit(free_chosen_ports);
}
num_chosen_ports++;
- chosen_ports = gpr_realloc(chosen_ports, sizeof(int) * num_chosen_ports);
+ chosen_ports =
+ (int *)gpr_realloc(chosen_ports, sizeof(int) * num_chosen_ports);
chosen_ports[num_chosen_ports - 1] = port;
}
diff --git a/test/core/util/port_server_client.c b/test/core/util/port_server_client.c
index 1b015c95e9..ba4028dbee 100644
--- a/test/core/util/port_server_client.c
+++ b/test/core/util/port_server_client.c
@@ -42,14 +42,14 @@ typedef struct freereq {
static void destroy_pops_and_shutdown(grpc_exec_ctx *exec_ctx, void *p,
grpc_error *error) {
- grpc_pollset *pollset = grpc_polling_entity_pollset(p);
+ grpc_pollset *pollset = grpc_polling_entity_pollset((grpc_polling_entity *)p);
grpc_pollset_destroy(exec_ctx, pollset);
gpr_free(pollset);
}
static void freed_port_from_server(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- freereq *pr = arg;
+ freereq *pr = (freereq *)arg;
gpr_mu_lock(pr->mu);
pr->done = 1;
GRPC_LOG_IF_ERROR(
@@ -74,7 +74,7 @@ void grpc_free_port_using_server(int port) {
memset(&req, 0, sizeof(req));
memset(&rsp, 0, sizeof(rsp));
- grpc_pollset *pollset = gpr_zalloc(grpc_pollset_size());
+ grpc_pollset *pollset = (grpc_pollset *)gpr_zalloc(grpc_pollset_size());
grpc_pollset_init(pollset, &pr.mu);
pr.pops = grpc_polling_entity_create_from_pollset(pollset);
shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops,
@@ -131,7 +131,7 @@ static void got_port_from_server(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
size_t i;
int port = 0;
- portreq *pr = arg;
+ portreq *pr = (portreq *)arg;
int failed = 0;
grpc_httpcli_response *response = &pr->response;
@@ -207,7 +207,7 @@ int grpc_pick_port_using_server(void) {
memset(&pr, 0, sizeof(pr));
memset(&req, 0, sizeof(req));
- grpc_pollset *pollset = gpr_zalloc(grpc_pollset_size());
+ grpc_pollset *pollset = (grpc_pollset *)gpr_zalloc(grpc_pollset_size());
grpc_pollset_init(pollset, &pr.mu);
pr.pops = grpc_polling_entity_create_from_pollset(pollset);
shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops,
diff --git a/test/core/util/slice_splitter.c b/test/core/util/slice_splitter.c
index f6d892f166..6fcef9acce 100644
--- a/test/core/util/slice_splitter.c
+++ b/test/core/util/slice_splitter.c
@@ -44,7 +44,8 @@ void grpc_split_slices(grpc_slice_split_mode mode, grpc_slice *src_slices,
switch (mode) {
case GRPC_SLICE_SPLIT_IDENTITY:
*dst_slice_count = src_slice_count;
- *dst_slices = gpr_malloc(sizeof(grpc_slice) * src_slice_count);
+ *dst_slices =
+ (grpc_slice *)gpr_malloc(sizeof(grpc_slice) * src_slice_count);
for (i = 0; i < src_slice_count; i++) {
(*dst_slices)[i] = src_slices[i];
grpc_slice_ref((*dst_slices)[i]);
@@ -56,7 +57,7 @@ void grpc_split_slices(grpc_slice_split_mode mode, grpc_slice *src_slices,
for (i = 0; i < src_slice_count; i++) {
length += GRPC_SLICE_LENGTH(src_slices[i]);
}
- *dst_slices = gpr_malloc(sizeof(grpc_slice));
+ *dst_slices = (grpc_slice *)gpr_malloc(sizeof(grpc_slice));
**dst_slices = grpc_slice_malloc(length);
length = 0;
for (i = 0; i < src_slice_count; i++) {
@@ -72,7 +73,7 @@ void grpc_split_slices(grpc_slice_split_mode mode, grpc_slice *src_slices,
length += GRPC_SLICE_LENGTH(src_slices[i]);
}
*dst_slice_count = length;
- *dst_slices = gpr_malloc(sizeof(grpc_slice) * length);
+ *dst_slices = (grpc_slice *)gpr_malloc(sizeof(grpc_slice) * length);
length = 0;
for (i = 0; i < src_slice_count; i++) {
for (j = 0; j < GRPC_SLICE_LENGTH(src_slices[i]); j++) {
@@ -112,7 +113,7 @@ grpc_slice grpc_slice_merge(grpc_slice *slices, size_t nslices) {
for (i = 0; i < nslices; i++) {
if (GRPC_SLICE_LENGTH(slices[i]) + length > capacity) {
capacity = GPR_MAX(capacity * 2, GRPC_SLICE_LENGTH(slices[i]) + length);
- out = gpr_realloc(out, capacity);
+ out = (uint8_t *)gpr_realloc(out, capacity);
}
memcpy(out + length, GRPC_SLICE_START_PTR(slices[i]),
GRPC_SLICE_LENGTH(slices[i]));
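grpc_slice_merge above grows its output with the usual capacity-doubling idiom. A self-contained sketch of the same idea (helper name hypothetical, realloc failure unchecked for brevity):

    #include <stdint.h>
    #include <stdlib.h>
    #include <string.h>

    /* Append n bytes, doubling capacity as needed so the total realloc
       cost stays amortized O(bytes appended). */
    static void append_bytes(uint8_t **buf, size_t *len, size_t *cap,
                             const uint8_t *src, size_t n) {
      if (*len + n > *cap) {
        size_t new_cap = *cap ? *cap : 64;
        while (new_cap < *len + n) new_cap *= 2;
        *buf = (uint8_t *)realloc(*buf, new_cap);
        *cap = new_cap;
      }
      memcpy(*buf + *len, src, n);
      *len += n;
    }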
diff --git a/test/core/util/trickle_endpoint.c b/test/core/util/trickle_endpoint.c
index 4f3c30dcf6..fc066f9d80 100644
--- a/test/core/util/trickle_endpoint.c
+++ b/test/core/util/trickle_endpoint.c
@@ -128,7 +128,7 @@ static int te_get_fd(grpc_endpoint *ep) {
static void te_finish_write(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- trickle_endpoint *te = arg;
+ trickle_endpoint *te = (trickle_endpoint *)arg;
gpr_mu_lock(&te->mu);
te->writing = false;
grpc_slice_buffer_reset_and_unref(&te->writing_buffer);
@@ -142,7 +142,7 @@ static const grpc_endpoint_vtable vtable = {
grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap,
double bytes_per_second) {
- trickle_endpoint *te = gpr_malloc(sizeof(*te));
+ trickle_endpoint *te = (trickle_endpoint *)gpr_malloc(sizeof(*te));
te->base.vtable = &vtable;
te->wrapped = wrap;
te->bytes_per_second = bytes_per_second;
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index e841a702d4..41090d161a 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -266,6 +266,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
}
void TearDown() override {
+ gpr_tls_set(&g_is_async_end2end_test, 0);
server_->Shutdown();
void* ignored_tag;
bool ignored_ok;
@@ -274,7 +275,6 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
;
stub_.reset();
poll_overrider_.reset();
- gpr_tls_set(&g_is_async_end2end_test, 0);
grpc_recycle_unused_port(port_);
}
@@ -396,6 +396,7 @@ TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) {
ResetStub();
SendRpc(1);
EXPECT_EQ(0, notify);
+ gpr_tls_set(&g_is_async_end2end_test, 0);
server_->Shutdown();
wait_thread.join();
EXPECT_EQ(1, notify);
@@ -404,8 +405,9 @@ TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) {
TEST_P(AsyncEnd2endTest, ShutdownThenWait) {
ResetStub();
SendRpc(1);
- server_->Shutdown();
+ std::thread t([this]() { server_->Shutdown(); });
server_->Wait();
+ t.join();
}
// Test a simple RPC using the async version of Next
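The ShutdownThenWait change above turns a sequential Shutdown(); Wait() into a concurrent pair. A hedged sketch of the idiom, assuming (as the test name suggests) that Wait() must be able to observe a Shutdown() issued from another thread rather than strictly after one:

    std::thread shutdown_thread([&] { server_->Shutdown(); });
    server_->Wait(); // returns once the concurrent Shutdown() completes
    shutdown_thread.join();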
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index 54408db600..c236f76e89 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -226,6 +226,31 @@ class ClientLbEnd2endTest : public ::testing::Test {
ResetCounters();
}
+ bool SeenAllServers() {
+ for (const auto& server : servers_) {
+ if (server->service_.request_count() == 0) return false;
+ }
+ return true;
+ }
+
+ // Updates \a connection_order by appending to it the index of the newly
+ // connected server. Must be called after every single RPC.
+ void UpdateConnectionOrder(
+ const std::vector<std::unique_ptr<ServerData>>& servers,
+ std::vector<int>* connection_order) {
+ for (size_t i = 0; i < servers.size(); ++i) {
+ if (servers[i]->service_.request_count() == 1) {
+ // Was the server index known? If not, update connection_order.
+ const auto it =
+ std::find(connection_order->begin(), connection_order->end(), i);
+ if (it == connection_order->end()) {
+ connection_order->push_back(i);
+ return;
+ }
+ }
+ }
+ }
+
const grpc::string server_host_;
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
@@ -370,13 +395,23 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) {
ports.emplace_back(server->port_);
}
SetNextResolution(ports);
- for (size_t i = 0; i < servers_.size(); ++i) {
+ // Wait until all backends are ready.
+ do {
CheckRpcSendOk();
- }
- // One request should have gone to each server.
+ } while (!SeenAllServers());
+ ResetCounters();
+ // "Sync" to the end of the list. Next sequence of picks will start at the
+ // first server (index 0).
+ WaitForServer(servers_.size() - 1);
+ std::vector<int> connection_order;
for (size_t i = 0; i < servers_.size(); ++i) {
- EXPECT_EQ(1, servers_[i]->service_.request_count());
+ CheckRpcSendOk();
+ UpdateConnectionOrder(servers_, &connection_order);
}
+ // Backends should be iterated over in the order in which the addresses were
+ // given.
+ const auto expected = std::vector<int>{0, 1, 2};
+ EXPECT_EQ(expected, connection_order);
// Check LB policy name for the channel.
EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
}
@@ -529,13 +564,9 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
StartServers(kNumServers, ports);
ResetStub("round_robin");
SetNextResolution(ports);
- // Send one RPC per backend and make sure they are used in order.
- // Note: This relies on the fact that the subchannels are reported in
- // state READY in the order in which the addresses are specified,
- // which is only true because the backends are all local.
- for (size_t i = 0; i < servers_.size(); ++i) {
+ // Send a number of RPCs, which succeed.
+ for (size_t i = 0; i < 100; ++i) {
CheckRpcSendOk();
- EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
}
// Kill all servers
for (size_t i = 0; i < servers_.size(); ++i) {
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 1f4861a7e6..e54cd03ca2 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -757,6 +757,22 @@ TEST_P(End2endTest, RequestStreamTwoRequests) {
EXPECT_TRUE(s.ok());
}
+TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+
+ auto stream = stub_->RequestStream(&context, &response);
+ request.set_message("hello");
+ EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through()));
+ EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through()));
+ stream->WritesDone();
+ Status s = stream->Finish();
+ EXPECT_EQ(response.message(), "hellohello");
+ EXPECT_TRUE(s.ok());
+}
+
TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) {
ResetStub();
EchoRequest request;
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index b5cff664f6..570a3d1067 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -332,7 +332,8 @@ class GrpclbEnd2endTest : public ::testing::Test {
num_backends_(num_backends),
num_balancers_(num_balancers),
client_load_reporting_interval_seconds_(
- client_load_reporting_interval_seconds) {}
+ client_load_reporting_interval_seconds),
+ kRequestMessage_("Live long and prosper.") {}
void SetUp() override {
response_generator_ = grpc_fake_resolver_response_generator_create();
@@ -378,6 +379,10 @@ class GrpclbEnd2endTest : public ::testing::Test {
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
}
+ void ResetBackendCounters() {
+ for (const auto& backend : backends_) backend->ResetCounters();
+ }
+
ClientStats WaitForLoadReports() {
ClientStats client_stats;
for (const auto& balancer : balancers_) {
@@ -386,6 +391,27 @@ class GrpclbEnd2endTest : public ::testing::Test {
return client_stats;
}
+ bool SeenAllBackends() {
+ for (const auto& backend : backends_) {
+ if (backend->request_count() == 0) return false;
+ }
+ return true;
+ }
+
+ void WaitForAllBackends() {
+ while (!SeenAllBackends()) {
+ CheckRpcSendOk();
+ }
+ ResetBackendCounters();
+ }
+
+ void WaitForBackend(size_t backend_idx) {
+ do {
+ CheckRpcSendOk();
+ } while (backends_[backend_idx]->request_count() == 0);
+ ResetBackendCounters();
+ }
+
struct AddressData {
int port;
bool is_balancer;
@@ -429,20 +455,31 @@ class GrpclbEnd2endTest : public ::testing::Test {
balancers_.at(i)->add_response(response, delay_ms);
}
- std::vector<std::pair<Status, EchoResponse>> SendRpc(const string& message,
- int num_rpcs,
- int timeout_ms = 1000) {
- std::vector<std::pair<Status, EchoResponse>> results;
+ Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000) {
+ const bool local_response = (response == nullptr);
+ if (local_response) response = new EchoResponse;
EchoRequest request;
- EchoResponse response;
- request.set_message(message);
- for (int i = 0; i < num_rpcs; i++) {
- ClientContext context;
- context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
- Status status = stub_->Echo(&context, request, &response);
- results.push_back(std::make_pair(status, response));
+ request.set_message(kRequestMessage_);
+ ClientContext context;
+ context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
+ Status status = stub_->Echo(&context, request, response);
+ if (local_response) delete response;
+ return status;
+ }
+
+ void CheckRpcSendOk(const size_t times = 1) {
+ for (size_t i = 0; i < times; ++i) {
+ EchoResponse response;
+ const Status status = SendRpc(&response);
+ EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+ << " message=" << status.error_message();
+ EXPECT_EQ(response.message(), kRequestMessage_);
}
- return results;
+ }
+
+ void CheckRpcSendFailure() {
+ const Status status = SendRpc();
+ EXPECT_FALSE(status.ok());
}
template <typename T>
@@ -499,14 +536,12 @@ class GrpclbEnd2endTest : public ::testing::Test {
const int client_load_reporting_interval_seconds_;
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
-
std::vector<std::unique_ptr<BackendServiceImpl>> backends_;
std::vector<std::unique_ptr<BalancerServiceImpl>> balancers_;
-
std::vector<ServerThread<BackendService>> backend_servers_;
std::vector<ServerThread<BalancerService>> balancer_servers_;
-
grpc_fake_resolver_response_generator* response_generator_;
+ const grpc::string kRequestMessage_;
};
class SingleBalancerTest : public GrpclbEnd2endTest {
@@ -521,17 +556,12 @@ TEST_F(SingleBalancerTest, Vanilla) {
0);
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
- // Send 100 RPCs per server.
- const auto& statuses_and_responses =
- SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_);
-
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
+
+ // We need to wait for all backends to come online.
+ WaitForAllBackends();
+
+ // Send kNumRpcsPerAddress RPCs per server.
+ CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
// Each backend should have gotten 100 requests.
for (size_t i = 0; i < backends_.size(); ++i) {
@@ -561,8 +591,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
const auto t0 = system_clock::now();
// Client will block: LB will initially send empty serverlist.
- const auto& statuses_and_responses =
- SendRpc(kMessage_, num_backends_, kCallDeadlineMs);
+ CheckRpcSendOk(num_backends_);
const auto ellapsed_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
system_clock::now() - t0);
@@ -576,13 +605,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
}
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
@@ -593,70 +615,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
-TEST_F(SingleBalancerTest, RepeatedServerlist) {
- constexpr int kServerlistDelayMs = 100;
-
- // Send a serverlist right away.
- ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
- 0);
- // ... and the same one a bit later.
- ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
- kServerlistDelayMs);
-
- // Send num_backends/2 requests.
- auto statuses_and_responses = SendRpc(kMessage_, num_backends_ / 2);
- // only the first half of the backends will receive them.
- for (size_t i = 0; i < backends_.size(); ++i) {
- if (i < backends_.size() / 2)
- EXPECT_EQ(1U, backend_servers_[i].service_->request_count())
- << "for backend #" << i;
- else
- EXPECT_EQ(0U, backend_servers_[i].service_->request_count())
- << "for backend #" << i;
- }
- EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
-
- // Wait for the (duplicated) serverlist update.
- gpr_sleep_until(gpr_time_add(
- gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_millis(kServerlistDelayMs * 1.1, GPR_TIMESPAN)));
-
- // Verify the LB has sent two responses.
- EXPECT_EQ(2U, balancer_servers_[0].service_->response_count());
-
- // Some more calls to complete the total number of backends.
- statuses_and_responses = SendRpc(
- kMessage_,
- num_backends_ / 2 + (num_backends_ & 0x1) /* extra one if num_bes odd */);
- // Because a duplicated serverlist should have no effect, all backends must
- // have been hit once now.
- for (size_t i = 0; i < backends_.size(); ++i) {
- EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
- }
- EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
- balancers_[0]->NotifyDoneWithServerlists();
- // The balancer got a single request.
- EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
- // Check LB policy name for the channel.
- EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
-}
-
TEST_F(SingleBalancerTest, BackendsRestart) {
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
@@ -664,21 +622,8 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
0);
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
- // Send 100 RPCs per server.
- auto statuses_and_responses =
- SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
- // Each backend should have gotten 100 requests.
- for (size_t i = 0; i < backends_.size(); ++i) {
- EXPECT_EQ(kNumRpcsPerAddress,
- backend_servers_[i].service_->request_count());
- }
+ // Send kNumRpcsPerAddress RPCs per server.
+ CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
@@ -687,11 +632,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
for (size_t i = 0; i < backends_.size(); ++i) {
if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown();
}
- statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- EXPECT_FALSE(status.ok());
- }
+ CheckRpcSendFailure();
for (size_t i = 0; i < num_backends_; ++i) {
backends_.emplace_back(new BackendServiceImpl());
backend_servers_.emplace_back(ServerThread<BackendService>(
@@ -703,11 +644,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
// TODO(dgq): implement the "backend restart" component as well. We need extra
// machinery to either update the LB responses "on the fly" or instruct
// backends which ports to restart on.
- statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- EXPECT_FALSE(status.ok());
- }
+ CheckRpcSendFailure();
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
@@ -727,13 +664,9 @@ TEST_F(UpdatesTest, UpdateBalancers) {
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
- auto statuses_and_responses = SendRpc(kMessage_, 10);
+ CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
// All 10 requests should have gone to the first backend.
EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
@@ -758,22 +691,12 @@ TEST_F(UpdatesTest, UpdateBalancers) {
// Wait until update has been processed, as signaled by the second backend
// receiving a request.
EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
- do {
- auto statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
- } while (backend_servers_[1].service_->request_count() == 0);
+ WaitForBackend(1);
backend_servers_[1].service_->ResetCounters();
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
- statuses_and_responses = SendRpc(kMessage_, 10);
+ CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
// All 10 requests should have gone to the second backend.
EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
@@ -804,13 +727,9 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
- auto statuses_and_responses = SendRpc(kMessage_, 10);
+ CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
// All 10 requests should have gone to the first backend.
EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
@@ -837,11 +756,7 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN));
// Send 10 seconds worth of RPCs
do {
- statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
+ CheckRpcSendOk();
} while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
// grpclb continued using the original LB call to the first balancer, which
// doesn't assign the second backend.
@@ -860,11 +775,7 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
gpr_time_from_millis(10000, GPR_TIMESPAN));
// Send 10 seconds worth of RPCs
do {
- statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
+ CheckRpcSendOk();
} while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
// grpclb continued using the original LB call to the first balancer, which
// doesn't assign the second backend.
@@ -886,12 +797,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
- auto statuses_and_responses = SendRpc(kMessage_, 10);
+ CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
// All 10 requests should have gone to the first backend.
EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
@@ -903,12 +810,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
// This is serviced by the existing RR policy
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
- statuses_and_responses = SendRpc(kMessage_, 10);
+ CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
// All 10 requests should again have gone to the first backend.
EXPECT_EQ(20U, backend_servers_[0].service_->request_count());
EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
@@ -935,23 +838,13 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
// receiving a request. In the meantime, the client continues to be serviced
// (by the first backend) without interruption.
EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
- do {
- auto statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
- } while (backend_servers_[1].service_->request_count() == 0);
+ WaitForBackend(1);
// This is serviced by the existing RR policy
backend_servers_[1].service_->ResetCounters();
gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH ==========");
- statuses_and_responses = SendRpc(kMessage_, 10);
+ CheckRpcSendOk(10);
gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH ==========");
- for (const auto& status_and_response : statuses_and_responses) {
- EXPECT_TRUE(status_and_response.first.ok());
- EXPECT_EQ(status_and_response.second.message(), kMessage_);
- }
// All 10 requests should have gone to the second backend.
EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
@@ -974,14 +867,11 @@ TEST_F(SingleBalancerTest, Drop) {
0, BalancerServiceImpl::BuildResponseForBackends(
GetBackendPorts(), {{"rate_limiting", 1}, {"load_balancing", 2}}),
0);
- // Send 100 RPCs for each server and drop address.
- const auto& statuses_and_responses =
- SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3));
-
+ // Send kNumRpcsPerAddress RPCs for each server and drop address.
size_t num_drops = 0;
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
+ for (size_t i = 0; i < kNumRpcsPerAddress * (num_backends_ + 3); ++i) {
+ EchoResponse response;
+ const Status status = SendRpc(&response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
++num_drops;
@@ -1010,12 +900,9 @@ TEST_F(SingleBalancerTest, DropAllFirst) {
0, BalancerServiceImpl::BuildResponseForBackends(
{}, {{"rate_limiting", 1}, {"load_balancing", 1}}),
0);
- const auto& statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- EXPECT_FALSE(status.ok());
- EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
- }
+ const Status status = SendRpc();
+ EXPECT_FALSE(status.ok());
+ EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
}
TEST_F(SingleBalancerTest, DropAll) {
@@ -1028,21 +915,13 @@ TEST_F(SingleBalancerTest, DropAll) {
1000);
// First call succeeds.
- auto statuses_and_responses = SendRpc(kMessage_, 1);
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
+ CheckRpcSendOk();
// But eventually, the update with only dropped servers is processed and calls
// fail.
+ Status status;
do {
- statuses_and_responses = SendRpc(kMessage_, 1);
- ASSERT_EQ(statuses_and_responses.size(), 1UL);
- } while (statuses_and_responses[0].first.ok());
- const Status& status = statuses_and_responses[0].first;
+ status = SendRpc();
+ } while (status.ok());
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
}
@@ -1057,18 +936,8 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
- // Send 100 RPCs per server.
- const auto& statuses_and_responses =
- SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_);
-
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
- EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
- << " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
- }
-
+ // Send kNumRpcsPerAddress RPCs per server.
+ CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
// Each backend should have gotten 100 requests.
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(kNumRpcsPerAddress,
@@ -1096,14 +965,11 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
0, BalancerServiceImpl::BuildResponseForBackends(
GetBackendPorts(), {{"rate_limiting", 2}, {"load_balancing", 1}}),
0);
- // Send 100 RPCs for each server and drop address.
- const auto& statuses_and_responses =
- SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3));
size_t num_drops = 0;
- for (const auto& status_and_response : statuses_and_responses) {
- const Status& status = status_and_response.first;
- const EchoResponse& response = status_and_response.second;
+ for (size_t i = 0; i < kNumRpcsPerAddress * (num_backends_ + 3); ++i) {
+ EchoResponse response;
+ const Status status = SendRpc(&response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
++num_drops;
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
index 936681fec1..070034fe33 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
@@ -29,6 +29,7 @@
extern "C" {
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
+#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/resource_quota.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -154,23 +155,59 @@ class Fixture {
grpc_transport *t_;
};
-static void DoNothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
+class Closure : public grpc_closure {
+ public:
+ virtual ~Closure() {}
+};
+
+template <class F>
+std::unique_ptr<Closure> MakeClosure(
+ F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) {
+ struct C : public Closure {
+ C(const F &f, grpc_closure_scheduler *sched) : f_(f) {
+ GRPC_CLOSURE_INIT(this, Execute, this, sched);
+ }
+ F f_;
+ static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ static_cast<C *>(arg)->f_(exec_ctx, error);
+ }
+ };
+ return std::unique_ptr<Closure>(new C(f, sched));
+}
+
+template <class F>
+grpc_closure *MakeOnceClosure(
+ F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) {
+ struct C : public grpc_closure {
+ C(const F &f) : f_(f) {}
+ F f_;
+ static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ static_cast<C *>(arg)->f_(exec_ctx, error);
+ delete static_cast<C *>(arg);
+ }
+ };
+ auto *c = new C{f};
+ return GRPC_CLOSURE_INIT(c, C::Execute, c, sched);
+}
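A usage sketch for the two helpers just moved above, mirroring the benchmarks later in this file (f is assumed to be a Fixture as used there): MakeClosure yields a re-schedulable closure whose lambda captures live as long as the returned unique_ptr, while MakeOnceClosure deletes itself after a single run.

    std::unique_ptr<Closure> done =
        MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
          // runs every time the closure is scheduled
        });
    GRPC_CLOSURE_SCHED(f.exec_ctx(), done.get(), GRPC_ERROR_NONE);
    f.FlushExecCtx(); // drain the exec_ctx so the callback actually fires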
class Stream {
public:
Stream(Fixture *f) : f_(f) {
- GRPC_STREAM_REF_INIT(&refcount_, 1, DoNothing, nullptr, "test_stream");
stream_size_ = grpc_transport_stream_size(f->transport());
stream_ = gpr_malloc(stream_size_);
arena_ = gpr_arena_create(4096);
}
~Stream() {
+ gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME));
gpr_free(stream_);
gpr_arena_destroy(arena_);
}
void Init(benchmark::State &state) {
+ GRPC_STREAM_REF_INIT(&refcount_, 1, &Stream::FinishDestroy, this,
+ "test_stream");
+ gpr_event_init(&done_);
memset(stream_, 0, stream_size_);
if ((state.iterations() & 0xffff) == 0) {
gpr_arena_destroy(arena_);
@@ -181,13 +218,17 @@ class Stream {
NULL, arena_);
}
- void DestroyThen(grpc_closure *closure) {
- grpc_transport_destroy_stream(f_->exec_ctx(), f_->transport(),
- static_cast<grpc_stream *>(stream_), closure);
+ void DestroyThen(grpc_exec_ctx *exec_ctx, grpc_closure *closure) {
+ destroy_closure_ = closure;
+#ifndef NDEBUG
+ grpc_stream_unref(exec_ctx, &refcount_, "DestroyThen");
+#else
+ grpc_stream_unref(exec_ctx, &refcount_);
+#endif
}
- void Op(grpc_transport_stream_op_batch *op) {
- grpc_transport_perform_stream_op(f_->exec_ctx(), f_->transport(),
+ void Op(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op) {
+ grpc_transport_perform_stream_op(exec_ctx, f_->transport(),
static_cast<grpc_stream *>(stream_), op);
}
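
Op() and DestroyThen() now take the grpc_exec_ctx from the caller rather than reaching into the fixture, so work triggered from inside a closure stays on the exec_ctx that invoked it. DestroyThen() also stops tearing the stream down directly: it stashes the continuation and drops this Stream's own reference, and the real destruction happens in FinishDestroy (next hunk) once the transport has released its references too. The ifdef split mirrors grpc_stream_unref, whose debug build takes an extra reason string for ref tracing. Caller-side, the idiom used throughout the rest of this file becomes:

    reset_op();
    op.cancel_stream = true;
    op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
    s.Op(exec_ctx, &op);                  // caller-supplied exec_ctx
    s.DestroyThen(exec_ctx, next.get());  // unref; destroy runs at the last unref
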
@@ -196,48 +237,24 @@ class Stream {
}
private:
+ static void FinishDestroy(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ auto stream = static_cast<Stream *>(arg);
+ grpc_transport_destroy_stream(exec_ctx, stream->f_->transport(),
+ static_cast<grpc_stream *>(stream->stream_),
+ stream->destroy_closure_);
+ gpr_event_set(&stream->done_, (void *)1);
+ }
+
Fixture *f_;
grpc_stream_refcount refcount_;
gpr_arena *arena_;
size_t stream_size_;
void *stream_;
+ grpc_closure *destroy_closure_ = nullptr;
+ gpr_event done_;
};
-class Closure : public grpc_closure {
- public:
- virtual ~Closure() {}
-};
-
-template <class F>
-std::unique_ptr<Closure> MakeClosure(
- F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) {
- struct C : public Closure {
- C(const F &f, grpc_closure_scheduler *sched) : f_(f) {
- GRPC_CLOSURE_INIT(this, Execute, this, sched);
- }
- F f_;
- static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- static_cast<C *>(arg)->f_(exec_ctx, error);
- }
- };
- return std::unique_ptr<Closure>(new C(f, sched));
-}
-
-template <class F>
-grpc_closure *MakeOnceClosure(
- F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) {
- struct C : public grpc_closure {
- C(const F &f) : f_(f) {}
- F f_;
- static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- static_cast<C *>(arg)->f_(exec_ctx, error);
- delete static_cast<C *>(arg);
- }
- };
- auto *c = new C{f};
- return GRPC_CLOSURE_INIT(c, C::Execute, c, sched);
-}
-
////////////////////////////////////////////////////////////////////////////////
// Benchmarks
//
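
FinishDestroy is the refcount destructor that Init() now registers: it runs only when the last grpc_stream_unref fires, calls grpc_transport_destroy_stream with the closure saved by DestroyThen() (the transport schedules that closure when teardown completes), and signals done_ so ~Stream() can block until the transport is truly finished with stream_ before freeing it. The deleted block in this hunk is the Closure/MakeClosure/MakeOnceClosure code relocated above class Stream earlier in the diff, not removed functionality. For the shape of the pattern in isolation, a self-contained C++ analogue (a single-threaded sketch with std::atomic standing in for grpc_stream_refcount; names are illustrative only):

    #include <atomic>
    #include <functional>

    struct DeferredStream {
      std::atomic<int> refs{1};               // Init() takes the initial ref
      std::function<void()> then_destroyed;   // saved by DestroyThen()

      void Ref() { refs.fetch_add(1); }
      void Unref() {
        if (refs.fetch_sub(1) == 1) {  // previous value 1: this was the last ref
          // ... destroy the underlying stream here (FinishDestroy analogue) ...
          if (then_destroyed) then_destroyed();  // continuation after teardown
        }
      }
      void DestroyThen(std::function<void()> f) {
        then_destroyed = std::move(f);
        Unref();  // may or may not be the last reference
      }
    };
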
@@ -246,11 +263,18 @@ static void BM_StreamCreateDestroy(benchmark::State &state) {
TrackCounters track_counters;
Fixture f(grpc::ChannelArguments(), true);
Stream s(&f);
+ grpc_transport_stream_op_batch op;
+ grpc_transport_stream_op_batch_payload op_payload;
+ memset(&op, 0, sizeof(op));
+ op.cancel_stream = true;
+ op.payload = &op_payload;
+ op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
std::unique_ptr<Closure> next =
MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
if (!state.KeepRunning()) return;
s.Init(state);
- s.DestroyThen(next.get());
+ s.Op(exec_ctx, &op);
+ s.DestroyThen(exec_ctx, next.get());
});
GRPC_CLOSURE_RUN(f.exec_ctx(), next.get(), GRPC_ERROR_NONE);
f.FlushExecCtx();
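
BM_StreamCreateDestroy now cancels each stream before destroying it: with destruction deferred to the last unref, a stream that never completed or was never cancelled would still be referenced by the transport, DestroyThen()'s unref would never be final, and the loop would hang. The cancel batch is built once outside the lambda and reused every iteration. The same lines, consolidated and annotated (same code as the hunk above, comments added):

    std::unique_ptr<Closure> next =
        MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
          if (!state.KeepRunning()) return;     // stop re-arming; loop unwinds
          s.Init(state);                        // reinitialize in-place storage
          s.Op(exec_ctx, &op);                  // cancel: transport drops its refs
          s.DestroyThen(exec_ctx, next.get());  // last unref re-schedules `next`
        });
    GRPC_CLOSURE_RUN(f.exec_ctx(), next.get(), GRPC_ERROR_NONE);  // bootstrap
    f.FlushExecCtx();  // drain until KeepRunning() goes false
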
@@ -314,14 +338,14 @@ static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State &state) {
op.on_complete = done.get();
op.send_initial_metadata = true;
op.payload->send_initial_metadata.send_initial_metadata = &b;
- s.Op(&op);
+ s.Op(exec_ctx, &op);
});
done = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
reset_op();
op.cancel_stream = true;
op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
- s.Op(&op);
- s.DestroyThen(start.get());
+ s.Op(exec_ctx, &op);
+ s.DestroyThen(exec_ctx, start.get());
});
GRPC_CLOSURE_SCHED(f.exec_ctx(), start.get(), GRPC_ERROR_NONE);
f.FlushExecCtx();
@@ -348,22 +372,28 @@ static void BM_TransportEmptyOp(benchmark::State &state) {
if (!state.KeepRunning()) return;
reset_op();
op.on_complete = c.get();
- s.Op(&op);
+ s.Op(exec_ctx, &op);
});
GRPC_CLOSURE_SCHED(f.exec_ctx(), c.get(), GRPC_ERROR_NONE);
f.FlushExecCtx();
- s.DestroyThen(
- MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {}));
+ reset_op();
+ op.cancel_stream = true;
+ op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
+ s.Op(f.exec_ctx(), &op);
+ s.DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx,
+ grpc_error *error) {}));
f.FlushExecCtx();
track_counters.Finish(state);
}
BENCHMARK(BM_TransportEmptyOp);
+std::vector<std::unique_ptr<gpr_event>> done_events;
+
static void BM_TransportStreamSend(benchmark::State &state) {
TrackCounters track_counters;
Fixture f(grpc::ChannelArguments(), true);
- Stream s(&f);
- s.Init(state);
+ auto s = std::unique_ptr<Stream>(new Stream(&f));
+ s->Init(state);
grpc_transport_stream_op_batch op;
grpc_transport_stream_op_batch_payload op_payload;
memset(&op_payload, 0, sizeof(op_payload));
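
Two related adjustments land in this hunk. BM_TransportEmptyOp picks up the same cancel-before-destroy teardown as the other benchmarks. And because ~Stream() can now block on done_, BM_TransportStreamSend switches the stream to a unique_ptr so the destructor runs at an explicit s.reset() before track_counters.Finish(state) rather than at scope exit; done_events (declared above) holds each iteration's gpr_event beyond the benchmark body. The ownership shape, abbreviated from the surrounding hunks:

    auto s = std::unique_ptr<Stream>(new Stream(&f));  // heap-owned stream
    s->Init(state);
    // ... timed send loop ...
    reset_op();
    op.cancel_stream = true;
    op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
    s->Op(f.exec_ctx(), &op);
    s->DestroyThen(f.exec_ctx(), MakeOnceClosure(
        [](grpc_exec_ctx *exec_ctx, grpc_error *error) {}));
    f.FlushExecCtx();
    s.reset();  // ~Stream waits on done_; teardown is complete past this point
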
@@ -390,11 +420,17 @@ static void BM_TransportStreamSend(benchmark::State &state) {
grpc_metadata_batch_add_tail(f.exec_ctx(), &b, &storage[i], elems[i])));
}
+ gpr_event *bm_done = new gpr_event;
+ gpr_event_init(bm_done);
+
std::unique_ptr<Closure> c =
MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
- if (!state.KeepRunning()) return;
+ if (!state.KeepRunning()) {
+ gpr_event_set(bm_done, (void *)1);
+ return;
+ }
      // force the outgoing flow-control window to be huge
- s.chttp2_stream()->flow_control.remote_window_delta =
+ s->chttp2_stream()->flow_control.remote_window_delta =
1024 * 1024 * 1024;
f.chttp2_transport()->flow_control.remote_window = 1024 * 1024 * 1024;
grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0);
@@ -402,23 +438,27 @@ static void BM_TransportStreamSend(benchmark::State &state) {
op.on_complete = c.get();
op.send_message = true;
op.payload->send_message.send_message = &send_stream.base;
- s.Op(&op);
+ s->Op(exec_ctx, &op);
});
reset_op();
op.send_initial_metadata = true;
op.payload->send_initial_metadata.send_initial_metadata = &b;
op.on_complete = c.get();
- s.Op(&op);
+ s->Op(f.exec_ctx(), &op);
f.FlushExecCtx();
+ gpr_event_wait(bm_done, gpr_inf_future(GPR_CLOCK_REALTIME));
+ done_events.emplace_back(bm_done);
+
reset_op();
op.cancel_stream = true;
op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
- s.Op(&op);
- s.DestroyThen(
- MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {}));
+ s->Op(f.exec_ctx(), &op);
+ s->DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx,
+ grpc_error *error) {}));
f.FlushExecCtx();
+ s.reset();
track_counters.Finish(state);
grpc_metadata_batch_destroy(f.exec_ctx(), &b);
grpc_slice_buffer_destroy(&send_buffer);
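
The bm_done event closes a shutdown race in BM_TransportStreamSend: the send closure re-arms itself through on_complete, so the main thread must not start the cancel/destroy sequence until the closure has actually observed KeepRunning() == false. The closure signals bm_done on its way out, the main thread blocks on it after flushing the exec ctx, and ownership then passes to the global done_events vector, which keeps the event alive in case anything still references it after the loop. Abbreviated from the hunks above:

    gpr_event *bm_done = new gpr_event;  // ownership ends up in done_events
    gpr_event_init(bm_done);
    // In the closure:
    //   if (!state.KeepRunning()) { gpr_event_set(bm_done, (void *)1); return; }
    f.FlushExecCtx();
    gpr_event_wait(bm_done, gpr_inf_future(GPR_CLOCK_REALTIME));  // loop drained
    done_events.emplace_back(bm_done);  // unique_ptr constructed in place
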
@@ -535,7 +575,7 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
op.recv_message = true;
op.payload->recv_message.recv_message = &recv_stream;
op.payload->recv_message.recv_message_ready = drain_start.get();
- s.Op(&op);
+ s.Op(exec_ctx, &op);
f.PushInput(grpc_slice_ref(incoming_data));
});
@@ -578,7 +618,7 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
op.payload->recv_initial_metadata.recv_initial_metadata_ready =
do_nothing.get();
op.on_complete = c.get();
- s.Op(&op);
+ s.Op(f.exec_ctx(), &op);
f.PushInput(SLICE_FROM_BUFFER(
"\x00\x00\x00\x04\x00\x00\x00\x00\x00"
// Generated using:
@@ -596,9 +636,9 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
reset_op();
op.cancel_stream = true;
op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
- s.Op(&op);
- s.DestroyThen(
- MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {}));
+ s.Op(f.exec_ctx(), &op);
+ s.DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx,
+ grpc_error *error) {}));
f.FlushExecCtx();
track_counters.Finish(state);
grpc_metadata_batch_destroy(f.exec_ctx(), &b);
diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
index 135b4710ce..59fb29dd60 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
@@ -105,7 +105,7 @@ class TrickledCHTTP2 : public EndpointPairFixture {
(double)state.iterations());
}
- void Log(int64_t iteration) {
+ void Log(int64_t iteration) GPR_ATTRIBUTE_NO_TSAN {
auto now = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_);
grpc_chttp2_transport* client =
reinterpret_cast<grpc_chttp2_transport*>(client_transport_);
@@ -193,7 +193,8 @@ class TrickledCHTTP2 : public EndpointPairFixture {
return p;
}
- void UpdateStats(grpc_chttp2_transport* t, Stats* s, size_t backlog) {
+ void UpdateStats(grpc_chttp2_transport* t, Stats* s,
+ size_t backlog) GPR_ATTRIBUTE_NO_TSAN {
if (backlog == 0) {
if (t->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != NULL) {
s->streams_stalled_due_to_stream_flow_control++;
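
GPR_ATTRIBUTE_NO_TSAN exempts Log() and UpdateStats() from ThreadSanitizer: both read live chttp2 transport internals (flow-control state, the stalled-stream list) without holding the transport's lock, which is tolerable for diagnostics but reads as a data race to TSAN. A sketch of how such a macro is conventionally defined; gRPC's real definition lives in its platform-portability header, so treat this as an assumption about its shape, not a quotation:

    #if defined(__has_feature)
    #if __has_feature(thread_sanitizer)
    #define GPR_ATTRIBUTE_NO_TSAN __attribute__((no_sanitize("thread")))
    #endif
    #endif
    #ifndef GPR_ATTRIBUTE_NO_TSAN
    #define GPR_ATTRIBUTE_NO_TSAN  /* expands to nothing without TSAN */
    #endif
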