diff options
Diffstat (limited to 'test/core/iomgr/endpoint_tests.c')
-rw-r--r-- | test/core/iomgr/endpoint_tests.c | 269 |
1 files changed, 85 insertions, 184 deletions
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index 6ef8e9ca3b..853b9a32c2 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -39,6 +39,7 @@ #include <grpc/support/slice.h> #include <grpc/support/log.h> #include <grpc/support/time.h> +#include <grpc/support/useful.h> #include "test/core/util/test_config.h" /* @@ -59,8 +60,7 @@ static grpc_pollset *g_pollset; -size_t count_and_unref_slices(gpr_slice *slices, size_t nslices, - int *current_data) { +size_t count_slices(gpr_slice *slices, size_t nslices, int *current_data) { size_t num_bytes = 0; size_t i; size_t j; @@ -72,7 +72,6 @@ size_t count_and_unref_slices(gpr_slice *slices, size_t nslices, *current_data = (*current_data + 1) % 256; } num_bytes += GPR_SLICE_LENGTH(slices[i]); - gpr_slice_unref(slices[i]); } return num_bytes; } @@ -87,7 +86,7 @@ static grpc_endpoint_test_fixture begin_test(grpc_endpoint_test_config config, static void end_test(grpc_endpoint_test_config config) { config.clean_up(); } static gpr_slice *allocate_blocks(size_t num_bytes, size_t slice_size, - size_t *num_blocks, int *current_data) { + size_t *num_blocks, gpr_uint8 *current_data) { size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1 : 0); gpr_slice *slices = malloc(sizeof(gpr_slice) * nslices); size_t num_bytes_left = num_bytes; @@ -103,7 +102,7 @@ static gpr_slice *allocate_blocks(size_t num_bytes, size_t slice_size, buf = GPR_SLICE_START_PTR(slices[i]); for (j = 0; j < GPR_SLICE_LENGTH(slices[i]); ++j) { buf[j] = *current_data; - *current_data = (*current_data + 1) % 256; + (*current_data)++; } } GPR_ASSERT(num_bytes_left == 0); @@ -118,89 +117,81 @@ struct read_and_write_test_state { size_t current_write_size; size_t bytes_written; int current_read_data; - int current_write_data; + gpr_uint8 current_write_data; int read_done; int write_done; + gpr_slice_buffer incoming; + gpr_slice_buffer outgoing; + grpc_iomgr_closure done_read; + grpc_iomgr_closure done_write; }; -static void read_and_write_test_read_handler(void *data, gpr_slice *slices, - size_t nslices, - grpc_endpoint_cb_status error) { +static void read_and_write_test_read_handler(void *data, int success) { struct read_and_write_test_state *state = data; - GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR); - if (error == GRPC_ENDPOINT_CB_SHUTDOWN) { - gpr_log(GPR_INFO, "Read handler shutdown"); - gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); - state->read_done = 1; - grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); - return; - } - state->bytes_read += - count_and_unref_slices(slices, nslices, &state->current_read_data); - if (state->bytes_read == state->target_bytes) { +loop: + state->bytes_read += count_slices( + state->incoming.slices, state->incoming.count, &state->current_read_data); + if (state->bytes_read == state->target_bytes || !success) { gpr_log(GPR_INFO, "Read handler done"); gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); - state->read_done = 1; + state->read_done = 1 + success; grpc_pollset_kick(g_pollset, NULL); gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); - } else { - grpc_endpoint_notify_on_read(state->read_ep, - read_and_write_test_read_handler, data); + } else if (success) { + switch (grpc_endpoint_read(state->read_ep, &state->incoming, + &state->done_read)) { + case GRPC_ENDPOINT_ERROR: + success = 0; + goto loop; + case GRPC_ENDPOINT_DONE: + success = 1; + goto loop; + case GRPC_ENDPOINT_PENDING: + break; + } } } -static void read_and_write_test_write_handler(void *data, - grpc_endpoint_cb_status error) { +static void read_and_write_test_write_handler(void *data, int success) { struct read_and_write_test_state *state = data; gpr_slice *slices = NULL; size_t nslices; - grpc_endpoint_write_status write_status; - - GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR); - - gpr_log(GPR_DEBUG, "%s: error=%d", "read_and_write_test_write_handler", - error); - - if (error == GRPC_ENDPOINT_CB_SHUTDOWN) { - gpr_log(GPR_INFO, "Write handler shutdown"); - gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); - state->write_done = 1; - grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); - return; - } - - for (;;) { - /* Need to do inline writes until they don't succeed synchronously or we - finish writing */ - state->bytes_written += state->current_write_size; - if (state->target_bytes - state->bytes_written < - state->current_write_size) { - state->current_write_size = state->target_bytes - state->bytes_written; - } - if (state->current_write_size == 0) { - break; - } - - slices = allocate_blocks(state->current_write_size, 8192, &nslices, - &state->current_write_data); - write_status = - grpc_endpoint_write(state->write_ep, slices, nslices, - read_and_write_test_write_handler, state); - gpr_log(GPR_DEBUG, "write_status=%d", write_status); - GPR_ASSERT(write_status != GRPC_ENDPOINT_WRITE_ERROR); - free(slices); - if (write_status == GRPC_ENDPOINT_WRITE_PENDING) { - return; + grpc_endpoint_op_status write_status; + + if (success) { + for (;;) { + /* Need to do inline writes until they don't succeed synchronously or we + finish writing */ + state->bytes_written += state->current_write_size; + if (state->target_bytes - state->bytes_written < + state->current_write_size) { + state->current_write_size = state->target_bytes - state->bytes_written; + } + if (state->current_write_size == 0) { + break; + } + + slices = allocate_blocks(state->current_write_size, 8192, &nslices, + &state->current_write_data); + gpr_slice_buffer_reset_and_unref(&state->outgoing); + gpr_slice_buffer_addn(&state->outgoing, slices, nslices); + write_status = grpc_endpoint_write(state->write_ep, &state->outgoing, + &state->done_write); + free(slices); + if (write_status == GRPC_ENDPOINT_PENDING) { + return; + } else if (write_status == GRPC_ENDPOINT_ERROR) { + goto cleanup; + } } + GPR_ASSERT(state->bytes_written == state->target_bytes); } - GPR_ASSERT(state->bytes_written == state->target_bytes); +cleanup: gpr_log(GPR_INFO, "Write handler done"); gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); - state->write_done = 1; + state->write_done = 1 + success; grpc_pollset_kick(g_pollset, NULL); gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); } @@ -216,6 +207,8 @@ static void read_and_write_test(grpc_endpoint_test_config config, gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20); grpc_endpoint_test_fixture f = begin_test(config, "read_and_write_test", slice_size); + gpr_log(GPR_DEBUG, "num_bytes=%d write_size=%d slice_size=%d shutdown=%d", + num_bytes, write_size, slice_size, shutdown); if (shutdown) { gpr_log(GPR_INFO, "Start read and write shutdown test"); @@ -234,16 +227,31 @@ static void read_and_write_test(grpc_endpoint_test_config config, state.write_done = 0; state.current_read_data = 0; state.current_write_data = 0; + grpc_iomgr_closure_init(&state.done_read, read_and_write_test_read_handler, + &state); + grpc_iomgr_closure_init(&state.done_write, read_and_write_test_write_handler, + &state); + gpr_slice_buffer_init(&state.outgoing); + gpr_slice_buffer_init(&state.incoming); /* Get started by pretending an initial write completed */ /* NOTE: Sets up initial conditions so we can have the same write handler for the first iteration as for later iterations. It does the right thing even when bytes_written is unsigned. */ state.bytes_written -= state.current_write_size; - read_and_write_test_write_handler(&state, GRPC_ENDPOINT_CB_OK); + read_and_write_test_write_handler(&state, 1); - grpc_endpoint_notify_on_read(state.read_ep, read_and_write_test_read_handler, - &state); + switch ( + grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read)) { + case GRPC_ENDPOINT_PENDING: + break; + case GRPC_ENDPOINT_ERROR: + read_and_write_test_read_handler(&state, 0); + break; + case GRPC_ENDPOINT_DONE: + read_and_write_test_read_handler(&state, 1); + break; + } if (shutdown) { gpr_log(GPR_DEBUG, "shutdown read"); @@ -261,129 +269,22 @@ static void read_and_write_test(grpc_endpoint_test_config config, } gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); + end_test(config); + gpr_slice_buffer_destroy(&state.outgoing); + gpr_slice_buffer_destroy(&state.incoming); grpc_endpoint_destroy(state.read_ep); grpc_endpoint_destroy(state.write_ep); - end_test(config); -} - -struct timeout_test_state { - int io_done; -}; - -typedef struct { - int done; - grpc_endpoint *ep; -} shutdown_during_write_test_state; - -static void shutdown_during_write_test_read_handler( - void *user_data, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status error) { - size_t i; - shutdown_during_write_test_state *st = user_data; - - for (i = 0; i < nslices; i++) { - gpr_slice_unref(slices[i]); - } - - if (error != GRPC_ENDPOINT_CB_OK) { - grpc_endpoint_destroy(st->ep); - gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); - st->done = error; - grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); - } else { - grpc_endpoint_notify_on_read( - st->ep, shutdown_during_write_test_read_handler, user_data); - } -} - -static void shutdown_during_write_test_write_handler( - void *user_data, grpc_endpoint_cb_status error) { - shutdown_during_write_test_state *st = user_data; - gpr_log(GPR_INFO, "shutdown_during_write_test_write_handler: error = %d", - error); - if (error == 0) { - /* This happens about 0.5% of the time when run under TSAN, and is entirely - legitimate, but means we aren't testing the path we think we are. */ - /* TODO(klempner): Change this test to retry the write in that case */ - gpr_log(GPR_ERROR, - "shutdown_during_write_test_write_handler completed unexpectedly"); - } - gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); - st->done = 1; - grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); -} - -static void shutdown_during_write_test(grpc_endpoint_test_config config, - size_t slice_size) { - /* test that shutdown with a pending write creates no leaks */ - gpr_timespec deadline; - size_t size; - size_t nblocks; - int current_data = 1; - shutdown_during_write_test_state read_st; - shutdown_during_write_test_state write_st; - gpr_slice *slices; - grpc_endpoint_test_fixture f = - begin_test(config, "shutdown_during_write_test", slice_size); - - gpr_log(GPR_INFO, "testing shutdown during a write"); - - read_st.ep = f.client_ep; - write_st.ep = f.server_ep; - read_st.done = 0; - write_st.done = 0; - - grpc_endpoint_notify_on_read( - read_st.ep, shutdown_during_write_test_read_handler, &read_st); - for (size = 1;; size *= 2) { - slices = allocate_blocks(size, 1, &nblocks, ¤t_data); - switch (grpc_endpoint_write(write_st.ep, slices, nblocks, - shutdown_during_write_test_write_handler, - &write_st)) { - case GRPC_ENDPOINT_WRITE_DONE: - break; - case GRPC_ENDPOINT_WRITE_ERROR: - gpr_log(GPR_ERROR, "error writing"); - abort(); - case GRPC_ENDPOINT_WRITE_PENDING: - grpc_endpoint_shutdown(write_st.ep); - deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10); - gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); - while (!write_st.done) { - grpc_pollset_worker worker; - GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0); - grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - deadline); - } - gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); - grpc_endpoint_destroy(write_st.ep); - gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); - while (!read_st.done) { - grpc_pollset_worker worker; - GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0); - grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - deadline); - } - gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); - gpr_free(slices); - end_test(config); - return; - } - gpr_free(slices); - } - - gpr_log(GPR_ERROR, "should never reach here"); - abort(); } void grpc_endpoint_tests(grpc_endpoint_test_config config, grpc_pollset *pollset) { + size_t i; g_pollset = pollset; read_and_write_test(config, 10000000, 100000, 8192, 0); read_and_write_test(config, 1000000, 100000, 1, 0); read_and_write_test(config, 100000000, 100000, 1, 1); - shutdown_during_write_test(config, 1000); + for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) { + read_and_write_test(config, 40320, i, i, 0); + } g_pollset = NULL; } |