diff options
Diffstat (limited to 'test/core/iomgr')
-rw-r--r-- | test/core/iomgr/endpoint_tests.c | 204 | ||||
-rw-r--r-- | test/core/iomgr/tcp_posix_test.c | 148 |
2 files changed, 174 insertions, 178 deletions
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index ef673747a1..6ef8e9ca3b 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -59,7 +59,8 @@ static grpc_pollset *g_pollset; -size_t count_slices(gpr_slice *slices, size_t nslices, int *current_data) { +size_t count_and_unref_slices(gpr_slice *slices, size_t nslices, + int *current_data) { size_t num_bytes = 0; size_t i; size_t j; @@ -71,6 +72,7 @@ size_t count_slices(gpr_slice *slices, size_t nslices, int *current_data) { *current_data = (*current_data + 1) % 256; } num_bytes += GPR_SLICE_LENGTH(slices[i]); + gpr_slice_unref(slices[i]); } return num_bytes; } @@ -119,76 +121,86 @@ struct read_and_write_test_state { int current_write_data; int read_done; int write_done; - gpr_slice_buffer incoming; - gpr_slice_buffer outgoing; - grpc_iomgr_closure done_read; - grpc_iomgr_closure done_write; }; -static void read_and_write_test_read_handler(void *data, int success) { +static void read_and_write_test_read_handler(void *data, gpr_slice *slices, + size_t nslices, + grpc_endpoint_cb_status error) { struct read_and_write_test_state *state = data; + GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR); + if (error == GRPC_ENDPOINT_CB_SHUTDOWN) { + gpr_log(GPR_INFO, "Read handler shutdown"); + gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); + state->read_done = 1; + grpc_pollset_kick(g_pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); + return; + } - state->bytes_read += count_slices( - state->incoming.slices, state->incoming.count, &state->current_read_data); - if (state->bytes_read == state->target_bytes || !success) { + state->bytes_read += + count_and_unref_slices(slices, nslices, &state->current_read_data); + if (state->bytes_read == state->target_bytes) { gpr_log(GPR_INFO, "Read handler done"); gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); - state->read_done = 1 + success; + state->read_done = 1; grpc_pollset_kick(g_pollset, NULL); gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); - } else if (success) { - switch (grpc_endpoint_read(state->read_ep, &state->incoming, - &state->done_read)) { - case GRPC_ENDPOINT_ERROR: - read_and_write_test_read_handler(data, 0); - break; - case GRPC_ENDPOINT_DONE: - read_and_write_test_read_handler(data, 1); - break; - case GRPC_ENDPOINT_PENDING: - break; - } + } else { + grpc_endpoint_notify_on_read(state->read_ep, + read_and_write_test_read_handler, data); } } -static void read_and_write_test_write_handler(void *data, int success) { +static void read_and_write_test_write_handler(void *data, + grpc_endpoint_cb_status error) { struct read_and_write_test_state *state = data; gpr_slice *slices = NULL; size_t nslices; - grpc_endpoint_op_status write_status; - - if (success) { - for (;;) { - /* Need to do inline writes until they don't succeed synchronously or we - finish writing */ - state->bytes_written += state->current_write_size; - if (state->target_bytes - state->bytes_written < - state->current_write_size) { - state->current_write_size = state->target_bytes - state->bytes_written; - } - if (state->current_write_size == 0) { - break; - } - - slices = allocate_blocks(state->current_write_size, 8192, &nslices, - &state->current_write_data); - gpr_slice_buffer_reset_and_unref(&state->outgoing); - gpr_slice_buffer_addn(&state->outgoing, slices, nslices); - write_status = grpc_endpoint_write(state->write_ep, &state->outgoing, - &state->done_write); - gpr_log(GPR_DEBUG, "write_status=%d", write_status); - GPR_ASSERT(write_status != GRPC_ENDPOINT_ERROR); - free(slices); - if (write_status == GRPC_ENDPOINT_PENDING) { - return; - } + grpc_endpoint_write_status write_status; + + GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR); + + gpr_log(GPR_DEBUG, "%s: error=%d", "read_and_write_test_write_handler", + error); + + if (error == GRPC_ENDPOINT_CB_SHUTDOWN) { + gpr_log(GPR_INFO, "Write handler shutdown"); + gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); + state->write_done = 1; + grpc_pollset_kick(g_pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); + return; + } + + for (;;) { + /* Need to do inline writes until they don't succeed synchronously or we + finish writing */ + state->bytes_written += state->current_write_size; + if (state->target_bytes - state->bytes_written < + state->current_write_size) { + state->current_write_size = state->target_bytes - state->bytes_written; + } + if (state->current_write_size == 0) { + break; + } + + slices = allocate_blocks(state->current_write_size, 8192, &nslices, + &state->current_write_data); + write_status = + grpc_endpoint_write(state->write_ep, slices, nslices, + read_and_write_test_write_handler, state); + gpr_log(GPR_DEBUG, "write_status=%d", write_status); + GPR_ASSERT(write_status != GRPC_ENDPOINT_WRITE_ERROR); + free(slices); + if (write_status == GRPC_ENDPOINT_WRITE_PENDING) { + return; } - GPR_ASSERT(state->bytes_written == state->target_bytes); } + GPR_ASSERT(state->bytes_written == state->target_bytes); gpr_log(GPR_INFO, "Write handler done"); gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); - state->write_done = 1 + success; + state->write_done = 1; grpc_pollset_kick(g_pollset, NULL); gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); } @@ -222,31 +234,16 @@ static void read_and_write_test(grpc_endpoint_test_config config, state.write_done = 0; state.current_read_data = 0; state.current_write_data = 0; - grpc_iomgr_closure_init(&state.done_read, read_and_write_test_read_handler, - &state); - grpc_iomgr_closure_init(&state.done_write, read_and_write_test_write_handler, - &state); - gpr_slice_buffer_init(&state.outgoing); - gpr_slice_buffer_init(&state.incoming); /* Get started by pretending an initial write completed */ /* NOTE: Sets up initial conditions so we can have the same write handler for the first iteration as for later iterations. It does the right thing even when bytes_written is unsigned. */ state.bytes_written -= state.current_write_size; - read_and_write_test_write_handler(&state, 1); + read_and_write_test_write_handler(&state, GRPC_ENDPOINT_CB_OK); - switch ( - grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read)) { - case GRPC_ENDPOINT_PENDING: - break; - case GRPC_ENDPOINT_ERROR: - read_and_write_test_read_handler(&state, 0); - break; - case GRPC_ENDPOINT_DONE: - read_and_write_test_read_handler(&state, 1); - break; - } + grpc_endpoint_notify_on_read(state.read_ep, read_and_write_test_read_handler, + &state); if (shutdown) { gpr_log(GPR_DEBUG, "shutdown read"); @@ -266,8 +263,6 @@ static void read_and_write_test(grpc_endpoint_test_config config, grpc_endpoint_destroy(state.read_ep); grpc_endpoint_destroy(state.write_ep); - gpr_slice_buffer_destroy(&state.outgoing); - gpr_slice_buffer_destroy(&state.incoming); end_test(config); } @@ -278,40 +273,36 @@ struct timeout_test_state { typedef struct { int done; grpc_endpoint *ep; - gpr_slice_buffer incoming; - grpc_iomgr_closure done_read; } shutdown_during_write_test_state; -static void shutdown_during_write_test_read_handler(void *user_data, - int success) { +static void shutdown_during_write_test_read_handler( + void *user_data, gpr_slice *slices, size_t nslices, + grpc_endpoint_cb_status error) { + size_t i; shutdown_during_write_test_state *st = user_data; - if (!success) { + for (i = 0; i < nslices; i++) { + gpr_slice_unref(slices[i]); + } + + if (error != GRPC_ENDPOINT_CB_OK) { grpc_endpoint_destroy(st->ep); gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); - st->done = 1; + st->done = error; grpc_pollset_kick(g_pollset, NULL); gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); } else { - switch (grpc_endpoint_read(st->ep, &st->incoming, &st->done_read)) { - case GRPC_ENDPOINT_PENDING: - break; - case GRPC_ENDPOINT_ERROR: - shutdown_during_write_test_read_handler(user_data, 0); - break; - case GRPC_ENDPOINT_DONE: - shutdown_during_write_test_read_handler(user_data, 1); - break; - } + grpc_endpoint_notify_on_read( + st->ep, shutdown_during_write_test_read_handler, user_data); } } -static void shutdown_during_write_test_write_handler(void *user_data, - int success) { +static void shutdown_during_write_test_write_handler( + void *user_data, grpc_endpoint_cb_status error) { shutdown_during_write_test_state *st = user_data; - gpr_log(GPR_INFO, "shutdown_during_write_test_write_handler: success = %d", - success); - if (success) { + gpr_log(GPR_INFO, "shutdown_during_write_test_write_handler: error = %d", + error); + if (error == 0) { /* This happens about 0.5% of the time when run under TSAN, and is entirely legitimate, but means we aren't testing the path we think we are. */ /* TODO(klempner): Change this test to retry the write in that case */ @@ -334,8 +325,6 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config, shutdown_during_write_test_state read_st; shutdown_during_write_test_state write_st; gpr_slice *slices; - gpr_slice_buffer outgoing; - grpc_iomgr_closure done_write; grpc_endpoint_test_fixture f = begin_test(config, "shutdown_during_write_test", slice_size); @@ -346,26 +335,19 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config, read_st.done = 0; write_st.done = 0; - grpc_iomgr_closure_init(&done_write, shutdown_during_write_test_write_handler, - &write_st); - grpc_iomgr_closure_init(&read_st.done_read, - shutdown_during_write_test_read_handler, &read_st); - gpr_slice_buffer_init(&read_st.incoming); - gpr_slice_buffer_init(&outgoing); - - GPR_ASSERT(grpc_endpoint_read(read_st.ep, &read_st.incoming, - &read_st.done_read) == GRPC_ENDPOINT_PENDING); + grpc_endpoint_notify_on_read( + read_st.ep, shutdown_during_write_test_read_handler, &read_st); for (size = 1;; size *= 2) { slices = allocate_blocks(size, 1, &nblocks, ¤t_data); - gpr_slice_buffer_reset_and_unref(&outgoing); - gpr_slice_buffer_addn(&outgoing, slices, nblocks); - switch (grpc_endpoint_write(write_st.ep, &outgoing, &done_write)) { - case GRPC_ENDPOINT_DONE: + switch (grpc_endpoint_write(write_st.ep, slices, nblocks, + shutdown_during_write_test_write_handler, + &write_st)) { + case GRPC_ENDPOINT_WRITE_DONE: break; - case GRPC_ENDPOINT_ERROR: + case GRPC_ENDPOINT_WRITE_ERROR: gpr_log(GPR_ERROR, "error writing"); abort(); - case GRPC_ENDPOINT_PENDING: + case GRPC_ENDPOINT_WRITE_PENDING: grpc_endpoint_shutdown(write_st.ep); deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10); gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); @@ -386,8 +368,6 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config, } gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); gpr_free(slices); - gpr_slice_buffer_destroy(&read_st.incoming); - gpr_slice_buffer_destroy(&outgoing); end_test(config); return; } diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 8acaa433bb..6ad832231f 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -118,12 +118,10 @@ struct read_socket_state { grpc_endpoint *ep; ssize_t read_bytes; ssize_t target_read_bytes; - gpr_slice_buffer incoming; - grpc_iomgr_closure read_cb; }; -static ssize_t count_slices(gpr_slice *slices, size_t nslices, - int *current_data) { +static ssize_t count_and_unref_slices(gpr_slice *slices, size_t nslices, + int *current_data) { ssize_t num_bytes = 0; unsigned i, j; unsigned char *buf; @@ -134,41 +132,31 @@ static ssize_t count_slices(gpr_slice *slices, size_t nslices, *current_data = (*current_data + 1) % 256; } num_bytes += GPR_SLICE_LENGTH(slices[i]); + gpr_slice_unref(slices[i]); } return num_bytes; } -static void read_cb(void *user_data, int success) { +static void read_cb(void *user_data, gpr_slice *slices, size_t nslices, + grpc_endpoint_cb_status error) { struct read_socket_state *state = (struct read_socket_state *)user_data; ssize_t read_bytes; int current_data; - GPR_ASSERT(success); + GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); current_data = state->read_bytes % 256; - read_bytes = count_slices(state->incoming.slices, state->incoming.count, - ¤t_data); + read_bytes = count_and_unref_slices(slices, nslices, ¤t_data); state->read_bytes += read_bytes; gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes, state->target_read_bytes); if (state->read_bytes >= state->target_read_bytes) { - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + /* empty */ } else { - switch (grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb)) { - case GRPC_ENDPOINT_DONE: - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - read_cb(user_data, 1); - break; - case GRPC_ENDPOINT_ERROR: - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - read_cb(user_data, 0); - break; - case GRPC_ENDPOINT_PENDING: - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - break; - } + grpc_endpoint_notify_on_read(state->ep, read_cb, state); } + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } /* Write to a socket, then read from it using the grpc_tcp API. */ @@ -193,19 +181,8 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) { state.ep = ep; state.read_bytes = 0; state.target_read_bytes = written_bytes; - gpr_slice_buffer_init(&state.incoming); - grpc_iomgr_closure_init(&state.read_cb, read_cb, &state); - switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) { - case GRPC_ENDPOINT_DONE: - read_cb(&state, 1); - break; - case GRPC_ENDPOINT_ERROR: - read_cb(&state, 0); - break; - case GRPC_ENDPOINT_PENDING: - break; - } + grpc_endpoint_notify_on_read(ep, read_cb, &state); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (state.read_bytes < state.target_read_bytes) { @@ -216,7 +193,6 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) { GPR_ASSERT(state.read_bytes == state.target_read_bytes); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - gpr_slice_buffer_destroy(&state.incoming); grpc_endpoint_destroy(ep); } @@ -243,19 +219,8 @@ static void large_read_test(ssize_t slice_size) { state.ep = ep; state.read_bytes = 0; state.target_read_bytes = written_bytes; - gpr_slice_buffer_init(&state.incoming); - grpc_iomgr_closure_init(&state.read_cb, read_cb, &state); - switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) { - case GRPC_ENDPOINT_DONE: - read_cb(&state, 1); - break; - case GRPC_ENDPOINT_ERROR: - read_cb(&state, 0); - break; - case GRPC_ENDPOINT_PENDING: - break; - } + grpc_endpoint_notify_on_read(ep, read_cb, &state); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (state.read_bytes < state.target_read_bytes) { @@ -266,7 +231,6 @@ static void large_read_test(ssize_t slice_size) { GPR_ASSERT(state.read_bytes == state.target_read_bytes); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - gpr_slice_buffer_destroy(&state.incoming); grpc_endpoint_destroy(ep); } @@ -298,7 +262,8 @@ static gpr_slice *allocate_blocks(ssize_t num_bytes, ssize_t slice_size, return slices; } -static void write_done(void *user_data /* write_socket_state */, int success) { +static void write_done(void *user_data /* write_socket_state */, + grpc_endpoint_cb_status error) { struct write_socket_state *state = (struct write_socket_state *)user_data; gpr_log(GPR_INFO, "Write done callback called"); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); @@ -374,8 +339,6 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) { size_t num_blocks; gpr_slice *slices; int current_data = 0; - gpr_slice_buffer outgoing; - grpc_iomgr_closure write_done_closure; gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20); gpr_log(GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes, @@ -392,21 +355,74 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) { slices = allocate_blocks(num_bytes, slice_size, &num_blocks, ¤t_data); - gpr_slice_buffer_init(&outgoing); - gpr_slice_buffer_addn(&outgoing, slices, num_blocks); - grpc_iomgr_closure_init(&write_done_closure, write_done, &state); + if (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state) == + GRPC_ENDPOINT_WRITE_DONE) { + /* Write completed immediately */ + read_bytes = drain_socket(sv[0]); + GPR_ASSERT(read_bytes == num_bytes); + } else { + drain_socket_blocking(sv[0], num_bytes, num_bytes); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + for (;;) { + grpc_pollset_worker worker; + if (state.write_done) { + break; + } + grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + deadline); + } + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + } + + grpc_endpoint_destroy(ep); + gpr_free(slices); +} + +static void read_done_for_write_error(void *ud, gpr_slice *slices, + size_t nslices, + grpc_endpoint_cb_status error) { + GPR_ASSERT(error != GRPC_ENDPOINT_CB_OK); + GPR_ASSERT(nslices == 0); +} + +/* Write to a socket using the grpc_tcp API, then drain it directly. + Note that if the write does not complete immediately we need to drain the + socket in parallel with the read. */ +static void write_error_test(ssize_t num_bytes, ssize_t slice_size) { + int sv[2]; + grpc_endpoint *ep; + struct write_socket_state state; + size_t num_blocks; + gpr_slice *slices; + int current_data = 0; + grpc_pollset_worker worker; + gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20); + + gpr_log(GPR_INFO, "Start write error test with %d bytes, slice size %d", + num_bytes, slice_size); + + create_sockets(sv); - switch (grpc_endpoint_write(ep, &outgoing, &write_done_closure)) { - case GRPC_ENDPOINT_DONE: + ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_error_test"), + GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test"); + grpc_endpoint_add_to_pollset(ep, &g_pollset); + + close(sv[0]); + + state.ep = ep; + state.write_done = 0; + + slices = allocate_blocks(num_bytes, slice_size, &num_blocks, ¤t_data); + + switch (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state)) { + case GRPC_ENDPOINT_WRITE_DONE: + case GRPC_ENDPOINT_WRITE_ERROR: /* Write completed immediately */ - read_bytes = drain_socket(sv[0]); - GPR_ASSERT(read_bytes == num_bytes); break; - case GRPC_ENDPOINT_PENDING: - drain_socket_blocking(sv[0], num_bytes, num_bytes); + case GRPC_ENDPOINT_WRITE_PENDING: + grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); for (;;) { - grpc_pollset_worker worker; if (state.write_done) { break; } @@ -415,14 +431,10 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) { } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); break; - case GRPC_ENDPOINT_ERROR: - gpr_log(GPR_ERROR, "endpoint got error"); - abort(); } - gpr_slice_buffer_destroy(&outgoing); grpc_endpoint_destroy(ep); - gpr_free(slices); + free(slices); } void run_tests(void) { @@ -442,6 +454,10 @@ void run_tests(void) { write_test(100000, 137); for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) { + write_error_test(40320, i); + } + + for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) { write_test(40320, i); } } |