diff options
Diffstat (limited to 'test')
26 files changed, 335 insertions, 372 deletions
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c index c3964ca84b..383d1240cb 100644 --- a/test/core/bad_client/bad_client.c +++ b/test/core/bad_client/bad_client.c @@ -45,18 +45,18 @@ typedef struct { } thd_args; static void thd_func(void *arg) { - thd_args *a = arg; + thd_args *a = (thd_args *)arg; a->validator(a->server, a->cq, a->registered_method); gpr_event_set(&a->done_thd, (void *)1); } static void done_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - thd_args *a = arg; + thd_args *a = (thd_args *)arg; gpr_event_set(&a->done_write, (void *)1); } static void server_setup_transport(void *ts, grpc_transport *transport) { - thd_args *a = ts; + thd_args *a = (thd_args *)ts; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_server_setup_transport(&exec_ctx, a->server, transport, NULL, grpc_server_get_channel_args(a->server)); @@ -64,7 +64,7 @@ static void server_setup_transport(void *ts, grpc_transport *transport) { } static void read_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - gpr_event *read_done = arg; + gpr_event *read_done = (gpr_event *)arg; gpr_event_set(read_done, (void *)1); } diff --git a/test/core/end2end/bad_server_response_test.c b/test/core/end2end/bad_server_response_test.c index 5f89058c45..eeabc769d3 100644 --- a/test/core/end2end/bad_server_response_test.c +++ b/test/core/end2end/bad_server_response_test.c @@ -136,7 +136,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, grpc_pollset *accepting_pollset, grpc_tcp_server_acceptor *acceptor) { gpr_free(acceptor); - test_tcp_server *server = arg; + test_tcp_server *server = (test_tcp_server *)arg; GRPC_CLOSURE_INIT(&on_read, handle_read, NULL, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&on_write, done_write, NULL, grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&state.temp_incoming_buffer); @@ -237,7 +237,7 @@ typedef struct { } poll_args; static void actually_poll_server(void *arg) { - poll_args *pa = arg; + poll_args *pa = (poll_args *)arg; gpr_timespec deadline = n_sec_deadline(10); while (true) { bool done = gpr_atm_acq_load(&state.done_atm) != 0; @@ -259,7 +259,7 @@ static void poll_server_until_read_done(test_tcp_server *server, gpr_atm_rel_store(&state.done_atm, 0); state.write_done = 0; gpr_thd_id id; - poll_args *pa = gpr_malloc(sizeof(*pa)); + poll_args *pa = (poll_args *)gpr_malloc(sizeof(*pa)); pa->server = server; pa->signal_when_done = signal_when_done; gpr_thd_new(&id, actually_poll_server, pa, NULL); diff --git a/test/core/end2end/fixtures/proxy.c b/test/core/end2end/fixtures/proxy.c index d457aeefe8..9ad862728f 100644 --- a/test/core/end2end/fixtures/proxy.c +++ b/test/core/end2end/fixtures/proxy.c @@ -80,7 +80,7 @@ grpc_end2end_proxy *grpc_end2end_proxy_create(const grpc_end2end_proxy_def *def, int proxy_port = grpc_pick_unused_port_or_die(); int server_port = grpc_pick_unused_port_or_die(); - grpc_end2end_proxy *proxy = gpr_malloc(sizeof(*proxy)); + grpc_end2end_proxy *proxy = (grpc_end2end_proxy *)gpr_malloc(sizeof(*proxy)); memset(proxy, 0, sizeof(*proxy)); gpr_join_host_port(&proxy->proxy_port, "localhost", proxy_port); @@ -106,14 +106,14 @@ grpc_end2end_proxy *grpc_end2end_proxy_create(const grpc_end2end_proxy_def *def, } static closure *new_closure(void (*func)(void *arg, int success), void *arg) { - closure *cl = gpr_malloc(sizeof(*cl)); + closure *cl = (closure *)gpr_malloc(sizeof(*cl)); cl->func = func; cl->arg = arg; return cl; } static void shutdown_complete(void *arg, int success) { - grpc_end2end_proxy *proxy = arg; + grpc_end2end_proxy *proxy = (grpc_end2end_proxy *)arg; proxy->shutdown = 1; grpc_completion_queue_shutdown(proxy->cq); } @@ -146,12 +146,12 @@ static void unrefpc(proxy_call *pc, const char *reason) { static void refpc(proxy_call *pc, const char *reason) { gpr_ref(&pc->refs); } static void on_c2p_sent_initial_metadata(void *arg, int success) { - proxy_call *pc = arg; + proxy_call *pc = (proxy_call *)arg; unrefpc(pc, "on_c2p_sent_initial_metadata"); } static void on_p2s_recv_initial_metadata(void *arg, int success) { - proxy_call *pc = arg; + proxy_call *pc = (proxy_call *)arg; grpc_op op; grpc_call_error err; @@ -172,14 +172,14 @@ static void on_p2s_recv_initial_metadata(void *arg, int success) { } static void on_p2s_sent_initial_metadata(void *arg, int success) { - proxy_call *pc = arg; + proxy_call *pc = (proxy_call *)arg; unrefpc(pc, "on_p2s_sent_initial_metadata"); } static void on_c2p_recv_msg(void *arg, int success); static void on_p2s_sent_message(void *arg, int success) { - proxy_call *pc = arg; + proxy_call *pc = (proxy_call *)arg; grpc_op op; grpc_call_error err; @@ -199,12 +199,12 @@ static void on_p2s_sent_message(void *arg, int success) { } static void on_p2s_sent_close(void *arg, int success) { - proxy_call *pc = arg; + proxy_call *pc = (proxy_call *)arg; unrefpc(pc, "on_p2s_sent_close"); } static void on_c2p_recv_msg(void *arg, int success) { - proxy_call *pc = arg; + proxy_call *pc = (proxy_call *)arg; grpc_op op; grpc_call_error err; @@ -235,7 +235,7 @@ static void on_c2p_recv_msg(void *arg, int success) { static void on_p2s_recv_msg(void *arg, int success); static void on_c2p_sent_message(void *arg, int success) { - proxy_call *pc = arg; + proxy_call *pc = (proxy_call *)arg; grpc_op op; grpc_call_error err; @@ -255,7 +255,7 @@ static void on_c2p_sent_message(void *arg, int success) { } static void on_p2s_recv_msg(void *arg, int success) { - proxy_call *pc = arg; + proxy_call *pc = (proxy_call *)arg; grpc_op op; grpc_call_error err; @@ -275,12 +275,12 @@ static void on_p2s_recv_msg(void *arg, int success) { } static void on_c2p_sent_status(void *arg, int success) { - proxy_call *pc = arg; + proxy_call *pc = (proxy_call *)arg; unrefpc(pc, "on_c2p_sent_status"); } static void on_p2s_status(void *arg, int success) { - proxy_call *pc = arg; + proxy_call *pc = (proxy_call *)arg; grpc_op op; grpc_call_error err; @@ -305,18 +305,18 @@ static void on_p2s_status(void *arg, int success) { } static void on_c2p_closed(void *arg, int success) { - proxy_call *pc = arg; + proxy_call *pc = (proxy_call *)arg; unrefpc(pc, "on_c2p_closed"); } static void on_new_call(void *arg, int success) { - grpc_end2end_proxy *proxy = arg; + grpc_end2end_proxy *proxy = (grpc_end2end_proxy *)arg; grpc_call_error err; if (success) { grpc_op op; memset(&op, 0, sizeof(op)); - proxy_call *pc = gpr_malloc(sizeof(*pc)); + proxy_call *pc = (proxy_call *)gpr_malloc(sizeof(*pc)); memset(pc, 0, sizeof(*pc)); pc->proxy = proxy; GPR_SWAP(grpc_metadata_array, pc->c2p_initial_metadata, @@ -404,7 +404,7 @@ static void request_call(grpc_end2end_proxy *proxy) { } static void thread_main(void *arg) { - grpc_end2end_proxy *proxy = arg; + grpc_end2end_proxy *proxy = (grpc_end2end_proxy *)arg; closure *cl; for (;;) { grpc_event ev = grpc_completion_queue_next( @@ -416,7 +416,7 @@ static void thread_main(void *arg) { case GRPC_QUEUE_SHUTDOWN: return; case GRPC_OP_COMPLETE: - cl = ev.tag; + cl = (closure *)ev.tag; cl->func(cl->arg, ev.success); gpr_free(cl); break; diff --git a/test/core/end2end/tests/connectivity.c b/test/core/end2end/tests/connectivity.c index a5af46e09e..610243ee3a 100644 --- a/test/core/end2end/tests/connectivity.c +++ b/test/core/end2end/tests/connectivity.c @@ -34,7 +34,7 @@ typedef struct { } child_events; static void child_thread(void *arg) { - child_events *ce = arg; + child_events *ce = (child_events *)arg; grpc_event ev; gpr_event_set(&ce->started, (void *)1); gpr_log(GPR_DEBUG, "verifying"); diff --git a/test/core/end2end/tests/filter_causes_close.c b/test/core/end2end/tests/filter_causes_close.c index 5a8c96d121..ee7aeb3f33 100644 --- a/test/core/end2end/tests/filter_causes_close.c +++ b/test/core/end2end/tests/filter_causes_close.c @@ -195,8 +195,8 @@ typedef struct { uint8_t unused; } channel_data; static void recv_im_ready(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_call_element *elem = arg; - call_data *calld = elem->call_data; + grpc_call_element *elem = (grpc_call_element *)arg; + call_data *calld = (call_data *)elem->call_data; GRPC_CLOSURE_RUN( exec_ctx, calld->recv_im_ready, grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( @@ -208,7 +208,7 @@ static void recv_im_ready(grpc_exec_ctx *exec_ctx, void *arg, static void start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op_batch *op) { - call_data *calld = elem->call_data; + call_data *calld = (call_data *)elem->call_data; if (op->recv_initial_metadata) { calld->recv_im_ready = op->payload->recv_initial_metadata.recv_initial_metadata_ready; diff --git a/test/core/end2end/tests/filter_latency.c b/test/core/end2end/tests/filter_latency.c index 8918c3b2f6..c24934f05d 100644 --- a/test/core/end2end/tests/filter_latency.c +++ b/test/core/end2end/tests/filter_latency.c @@ -312,7 +312,7 @@ static const grpc_channel_filter test_server_filter = { static bool maybe_add_filter(grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) { - grpc_channel_filter *filter = arg; + grpc_channel_filter *filter = (grpc_channel_filter *)arg; if (g_enable_filter) { // Want to add the filter as close to the end as possible, to make // sure that all of the filters work well together. However, we diff --git a/test/core/end2end/tests/payload.c b/test/core/end2end/tests/payload.c index 19fb4d9769..d98eed68e0 100644 --- a/test/core/end2end/tests/payload.c +++ b/test/core/end2end/tests/payload.c @@ -91,7 +91,7 @@ static grpc_slice generate_random_slice() { static const char chars[] = "abcdefghijklmnopqrstuvwxyz1234567890"; char *output; const size_t output_size = 1024 * 1024; - output = gpr_malloc(output_size); + output = (char *)gpr_malloc(output_size); for (i = 0; i < output_size - 1; ++i) { output[i] = chars[rand() % (int)(sizeof(chars) - 1)]; } diff --git a/test/core/end2end/tests/resource_quota_server.c b/test/core/end2end/tests/resource_quota_server.c index 34a6a80a31..0316920762 100644 --- a/test/core/end2end/tests/resource_quota_server.c +++ b/test/core/end2end/tests/resource_quota_server.c @@ -91,7 +91,7 @@ static grpc_slice generate_random_slice() { static const char chars[] = "abcdefghijklmnopqrstuvwxyz1234567890"; char *output; const size_t output_size = 1024 * 1024; - output = gpr_malloc(output_size); + output = (char *)gpr_malloc(output_size); for (i = 0; i < output_size - 1; ++i) { output[i] = chars[rand() % (int)(sizeof(chars) - 1)]; } @@ -111,10 +111,10 @@ void resource_quota_server(grpc_end2end_test_config config) { grpc_resource_quota_resize(resource_quota, 5 * 1024 * 1024); #define NUM_CALLS 100 -#define CLIENT_BASE_TAG 1000 -#define SERVER_START_BASE_TAG 2000 -#define SERVER_RECV_BASE_TAG 3000 -#define SERVER_END_BASE_TAG 4000 +#define CLIENT_BASE_TAG 0x1000 +#define SERVER_START_BASE_TAG 0x2000 +#define SERVER_RECV_BASE_TAG 0x3000 +#define SERVER_END_BASE_TAG 0x4000 grpc_arg arg; arg.key = GRPC_ARG_RESOURCE_QUOTA; @@ -131,8 +131,10 @@ void resource_quota_server(grpc_end2end_test_config config) { * will be verified on completion. */ grpc_slice request_payload_slice = generate_random_slice(); - grpc_call **client_calls = malloc(sizeof(grpc_call *) * NUM_CALLS); - grpc_call **server_calls = malloc(sizeof(grpc_call *) * NUM_CALLS); + grpc_call **client_calls = + (grpc_call **)malloc(sizeof(grpc_call *) * NUM_CALLS); + grpc_call **server_calls = + (grpc_call **)malloc(sizeof(grpc_call *) * NUM_CALLS); grpc_metadata_array *initial_metadata_recv = malloc(sizeof(grpc_metadata_array) * NUM_CALLS); grpc_metadata_array *trailing_metadata_recv = @@ -141,13 +143,14 @@ void resource_quota_server(grpc_end2end_test_config config) { malloc(sizeof(grpc_metadata_array) * NUM_CALLS); grpc_call_details *call_details = malloc(sizeof(grpc_call_details) * NUM_CALLS); - grpc_status_code *status = malloc(sizeof(grpc_status_code) * NUM_CALLS); - grpc_slice *details = malloc(sizeof(grpc_slice) * NUM_CALLS); + grpc_status_code *status = + (grpc_status_code *)malloc(sizeof(grpc_status_code) * NUM_CALLS); + grpc_slice *details = (grpc_slice *)malloc(sizeof(grpc_slice) * NUM_CALLS); grpc_byte_buffer **request_payload = malloc(sizeof(grpc_byte_buffer *) * NUM_CALLS); grpc_byte_buffer **request_payload_recv = malloc(sizeof(grpc_byte_buffer *) * NUM_CALLS); - int *was_cancelled = malloc(sizeof(int) * NUM_CALLS); + int *was_cancelled = (int *)malloc(sizeof(int) * NUM_CALLS); grpc_call_error error; int pending_client_calls = 0; int pending_server_start_calls = 0; diff --git a/test/core/end2end/tests/shutdown_finishes_tags.c b/test/core/end2end/tests/shutdown_finishes_tags.c index f9b8e4c955..7914cc95ba 100644 --- a/test/core/end2end/tests/shutdown_finishes_tags.c +++ b/test/core/end2end/tests/shutdown_finishes_tags.c @@ -78,7 +78,7 @@ static void test_early_server_shutdown_finishes_tags( grpc_end2end_test_fixture f = begin_test( config, "test_early_server_shutdown_finishes_tags", NULL, NULL); cq_verifier *cqv = cq_verifier_create(f.cq); - grpc_call *s = (void *)1; + grpc_call *s = (grpc_call *)(uintptr_t)1; grpc_call_details call_details; grpc_metadata_array request_metadata_recv; diff --git a/test/core/end2end/tests/stream_compression_payload.c b/test/core/end2end/tests/stream_compression_payload.c index 5135df81ed..e47d2aa93c 100644 --- a/test/core/end2end/tests/stream_compression_payload.c +++ b/test/core/end2end/tests/stream_compression_payload.c @@ -95,7 +95,7 @@ static grpc_slice generate_random_slice() { static const char chars[] = "abcdefghijklmnopqrstuvwxyz1234567890"; char *output; const size_t output_size = 1024 * 1024; - output = gpr_malloc(output_size); + output = (char *)gpr_malloc(output_size); for (i = 0; i < output_size - 1; ++i) { output[i] = chars[rand() % (int)(sizeof(chars) - 1)]; } diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index 353d79b11d..f8570edde7 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -77,7 +77,7 @@ static void end_test(grpc_endpoint_test_config config) { config.clean_up(); } static grpc_slice *allocate_blocks(size_t num_bytes, size_t slice_size, size_t *num_blocks, uint8_t *current_data) { size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1 : 0); - grpc_slice *slices = gpr_malloc(sizeof(grpc_slice) * nslices); + grpc_slice *slices = (grpc_slice *)gpr_malloc(sizeof(grpc_slice) * nslices); size_t num_bytes_left = num_bytes; size_t i; size_t j; @@ -117,7 +117,8 @@ struct read_and_write_test_state { static void read_and_write_test_read_handler(grpc_exec_ctx *exec_ctx, void *data, grpc_error *error) { - struct read_and_write_test_state *state = data; + struct read_and_write_test_state *state = + (struct read_and_write_test_state *)data; state->bytes_read += count_slices( state->incoming.slices, state->incoming.count, &state->current_read_data); @@ -136,7 +137,8 @@ static void read_and_write_test_read_handler(grpc_exec_ctx *exec_ctx, static void read_and_write_test_write_handler(grpc_exec_ctx *exec_ctx, void *data, grpc_error *error) { - struct read_and_write_test_state *state = data; + struct read_and_write_test_state *state = + (struct read_and_write_test_state *)data; grpc_slice *slices = NULL; size_t nslices; diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 8cfc1bcab1..cfb3cf897c 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -89,7 +89,7 @@ static ssize_t fill_socket(int fd) { static size_t fill_socket_partial(int fd, size_t bytes) { ssize_t write_bytes; size_t total_bytes = 0; - unsigned char *buf = gpr_malloc(bytes); + unsigned char *buf = (unsigned char *)gpr_malloc(bytes); unsigned i; for (i = 0; i < bytes; ++i) { buf[i] = (uint8_t)(i % 256); @@ -268,7 +268,7 @@ struct write_socket_state { static grpc_slice *allocate_blocks(size_t num_bytes, size_t slice_size, size_t *num_blocks, uint8_t *current_data) { size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1u : 0u); - grpc_slice *slices = gpr_malloc(sizeof(grpc_slice) * nslices); + grpc_slice *slices = (grpc_slice *)gpr_malloc(sizeof(grpc_slice) * nslices); size_t num_bytes_left = num_bytes; unsigned i, j; unsigned char *buf; @@ -302,7 +302,7 @@ static void write_done(grpc_exec_ctx *exec_ctx, } void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { - unsigned char *buf = gpr_malloc(read_size); + unsigned char *buf = (unsigned char *)gpr_malloc(read_size); ssize_t bytes_read; size_t bytes_left = num_bytes; int flags; @@ -405,7 +405,7 @@ static void write_test(size_t num_bytes, size_t slice_size) { } void on_fd_released(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *errors) { - int *done = arg; + int *done = (int *)arg; *done = 1; GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, NULL))); @@ -549,7 +549,7 @@ static grpc_endpoint_test_config configs[] = { static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - grpc_pollset_destroy(exec_ctx, p); + grpc_pollset_destroy(exec_ctx, (grpc_pollset *)p); } int main(int argc, char **argv) { @@ -557,7 +557,7 @@ int main(int argc, char **argv) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - g_pollset = gpr_zalloc(grpc_pollset_size()); + g_pollset = (grpc_pollset *)gpr_zalloc(grpc_pollset_size()); grpc_pollset_init(g_pollset, &g_mu); grpc_endpoint_tests(configs[0], g_pollset, g_mu); run_tests(); diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c index e9f2c76738..839a05fa9b 100644 --- a/test/core/security/secure_endpoint_test.c +++ b/test/core/security/secure_endpoint_test.c @@ -70,7 +70,7 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair( size_t still_pending_size; size_t total_buffer_size = 8192; size_t buffer_size = total_buffer_size; - uint8_t *encrypted_buffer = gpr_malloc(buffer_size); + uint8_t *encrypted_buffer = (uint8_t *)gpr_malloc(buffer_size); uint8_t *cur = encrypted_buffer; grpc_slice encrypted_leftover; for (i = 0; i < leftover_nslices; i++) { @@ -202,7 +202,7 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) { static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - grpc_pollset_destroy(exec_ctx, p); + grpc_pollset_destroy(exec_ctx, (grpc_pollset *)p); } int main(int argc, char **argv) { @@ -211,7 +211,7 @@ int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); - g_pollset = gpr_zalloc(grpc_pollset_size()); + g_pollset = (grpc_pollset *)gpr_zalloc(grpc_pollset_size()); grpc_pollset_init(g_pollset, &g_mu); grpc_endpoint_tests(configs[0], g_pollset, g_mu); grpc_endpoint_tests(configs[1], g_pollset, g_mu); diff --git a/test/core/util/memory_counters.c b/test/core/util/memory_counters.c index b898057034..9fb3ffe095 100644 --- a/test/core/util/memory_counters.c +++ b/test/core/util/memory_counters.c @@ -48,13 +48,13 @@ static void *guard_malloc(size_t size) { NO_BARRIER_FETCH_ADD(&g_memory_counters.total_size_relative, (gpr_atm)size); NO_BARRIER_FETCH_ADD(&g_memory_counters.total_allocs_absolute, (gpr_atm)1); NO_BARRIER_FETCH_ADD(&g_memory_counters.total_allocs_relative, (gpr_atm)1); - ptr = g_old_allocs.malloc_fn(size + sizeof(size)); + ptr = (size_t *)g_old_allocs.malloc_fn(size + sizeof(size)); *ptr++ = size; return ptr; } static void *guard_realloc(void *vptr, size_t size) { - size_t *ptr = vptr; + size_t *ptr = (size_t *)vptr; if (vptr == NULL) { return guard_malloc(size); } @@ -67,13 +67,13 @@ static void *guard_realloc(void *vptr, size_t size) { NO_BARRIER_FETCH_ADD(&g_memory_counters.total_size_relative, -(gpr_atm)*ptr); NO_BARRIER_FETCH_ADD(&g_memory_counters.total_size_relative, (gpr_atm)size); NO_BARRIER_FETCH_ADD(&g_memory_counters.total_allocs_absolute, (gpr_atm)1); - ptr = g_old_allocs.realloc_fn(ptr, size + sizeof(size)); + ptr = (size_t *)g_old_allocs.realloc_fn(ptr, size + sizeof(size)); *ptr++ = size; return ptr; } static void guard_free(void *vptr) { - size_t *ptr = vptr; + size_t *ptr = (size_t *)vptr; if (!vptr) return; --ptr; NO_BARRIER_FETCH_ADD(&g_memory_counters.total_size_relative, -(gpr_atm)*ptr); diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index 40cf0a2652..bd386b2148 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -110,7 +110,7 @@ static const grpc_endpoint_vtable vtable = { grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(grpc_slice slice), grpc_resource_quota *resource_quota) { - grpc_mock_endpoint *m = gpr_malloc(sizeof(*m)); + grpc_mock_endpoint *m = (grpc_mock_endpoint *)gpr_malloc(sizeof(*m)); m->base.vtable = &vtable; char *name; gpr_asprintf(&name, "mock_endpoint_%" PRIxPTR, (intptr_t)m); diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index eef1f163f0..38a47584d5 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -183,7 +183,7 @@ void grpc_passthru_endpoint_create(grpc_endpoint **client, grpc_endpoint **server, grpc_resource_quota *resource_quota, grpc_passthru_endpoint_stats *stats) { - passthru_endpoint *m = gpr_malloc(sizeof(*m)); + passthru_endpoint *m = (passthru_endpoint *)gpr_malloc(sizeof(*m)); m->halves = 2; m->shutdown = 0; m->stats = stats == NULL ? &m->dummy_stats : stats; diff --git a/test/core/util/port.c b/test/core/util/port.c index b1fc722858..61f2e5018f 100644 --- a/test/core/util/port.c +++ b/test/core/util/port.c @@ -75,7 +75,8 @@ static void chose_port(int port) { atexit(free_chosen_ports); } num_chosen_ports++; - chosen_ports = gpr_realloc(chosen_ports, sizeof(int) * num_chosen_ports); + chosen_ports = + (int *)gpr_realloc(chosen_ports, sizeof(int) * num_chosen_ports); chosen_ports[num_chosen_ports - 1] = port; } diff --git a/test/core/util/port_server_client.c b/test/core/util/port_server_client.c index 1b015c95e9..ba4028dbee 100644 --- a/test/core/util/port_server_client.c +++ b/test/core/util/port_server_client.c @@ -42,14 +42,14 @@ typedef struct freereq { static void destroy_pops_and_shutdown(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - grpc_pollset *pollset = grpc_polling_entity_pollset(p); + grpc_pollset *pollset = grpc_polling_entity_pollset((grpc_polling_entity *)p); grpc_pollset_destroy(exec_ctx, pollset); gpr_free(pollset); } static void freed_port_from_server(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - freereq *pr = arg; + freereq *pr = (freereq *)arg; gpr_mu_lock(pr->mu); pr->done = 1; GRPC_LOG_IF_ERROR( @@ -74,7 +74,7 @@ void grpc_free_port_using_server(int port) { memset(&req, 0, sizeof(req)); memset(&rsp, 0, sizeof(rsp)); - grpc_pollset *pollset = gpr_zalloc(grpc_pollset_size()); + grpc_pollset *pollset = (grpc_pollset *)gpr_zalloc(grpc_pollset_size()); grpc_pollset_init(pollset, &pr.mu); pr.pops = grpc_polling_entity_create_from_pollset(pollset); shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops, @@ -131,7 +131,7 @@ static void got_port_from_server(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { size_t i; int port = 0; - portreq *pr = arg; + portreq *pr = (portreq *)arg; int failed = 0; grpc_httpcli_response *response = &pr->response; @@ -207,7 +207,7 @@ int grpc_pick_port_using_server(void) { memset(&pr, 0, sizeof(pr)); memset(&req, 0, sizeof(req)); - grpc_pollset *pollset = gpr_zalloc(grpc_pollset_size()); + grpc_pollset *pollset = (grpc_pollset *)gpr_zalloc(grpc_pollset_size()); grpc_pollset_init(pollset, &pr.mu); pr.pops = grpc_polling_entity_create_from_pollset(pollset); shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops, diff --git a/test/core/util/slice_splitter.c b/test/core/util/slice_splitter.c index f6d892f166..6fcef9acce 100644 --- a/test/core/util/slice_splitter.c +++ b/test/core/util/slice_splitter.c @@ -44,7 +44,8 @@ void grpc_split_slices(grpc_slice_split_mode mode, grpc_slice *src_slices, switch (mode) { case GRPC_SLICE_SPLIT_IDENTITY: *dst_slice_count = src_slice_count; - *dst_slices = gpr_malloc(sizeof(grpc_slice) * src_slice_count); + *dst_slices = + (grpc_slice *)gpr_malloc(sizeof(grpc_slice) * src_slice_count); for (i = 0; i < src_slice_count; i++) { (*dst_slices)[i] = src_slices[i]; grpc_slice_ref((*dst_slices)[i]); @@ -56,7 +57,7 @@ void grpc_split_slices(grpc_slice_split_mode mode, grpc_slice *src_slices, for (i = 0; i < src_slice_count; i++) { length += GRPC_SLICE_LENGTH(src_slices[i]); } - *dst_slices = gpr_malloc(sizeof(grpc_slice)); + *dst_slices = (grpc_slice *)gpr_malloc(sizeof(grpc_slice)); **dst_slices = grpc_slice_malloc(length); length = 0; for (i = 0; i < src_slice_count; i++) { @@ -72,7 +73,7 @@ void grpc_split_slices(grpc_slice_split_mode mode, grpc_slice *src_slices, length += GRPC_SLICE_LENGTH(src_slices[i]); } *dst_slice_count = length; - *dst_slices = gpr_malloc(sizeof(grpc_slice) * length); + *dst_slices = (grpc_slice *)gpr_malloc(sizeof(grpc_slice) * length); length = 0; for (i = 0; i < src_slice_count; i++) { for (j = 0; j < GRPC_SLICE_LENGTH(src_slices[i]); j++) { @@ -112,7 +113,7 @@ grpc_slice grpc_slice_merge(grpc_slice *slices, size_t nslices) { for (i = 0; i < nslices; i++) { if (GRPC_SLICE_LENGTH(slices[i]) + length > capacity) { capacity = GPR_MAX(capacity * 2, GRPC_SLICE_LENGTH(slices[i]) + length); - out = gpr_realloc(out, capacity); + out = (uint8_t *)gpr_realloc(out, capacity); } memcpy(out + length, GRPC_SLICE_START_PTR(slices[i]), GRPC_SLICE_LENGTH(slices[i])); diff --git a/test/core/util/trickle_endpoint.c b/test/core/util/trickle_endpoint.c index 4f3c30dcf6..fc066f9d80 100644 --- a/test/core/util/trickle_endpoint.c +++ b/test/core/util/trickle_endpoint.c @@ -128,7 +128,7 @@ static int te_get_fd(grpc_endpoint *ep) { static void te_finish_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - trickle_endpoint *te = arg; + trickle_endpoint *te = (trickle_endpoint *)arg; gpr_mu_lock(&te->mu); te->writing = false; grpc_slice_buffer_reset_and_unref(&te->writing_buffer); @@ -142,7 +142,7 @@ static const grpc_endpoint_vtable vtable = { grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap, double bytes_per_second) { - trickle_endpoint *te = gpr_malloc(sizeof(*te)); + trickle_endpoint *te = (trickle_endpoint *)gpr_malloc(sizeof(*te)); te->base.vtable = &vtable; te->wrapped = wrap; te->bytes_per_second = bytes_per_second; diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index e841a702d4..41090d161a 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -266,6 +266,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> { } void TearDown() override { + gpr_tls_set(&g_is_async_end2end_test, 0); server_->Shutdown(); void* ignored_tag; bool ignored_ok; @@ -274,7 +275,6 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> { ; stub_.reset(); poll_overrider_.reset(); - gpr_tls_set(&g_is_async_end2end_test, 0); grpc_recycle_unused_port(port_); } @@ -396,6 +396,7 @@ TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) { ResetStub(); SendRpc(1); EXPECT_EQ(0, notify); + gpr_tls_set(&g_is_async_end2end_test, 0); server_->Shutdown(); wait_thread.join(); EXPECT_EQ(1, notify); @@ -404,8 +405,9 @@ TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) { TEST_P(AsyncEnd2endTest, ShutdownThenWait) { ResetStub(); SendRpc(1); - server_->Shutdown(); + std::thread t([this]() { server_->Shutdown(); }); server_->Wait(); + t.join(); } // Test a simple RPC using the async version of Next diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 54408db600..c236f76e89 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -226,6 +226,31 @@ class ClientLbEnd2endTest : public ::testing::Test { ResetCounters(); } + bool SeenAllServers() { + for (const auto& server : servers_) { + if (server->service_.request_count() == 0) return false; + } + return true; + } + + // Updates \a connection_order by appending to it the index of the newly + // connected server. Must be called after every single RPC. + void UpdateConnectionOrder( + const std::vector<std::unique_ptr<ServerData>>& servers, + std::vector<int>* connection_order) { + for (size_t i = 0; i < servers.size(); ++i) { + if (servers[i]->service_.request_count() == 1) { + // Was the server index known? If not, update connection_order. + const auto it = + std::find(connection_order->begin(), connection_order->end(), i); + if (it == connection_order->end()) { + connection_order->push_back(i); + return; + } + } + } + } + const grpc::string server_host_; std::shared_ptr<Channel> channel_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; @@ -370,13 +395,23 @@ TEST_F(ClientLbEnd2endTest, RoundRobin) { ports.emplace_back(server->port_); } SetNextResolution(ports); - for (size_t i = 0; i < servers_.size(); ++i) { + // Wait until all backends are ready. + do { CheckRpcSendOk(); - } - // One request should have gone to each server. + } while (!SeenAllServers()); + ResetCounters(); + // "Sync" to the end of the list. Next sequence of picks will start at the + // first server (index 0). + WaitForServer(servers_.size() - 1); + std::vector<int> connection_order; for (size_t i = 0; i < servers_.size(); ++i) { - EXPECT_EQ(1, servers_[i]->service_.request_count()); + CheckRpcSendOk(); + UpdateConnectionOrder(servers_, &connection_order); } + // Backends should be iterated over in the order in which the addresses were + // given. + const auto expected = std::vector<int>{0, 1, 2}; + EXPECT_EQ(expected, connection_order); // Check LB policy name for the channel. EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName()); } @@ -529,13 +564,9 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) { StartServers(kNumServers, ports); ResetStub("round_robin"); SetNextResolution(ports); - // Send one RPC per backend and make sure they are used in order. - // Note: This relies on the fact that the subchannels are reported in - // state READY in the order in which the addresses are specified, - // which is only true because the backends are all local. - for (size_t i = 0; i < servers_.size(); ++i) { + // Send a number of RPCs, which succeed. + for (size_t i = 0; i < 100; ++i) { CheckRpcSendOk(); - EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i; } // Kill all servers for (size_t i = 0; i < servers_.size(); ++i) { diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 1f4861a7e6..e54cd03ca2 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -757,6 +757,22 @@ TEST_P(End2endTest, RequestStreamTwoRequests) { EXPECT_TRUE(s.ok()); } +TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + auto stream = stub_->RequestStream(&context, &response); + request.set_message("hello"); + EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through())); + EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through())); + stream->WritesDone(); + Status s = stream->Finish(); + EXPECT_EQ(response.message(), "hellohello"); + EXPECT_TRUE(s.ok()); +} + TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) { ResetStub(); EchoRequest request; diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index b5cff664f6..570a3d1067 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -332,7 +332,8 @@ class GrpclbEnd2endTest : public ::testing::Test { num_backends_(num_backends), num_balancers_(num_balancers), client_load_reporting_interval_seconds_( - client_load_reporting_interval_seconds) {} + client_load_reporting_interval_seconds), + kRequestMessage_("Live long and prosper.") {} void SetUp() override { response_generator_ = grpc_fake_resolver_response_generator_create(); @@ -378,6 +379,10 @@ class GrpclbEnd2endTest : public ::testing::Test { stub_ = grpc::testing::EchoTestService::NewStub(channel_); } + void ResetBackendCounters() { + for (const auto& backend : backends_) backend->ResetCounters(); + } + ClientStats WaitForLoadReports() { ClientStats client_stats; for (const auto& balancer : balancers_) { @@ -386,6 +391,27 @@ class GrpclbEnd2endTest : public ::testing::Test { return client_stats; } + bool SeenAllBackends() { + for (const auto& backend : backends_) { + if (backend->request_count() == 0) return false; + } + return true; + } + + void WaitForAllBackends() { + while (!SeenAllBackends()) { + CheckRpcSendOk(); + } + ResetBackendCounters(); + } + + void WaitForBackend(size_t backend_idx) { + do { + CheckRpcSendOk(); + } while (backends_[backend_idx]->request_count() == 0); + ResetBackendCounters(); + } + struct AddressData { int port; bool is_balancer; @@ -429,20 +455,31 @@ class GrpclbEnd2endTest : public ::testing::Test { balancers_.at(i)->add_response(response, delay_ms); } - std::vector<std::pair<Status, EchoResponse>> SendRpc(const string& message, - int num_rpcs, - int timeout_ms = 1000) { - std::vector<std::pair<Status, EchoResponse>> results; + Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000) { + const bool local_response = (response == nullptr); + if (local_response) response = new EchoResponse; EchoRequest request; - EchoResponse response; - request.set_message(message); - for (int i = 0; i < num_rpcs; i++) { - ClientContext context; - context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms)); - Status status = stub_->Echo(&context, request, &response); - results.push_back(std::make_pair(status, response)); + request.set_message(kRequestMessage_); + ClientContext context; + context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms)); + Status status = stub_->Echo(&context, request, response); + if (local_response) delete response; + return status; + } + + void CheckRpcSendOk(const size_t times = 1) { + for (size_t i = 0; i < times; ++i) { + EchoResponse response; + const Status status = SendRpc(&response); + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + EXPECT_EQ(response.message(), kRequestMessage_); } - return results; + } + + void CheckRpcSendFailure() { + const Status status = SendRpc(); + EXPECT_FALSE(status.ok()); } template <typename T> @@ -499,14 +536,12 @@ class GrpclbEnd2endTest : public ::testing::Test { const int client_load_reporting_interval_seconds_; std::shared_ptr<Channel> channel_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; - std::vector<std::unique_ptr<BackendServiceImpl>> backends_; std::vector<std::unique_ptr<BalancerServiceImpl>> balancers_; - std::vector<ServerThread<BackendService>> backend_servers_; std::vector<ServerThread<BalancerService>> balancer_servers_; - grpc_fake_resolver_response_generator* response_generator_; + const grpc::string kRequestMessage_; }; class SingleBalancerTest : public GrpclbEnd2endTest { @@ -521,17 +556,12 @@ TEST_F(SingleBalancerTest, Vanilla) { 0); // Make sure that trying to connect works without a call. channel_->GetState(true /* try_to_connect */); - // Send 100 RPCs per server. - const auto& statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_); - - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } + + // We need to wait for all backends to come online. + WaitForAllBackends(); + + // Send kNumRpcsPerAddress RPCs per server. + CheckRpcSendOk(kNumRpcsPerAddress * num_backends_); // Each backend should have gotten 100 requests. for (size_t i = 0; i < backends_.size(); ++i) { @@ -561,8 +591,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { const auto t0 = system_clock::now(); // Client will block: LB will initially send empty serverlist. - const auto& statuses_and_responses = - SendRpc(kMessage_, num_backends_, kCallDeadlineMs); + CheckRpcSendOk(num_backends_); const auto ellapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( system_clock::now() - t0); @@ -576,13 +605,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { for (size_t i = 0; i < backends_.size(); ++i) { EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); } - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } balancers_[0]->NotifyDoneWithServerlists(); // The balancer got a single request. EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); @@ -593,70 +615,6 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } -TEST_F(SingleBalancerTest, RepeatedServerlist) { - constexpr int kServerlistDelayMs = 100; - - // Send a serverlist right away. - ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), - 0); - // ... and the same one a bit later. - ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), - kServerlistDelayMs); - - // Send num_backends/2 requests. - auto statuses_and_responses = SendRpc(kMessage_, num_backends_ / 2); - // only the first half of the backends will receive them. - for (size_t i = 0; i < backends_.size(); ++i) { - if (i < backends_.size() / 2) - EXPECT_EQ(1U, backend_servers_[i].service_->request_count()) - << "for backend #" << i; - else - EXPECT_EQ(0U, backend_servers_[i].service_->request_count()) - << "for backend #" << i; - } - EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } - - // Wait for the (duplicated) serverlist update. - gpr_sleep_until(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(kServerlistDelayMs * 1.1, GPR_TIMESPAN))); - - // Verify the LB has sent two responses. - EXPECT_EQ(2U, balancer_servers_[0].service_->response_count()); - - // Some more calls to complete the total number of backends. - statuses_and_responses = SendRpc( - kMessage_, - num_backends_ / 2 + (num_backends_ & 0x1) /* extra one if num_bes odd */); - // Because a duplicated serverlist should have no effect, all backends must - // have been hit once now. - for (size_t i = 0; i < backends_.size(); ++i) { - EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); - } - EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } - balancers_[0]->NotifyDoneWithServerlists(); - // The balancer got a single request. - EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); - // Check LB policy name for the channel. - EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); -} - TEST_F(SingleBalancerTest, BackendsRestart) { const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( @@ -664,21 +622,8 @@ TEST_F(SingleBalancerTest, BackendsRestart) { 0); // Make sure that trying to connect works without a call. channel_->GetState(true /* try_to_connect */); - // Send 100 RPCs per server. - auto statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } - // Each backend should have gotten 100 requests. - for (size_t i = 0; i < backends_.size(); ++i) { - EXPECT_EQ(kNumRpcsPerAddress, - backend_servers_[i].service_->request_count()); - } + // Send kNumRpcsPerAddress RPCs per server. + CheckRpcSendOk(kNumRpcsPerAddress * num_backends_); balancers_[0]->NotifyDoneWithServerlists(); // The balancer got a single request. EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); @@ -687,11 +632,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) { for (size_t i = 0; i < backends_.size(); ++i) { if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown(); } - statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - EXPECT_FALSE(status.ok()); - } + CheckRpcSendFailure(); for (size_t i = 0; i < num_backends_; ++i) { backends_.emplace_back(new BackendServiceImpl()); backend_servers_.emplace_back(ServerThread<BackendService>( @@ -703,11 +644,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) { // TODO(dgq): implement the "backend restart" component as well. We need extra // machinery to either update the LB responses "on the fly" or instruct // backends which ports to restart on. - statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - EXPECT_FALSE(status.ok()); - } + CheckRpcSendFailure(); // Check LB policy name for the channel. EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } @@ -727,13 +664,9 @@ TEST_F(UpdatesTest, UpdateBalancers) { // Start servers and send 10 RPCs per server. gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); - auto statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the first backend. EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); @@ -758,22 +691,12 @@ TEST_F(UpdatesTest, UpdateBalancers) { // Wait until update has been processed, as signaled by the second backend // receiving a request. EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); - do { - auto statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } - } while (backend_servers_[1].service_->request_count() == 0); + WaitForBackend(1); backend_servers_[1].service_->ResetCounters(); gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); - statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the second backend. EXPECT_EQ(10U, backend_servers_[1].service_->request_count()); @@ -804,13 +727,9 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { // Start servers and send 10 RPCs per server. gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); - auto statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the first backend. EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); @@ -837,11 +756,7 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN)); // Send 10 seconds worth of RPCs do { - statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } + CheckRpcSendOk(); } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0); // grpclb continued using the original LB call to the first balancer, which // doesn't assign the second backend. @@ -860,11 +775,7 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { gpr_time_from_millis(10000, GPR_TIMESPAN)); // Send 10 seconds worth of RPCs do { - statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } + CheckRpcSendOk(); } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0); // grpclb continued using the original LB call to the first balancer, which // doesn't assign the second backend. @@ -886,12 +797,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { // Start servers and send 10 RPCs per server. gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); - auto statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the first backend. EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); @@ -903,12 +810,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { // This is serviced by the existing RR policy gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); - statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should again have gone to the first backend. EXPECT_EQ(20U, backend_servers_[0].service_->request_count()); EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); @@ -935,23 +838,13 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { // receiving a request. In the meantime, the client continues to be serviced // (by the first backend) without interruption. EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); - do { - auto statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } - } while (backend_servers_[1].service_->request_count() == 0); + WaitForBackend(1); // This is serviced by the existing RR policy backend_servers_[1].service_->ResetCounters(); gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH =========="); - statuses_and_responses = SendRpc(kMessage_, 10); + CheckRpcSendOk(10); gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH =========="); - for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); - } // All 10 requests should have gone to the second backend. EXPECT_EQ(10U, backend_servers_[1].service_->request_count()); @@ -974,14 +867,11 @@ TEST_F(SingleBalancerTest, Drop) { 0, BalancerServiceImpl::BuildResponseForBackends( GetBackendPorts(), {{"rate_limiting", 1}, {"load_balancing", 2}}), 0); - // Send 100 RPCs for each server and drop address. - const auto& statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3)); - + // Send kNumRpcsPerAddress RPCs for each server and drop address. size_t num_drops = 0; - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; + for (size_t i = 0; i < kNumRpcsPerAddress * (num_backends_ + 3); ++i) { + EchoResponse response; + const Status status = SendRpc(&response); if (!status.ok() && status.error_message() == "Call dropped by load balancing policy") { ++num_drops; @@ -1010,12 +900,9 @@ TEST_F(SingleBalancerTest, DropAllFirst) { 0, BalancerServiceImpl::BuildResponseForBackends( {}, {{"rate_limiting", 1}, {"load_balancing", 1}}), 0); - const auto& statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - EXPECT_FALSE(status.ok()); - EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); - } + const Status status = SendRpc(); + EXPECT_FALSE(status.ok()); + EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); } TEST_F(SingleBalancerTest, DropAll) { @@ -1028,21 +915,13 @@ TEST_F(SingleBalancerTest, DropAll) { 1000); // First call succeeds. - auto statuses_and_responses = SendRpc(kMessage_, 1); - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } + CheckRpcSendOk(); // But eventually, the update with only dropped servers is processed and calls // fail. + Status status; do { - statuses_and_responses = SendRpc(kMessage_, 1); - ASSERT_EQ(statuses_and_responses.size(), 1UL); - } while (statuses_and_responses[0].first.ok()); - const Status& status = statuses_and_responses[0].first; + status = SendRpc(); + } while (status.ok()); EXPECT_FALSE(status.ok()); EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); } @@ -1057,18 +936,8 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { ScheduleResponseForBalancer( 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), 0); - // Send 100 RPCs per server. - const auto& statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_); - - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; - EXPECT_TRUE(status.ok()) << "code=" << status.error_code() - << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); - } - + // Send kNumRpcsPerAddress RPCs per server. + CheckRpcSendOk(kNumRpcsPerAddress * num_backends_); // Each backend should have gotten 100 requests. for (size_t i = 0; i < backends_.size(); ++i) { EXPECT_EQ(kNumRpcsPerAddress, @@ -1096,14 +965,11 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { 0, BalancerServiceImpl::BuildResponseForBackends( GetBackendPorts(), {{"rate_limiting", 2}, {"load_balancing", 1}}), 0); - // Send 100 RPCs for each server and drop address. - const auto& statuses_and_responses = - SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3)); size_t num_drops = 0; - for (const auto& status_and_response : statuses_and_responses) { - const Status& status = status_and_response.first; - const EchoResponse& response = status_and_response.second; + for (size_t i = 0; i < kNumRpcsPerAddress * (num_backends_ + 3); ++i) { + EchoResponse response; + const Status status = SendRpc(&response); if (!status.ok() && status.error_message() == "Call dropped by load balancing policy") { ++num_drops; diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index 936681fec1..070034fe33 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -29,6 +29,7 @@ extern "C" { #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/ext/transport/chttp2/transport/internal.h" +#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/static_metadata.h" @@ -154,23 +155,59 @@ class Fixture { grpc_transport *t_; }; -static void DoNothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} +class Closure : public grpc_closure { + public: + virtual ~Closure() {} +}; + +template <class F> +std::unique_ptr<Closure> MakeClosure( + F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { + struct C : public Closure { + C(const F &f, grpc_closure_scheduler *sched) : f_(f) { + GRPC_CLOSURE_INIT(this, Execute, this, sched); + } + F f_; + static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + static_cast<C *>(arg)->f_(exec_ctx, error); + } + }; + return std::unique_ptr<Closure>(new C(f, sched)); +} + +template <class F> +grpc_closure *MakeOnceClosure( + F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { + struct C : public grpc_closure { + C(const F &f) : f_(f) {} + F f_; + static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + static_cast<C *>(arg)->f_(exec_ctx, error); + delete static_cast<C *>(arg); + } + }; + auto *c = new C{f}; + return GRPC_CLOSURE_INIT(c, C::Execute, c, sched); +} class Stream { public: Stream(Fixture *f) : f_(f) { - GRPC_STREAM_REF_INIT(&refcount_, 1, DoNothing, nullptr, "test_stream"); stream_size_ = grpc_transport_stream_size(f->transport()); stream_ = gpr_malloc(stream_size_); arena_ = gpr_arena_create(4096); } ~Stream() { + gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME)); gpr_free(stream_); gpr_arena_destroy(arena_); } void Init(benchmark::State &state) { + GRPC_STREAM_REF_INIT(&refcount_, 1, &Stream::FinishDestroy, this, + "test_stream"); + gpr_event_init(&done_); memset(stream_, 0, stream_size_); if ((state.iterations() & 0xffff) == 0) { gpr_arena_destroy(arena_); @@ -181,13 +218,17 @@ class Stream { NULL, arena_); } - void DestroyThen(grpc_closure *closure) { - grpc_transport_destroy_stream(f_->exec_ctx(), f_->transport(), - static_cast<grpc_stream *>(stream_), closure); + void DestroyThen(grpc_exec_ctx *exec_ctx, grpc_closure *closure) { + destroy_closure_ = closure; +#ifndef NDEBUG + grpc_stream_unref(exec_ctx, &refcount_, "DestroyThen"); +#else + grpc_stream_unref(exec_ctx, &refcount_); +#endif } - void Op(grpc_transport_stream_op_batch *op) { - grpc_transport_perform_stream_op(f_->exec_ctx(), f_->transport(), + void Op(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op) { + grpc_transport_perform_stream_op(exec_ctx, f_->transport(), static_cast<grpc_stream *>(stream_), op); } @@ -196,48 +237,24 @@ class Stream { } private: + static void FinishDestroy(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + auto stream = static_cast<Stream *>(arg); + grpc_transport_destroy_stream(exec_ctx, stream->f_->transport(), + static_cast<grpc_stream *>(stream->stream_), + stream->destroy_closure_); + gpr_event_set(&stream->done_, (void *)1); + } + Fixture *f_; grpc_stream_refcount refcount_; gpr_arena *arena_; size_t stream_size_; void *stream_; + grpc_closure *destroy_closure_ = nullptr; + gpr_event done_; }; -class Closure : public grpc_closure { - public: - virtual ~Closure() {} -}; - -template <class F> -std::unique_ptr<Closure> MakeClosure( - F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { - struct C : public Closure { - C(const F &f, grpc_closure_scheduler *sched) : f_(f) { - GRPC_CLOSURE_INIT(this, Execute, this, sched); - } - F f_; - static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - static_cast<C *>(arg)->f_(exec_ctx, error); - } - }; - return std::unique_ptr<Closure>(new C(f, sched)); -} - -template <class F> -grpc_closure *MakeOnceClosure( - F f, grpc_closure_scheduler *sched = grpc_schedule_on_exec_ctx) { - struct C : public grpc_closure { - C(const F &f) : f_(f) {} - F f_; - static void Execute(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - static_cast<C *>(arg)->f_(exec_ctx, error); - delete static_cast<C *>(arg); - } - }; - auto *c = new C{f}; - return GRPC_CLOSURE_INIT(c, C::Execute, c, sched); -} - //////////////////////////////////////////////////////////////////////////////// // Benchmarks // @@ -246,11 +263,18 @@ static void BM_StreamCreateDestroy(benchmark::State &state) { TrackCounters track_counters; Fixture f(grpc::ChannelArguments(), true); Stream s(&f); + grpc_transport_stream_op_batch op; + grpc_transport_stream_op_batch_payload op_payload; + memset(&op, 0, sizeof(op)); + op.cancel_stream = true; + op.payload = &op_payload; + op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; std::unique_ptr<Closure> next = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { if (!state.KeepRunning()) return; s.Init(state); - s.DestroyThen(next.get()); + s.Op(exec_ctx, &op); + s.DestroyThen(exec_ctx, next.get()); }); GRPC_CLOSURE_RUN(f.exec_ctx(), next.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); @@ -314,14 +338,14 @@ static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State &state) { op.on_complete = done.get(); op.send_initial_metadata = true; op.payload->send_initial_metadata.send_initial_metadata = &b; - s.Op(&op); + s.Op(exec_ctx, &op); }); done = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen(start.get()); + s.Op(exec_ctx, &op); + s.DestroyThen(exec_ctx, start.get()); }); GRPC_CLOSURE_SCHED(f.exec_ctx(), start.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); @@ -348,22 +372,28 @@ static void BM_TransportEmptyOp(benchmark::State &state) { if (!state.KeepRunning()) return; reset_op(); op.on_complete = c.get(); - s.Op(&op); + s.Op(exec_ctx, &op); }); GRPC_CLOSURE_SCHED(f.exec_ctx(), c.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); - s.DestroyThen( - MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + reset_op(); + op.cancel_stream = true; + op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; + s.Op(f.exec_ctx(), &op); + s.DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx, + grpc_error *error) {})); f.FlushExecCtx(); track_counters.Finish(state); } BENCHMARK(BM_TransportEmptyOp); +std::vector<std::unique_ptr<gpr_event>> done_events; + static void BM_TransportStreamSend(benchmark::State &state) { TrackCounters track_counters; Fixture f(grpc::ChannelArguments(), true); - Stream s(&f); - s.Init(state); + auto s = std::unique_ptr<Stream>(new Stream(&f)); + s->Init(state); grpc_transport_stream_op_batch op; grpc_transport_stream_op_batch_payload op_payload; memset(&op_payload, 0, sizeof(op_payload)); @@ -390,11 +420,17 @@ static void BM_TransportStreamSend(benchmark::State &state) { grpc_metadata_batch_add_tail(f.exec_ctx(), &b, &storage[i], elems[i]))); } + gpr_event *bm_done = new gpr_event; + gpr_event_init(bm_done); + std::unique_ptr<Closure> c = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { - if (!state.KeepRunning()) return; + if (!state.KeepRunning()) { + gpr_event_set(bm_done, (void *)1); + return; + } // force outgoing window to be yuge - s.chttp2_stream()->flow_control.remote_window_delta = + s->chttp2_stream()->flow_control.remote_window_delta = 1024 * 1024 * 1024; f.chttp2_transport()->flow_control.remote_window = 1024 * 1024 * 1024; grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0); @@ -402,23 +438,27 @@ static void BM_TransportStreamSend(benchmark::State &state) { op.on_complete = c.get(); op.send_message = true; op.payload->send_message.send_message = &send_stream.base; - s.Op(&op); + s->Op(exec_ctx, &op); }); reset_op(); op.send_initial_metadata = true; op.payload->send_initial_metadata.send_initial_metadata = &b; op.on_complete = c.get(); - s.Op(&op); + s->Op(f.exec_ctx(), &op); f.FlushExecCtx(); + gpr_event_wait(bm_done, gpr_inf_future(GPR_CLOCK_REALTIME)); + done_events.emplace_back(bm_done); + reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen( - MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + s->Op(f.exec_ctx(), &op); + s->DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx, + grpc_error *error) {})); f.FlushExecCtx(); + s.reset(); track_counters.Finish(state); grpc_metadata_batch_destroy(f.exec_ctx(), &b); grpc_slice_buffer_destroy(&send_buffer); @@ -535,7 +575,7 @@ static void BM_TransportStreamRecv(benchmark::State &state) { op.recv_message = true; op.payload->recv_message.recv_message = &recv_stream; op.payload->recv_message.recv_message_ready = drain_start.get(); - s.Op(&op); + s.Op(exec_ctx, &op); f.PushInput(grpc_slice_ref(incoming_data)); }); @@ -578,7 +618,7 @@ static void BM_TransportStreamRecv(benchmark::State &state) { op.payload->recv_initial_metadata.recv_initial_metadata_ready = do_nothing.get(); op.on_complete = c.get(); - s.Op(&op); + s.Op(f.exec_ctx(), &op); f.PushInput(SLICE_FROM_BUFFER( "\x00\x00\x00\x04\x00\x00\x00\x00\x00" // Generated using: @@ -596,9 +636,9 @@ static void BM_TransportStreamRecv(benchmark::State &state) { reset_op(); op.cancel_stream = true; op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - s.Op(&op); - s.DestroyThen( - MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {})); + s.Op(f.exec_ctx(), &op); + s.DestroyThen(f.exec_ctx(), MakeOnceClosure([](grpc_exec_ctx *exec_ctx, + grpc_error *error) {})); f.FlushExecCtx(); track_counters.Finish(state); grpc_metadata_batch_destroy(f.exec_ctx(), &b); diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc index 135b4710ce..59fb29dd60 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc @@ -105,7 +105,7 @@ class TrickledCHTTP2 : public EndpointPairFixture { (double)state.iterations()); } - void Log(int64_t iteration) { + void Log(int64_t iteration) GPR_ATTRIBUTE_NO_TSAN { auto now = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_); grpc_chttp2_transport* client = reinterpret_cast<grpc_chttp2_transport*>(client_transport_); @@ -193,7 +193,8 @@ class TrickledCHTTP2 : public EndpointPairFixture { return p; } - void UpdateStats(grpc_chttp2_transport* t, Stats* s, size_t backlog) { + void UpdateStats(grpc_chttp2_transport* t, Stats* s, + size_t backlog) GPR_ATTRIBUTE_NO_TSAN { if (backlog == 0) { if (t->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != NULL) { s->streams_stalled_due_to_stream_flow_control++; |