diff options
Diffstat (limited to 'test/core/iomgr/endpoint_tests.c')
-rw-r--r-- | test/core/iomgr/endpoint_tests.c | 100 |
1 files changed, 80 insertions, 20 deletions
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index 6d15ecf141..b79c22e42a 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -33,6 +33,7 @@ #include "test/core/iomgr/endpoint_tests.h" +#include <stdbool.h> #include <sys/types.h> #include <grpc/support/alloc.h> @@ -128,30 +129,30 @@ struct read_and_write_test_state { }; static void read_and_write_test_read_handler(grpc_exec_ctx *exec_ctx, - void *data, bool success) { + void *data, grpc_error *error) { struct read_and_write_test_state *state = data; state->bytes_read += count_slices( state->incoming.slices, state->incoming.count, &state->current_read_data); - if (state->bytes_read == state->target_bytes || !success) { + if (state->bytes_read == state->target_bytes || error != GRPC_ERROR_NONE) { gpr_log(GPR_INFO, "Read handler done"); gpr_mu_lock(g_mu); - state->read_done = 1 + success; - grpc_pollset_kick(g_pollset, NULL); + state->read_done = 1 + (error == GRPC_ERROR_NONE); + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)); gpr_mu_unlock(g_mu); - } else if (success) { + } else if (error == GRPC_ERROR_NONE) { grpc_endpoint_read(exec_ctx, state->read_ep, &state->incoming, &state->done_read); } } static void read_and_write_test_write_handler(grpc_exec_ctx *exec_ctx, - void *data, bool success) { + void *data, grpc_error *error) { struct read_and_write_test_state *state = data; gpr_slice *slices = NULL; size_t nslices; - if (success) { + if (error == GRPC_ERROR_NONE) { state->bytes_written += state->current_write_size; if (state->target_bytes - state->bytes_written < state->current_write_size) { @@ -171,8 +172,8 @@ static void read_and_write_test_write_handler(grpc_exec_ctx *exec_ctx, gpr_log(GPR_INFO, "Write handler done"); gpr_mu_lock(g_mu); - state->write_done = 1 + success; - grpc_pollset_kick(g_pollset, NULL); + state->write_done = 1 + (error == GRPC_ERROR_NONE); + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)); gpr_mu_unlock(g_mu); } @@ -182,7 +183,7 @@ static void read_and_write_test_write_handler(grpc_exec_ctx *exec_ctx, */ static void read_and_write_test(grpc_endpoint_test_config config, size_t num_bytes, size_t write_size, - size_t slice_size, int shutdown) { + size_t slice_size, bool shutdown) { struct read_and_write_test_state state; gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20); grpc_endpoint_test_fixture f = @@ -221,8 +222,8 @@ static void read_and_write_test(grpc_endpoint_test_config config, for the first iteration as for later iterations. It does the right thing even when bytes_written is unsigned. */ state.bytes_written -= state.current_write_size; - read_and_write_test_write_handler(&exec_ctx, &state, 1); - grpc_exec_ctx_finish(&exec_ctx); + read_and_write_test_write_handler(&exec_ctx, &state, GRPC_ERROR_NONE); + grpc_exec_ctx_flush(&exec_ctx); grpc_endpoint_read(&exec_ctx, state.read_ep, &state.incoming, &state.done_read); @@ -233,17 +234,19 @@ static void read_and_write_test(grpc_endpoint_test_config config, gpr_log(GPR_DEBUG, "shutdown write"); grpc_endpoint_shutdown(&exec_ctx, state.write_ep); } - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(g_mu); while (!state.read_done || !state.write_done) { grpc_pollset_worker *worker = NULL; GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0); - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), deadline); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), deadline))); } gpr_mu_unlock(g_mu); - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); end_test(config); gpr_slice_buffer_destroy(&state.outgoing); @@ -253,16 +256,73 @@ static void read_and_write_test(grpc_endpoint_test_config config, grpc_exec_ctx_finish(&exec_ctx); } +static void inc_on_failure(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + *(int *)arg += (error != GRPC_ERROR_NONE); +} + +static void wait_for_fail_count(grpc_exec_ctx *exec_ctx, int *fail_count, + int want_fail_count) { + grpc_exec_ctx_flush(exec_ctx); + for (int i = 0; i < 5 && *fail_count < want_fail_count; i++) { + grpc_pollset_worker *worker = NULL; + gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); + gpr_timespec deadline = + gpr_time_add(now, gpr_time_from_seconds(1, GPR_TIMESPAN)); + gpr_mu_lock(g_mu); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(exec_ctx, g_pollset, &worker, now, deadline))); + gpr_mu_unlock(g_mu); + grpc_exec_ctx_flush(exec_ctx); + } + GPR_ASSERT(*fail_count == want_fail_count); +} + +static void multiple_shutdown_test(grpc_endpoint_test_config config) { + grpc_endpoint_test_fixture f = + begin_test(config, "multiple_shutdown_test", 128); + int fail_count = 0; + + gpr_slice_buffer slice_buffer; + gpr_slice_buffer_init(&slice_buffer); + + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset); + grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, + grpc_closure_create(inc_on_failure, &fail_count)); + wait_for_fail_count(&exec_ctx, &fail_count, 0); + grpc_endpoint_shutdown(&exec_ctx, f.client_ep); + wait_for_fail_count(&exec_ctx, &fail_count, 1); + grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, + grpc_closure_create(inc_on_failure, &fail_count)); + wait_for_fail_count(&exec_ctx, &fail_count, 2); + gpr_slice_buffer_add(&slice_buffer, gpr_slice_from_copied_string("a")); + grpc_endpoint_write(&exec_ctx, f.client_ep, &slice_buffer, + grpc_closure_create(inc_on_failure, &fail_count)); + wait_for_fail_count(&exec_ctx, &fail_count, 3); + grpc_endpoint_shutdown(&exec_ctx, f.client_ep); + wait_for_fail_count(&exec_ctx, &fail_count, 3); + + gpr_slice_buffer_destroy(&slice_buffer); + + grpc_endpoint_destroy(&exec_ctx, f.client_ep); + grpc_endpoint_destroy(&exec_ctx, f.server_ep); + grpc_exec_ctx_finish(&exec_ctx); +} + void grpc_endpoint_tests(grpc_endpoint_test_config config, grpc_pollset *pollset, gpr_mu *mu) { size_t i; g_pollset = pollset; g_mu = mu; - read_and_write_test(config, 10000000, 100000, 8192, 0); - read_and_write_test(config, 1000000, 100000, 1, 0); - read_and_write_test(config, 100000000, 100000, 1, 1); + multiple_shutdown_test(config); + read_and_write_test(config, 10000000, 100000, 8192, false); + read_and_write_test(config, 1000000, 100000, 1, false); + read_and_write_test(config, 100000000, 100000, 1, true); for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) { - read_and_write_test(config, 40320, i, i, 0); + read_and_write_test(config, 40320, i, i, false); } g_pollset = NULL; + g_mu = NULL; } |