diff options
Diffstat (limited to 'test/core/iomgr/tcp_posix_test.c')
-rw-r--r-- | test/core/iomgr/tcp_posix_test.c | 107 |
1 files changed, 55 insertions, 52 deletions
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index d290c6bc3a..4351642ab6 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -36,8 +36,8 @@ #include <errno.h> #include <fcntl.h> #include <string.h> -#include <sys/types.h> #include <sys/socket.h> +#include <sys/types.h> #include <unistd.h> #include <grpc/grpc.h> @@ -45,10 +45,11 @@ #include <grpc/support/log.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> -#include "test/core/util/test_config.h" #include "test/core/iomgr/endpoint_tests.h" +#include "test/core/util/test_config.h" -static grpc_pollset g_pollset; +static gpr_mu *g_mu; +static grpc_pollset *g_pollset; /* General test notes: @@ -145,7 +146,7 @@ static void read_cb(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { GPR_ASSERT(success); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); current_data = state->read_bytes % 256; read_bytes = count_slices(state->incoming.slices, state->incoming.count, ¤t_data); @@ -153,10 +154,10 @@ static void read_cb(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes, state->target_read_bytes); if (state->read_bytes >= state->target_read_bytes) { - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); } else { grpc_endpoint_read(exec_ctx, state->ep, &state->incoming, &state->read_cb); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); } } @@ -175,7 +176,7 @@ static void read_test(size_t num_bytes, size_t slice_size) { create_sockets(sv); ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test"); - grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); @@ -188,17 +189,17 @@ static void read_test(size_t num_bytes, size_t slice_size) { grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { - grpc_pollset_worker worker; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_worker *worker = NULL; + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); gpr_slice_buffer_destroy(&state.incoming); grpc_endpoint_destroy(&exec_ctx, ep); @@ -221,7 +222,7 @@ static void large_read_test(size_t slice_size) { ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), slice_size, "test"); - grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); written_bytes = fill_socket(sv[0]); gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); @@ -234,17 +235,17 @@ static void large_read_test(size_t slice_size) { grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { - grpc_pollset_worker worker; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_worker *worker = NULL; + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); gpr_slice_buffer_destroy(&state.incoming); grpc_endpoint_destroy(&exec_ctx, ep); @@ -283,11 +284,11 @@ static void write_done(grpc_exec_ctx *exec_ctx, void *user_data /* write_socket_state */, bool success) { struct write_socket_state *state = (struct write_socket_state *)user_data; gpr_log(GPR_INFO, "Write done callback called"); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); gpr_log(GPR_INFO, "Signalling write done"); state->write_done = 1; - grpc_pollset_kick(&g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_pollset_kick(g_pollset, NULL); + gpr_mu_unlock(g_mu); } void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { @@ -303,12 +304,12 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0); for (;;) { - grpc_pollset_worker worker; - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_worker *worker = NULL; + gpr_mu_lock(g_mu); + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10)); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); do { bytes_read = @@ -350,7 +351,7 @@ static void write_test(size_t num_bytes, size_t slice_size) { ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test"); - grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); state.ep = ep; state.write_done = 0; @@ -363,19 +364,19 @@ static void write_test(size_t num_bytes, size_t slice_size) { grpc_endpoint_write(&exec_ctx, ep, &outgoing, &write_done_closure); drain_socket_blocking(sv[0], num_bytes, num_bytes); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); for (;;) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; if (state.write_done) { break; } - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); gpr_slice_buffer_destroy(&outgoing); grpc_endpoint_destroy(&exec_ctx, ep); @@ -386,7 +387,7 @@ static void write_test(size_t num_bytes, size_t slice_size) { void on_fd_released(grpc_exec_ctx *exec_ctx, void *arg, bool success) { int *done = arg; *done = 1; - grpc_pollset_kick(&g_pollset, NULL); + grpc_pollset_kick(g_pollset, NULL); } /* Do a read_test, then release fd and try to read/write again. Verify that @@ -410,7 +411,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test"); GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0); - grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); @@ -423,27 +424,27 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { - grpc_pollset_worker worker; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_worker *worker = NULL; + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); gpr_slice_buffer_destroy(&state.incoming); grpc_tcp_destroy_and_release_fd(&exec_ctx, ep, &fd, &fd_released_cb); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); while (!fd_released_done) { - grpc_pollset_worker worker; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_worker *worker = NULL; + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); GPR_ASSERT(fd_released_done == 1); GPR_ASSERT(fd == sv[1]); grpc_exec_ctx_finish(&exec_ctx); @@ -491,8 +492,8 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( slice_size, "test"); f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), slice_size, "test"); - grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, &g_pollset); - grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, &g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, g_pollset); grpc_exec_ctx_finish(&exec_ctx); @@ -512,13 +513,15 @@ int main(int argc, char **argv) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - grpc_pollset_init(&g_pollset); + g_pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(g_pollset, &g_mu); run_tests(); - grpc_endpoint_tests(configs[0], &g_pollset); - grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); - grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); + grpc_endpoint_tests(configs[0], g_pollset, g_mu); + grpc_closure_init(&destroyed, destroy_pollset, g_pollset); + grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); + gpr_free(g_pollset); return 0; } |