diff options
Diffstat (limited to 'test/core/iomgr/tcp_posix_test.c')
-rw-r--r-- | test/core/iomgr/tcp_posix_test.c | 148 |
1 files changed, 82 insertions, 66 deletions
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 8acaa433bb..6ad832231f 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -118,12 +118,10 @@ struct read_socket_state { grpc_endpoint *ep; ssize_t read_bytes; ssize_t target_read_bytes; - gpr_slice_buffer incoming; - grpc_iomgr_closure read_cb; }; -static ssize_t count_slices(gpr_slice *slices, size_t nslices, - int *current_data) { +static ssize_t count_and_unref_slices(gpr_slice *slices, size_t nslices, + int *current_data) { ssize_t num_bytes = 0; unsigned i, j; unsigned char *buf; @@ -134,41 +132,31 @@ static ssize_t count_slices(gpr_slice *slices, size_t nslices, *current_data = (*current_data + 1) % 256; } num_bytes += GPR_SLICE_LENGTH(slices[i]); + gpr_slice_unref(slices[i]); } return num_bytes; } -static void read_cb(void *user_data, int success) { +static void read_cb(void *user_data, gpr_slice *slices, size_t nslices, + grpc_endpoint_cb_status error) { struct read_socket_state *state = (struct read_socket_state *)user_data; ssize_t read_bytes; int current_data; - GPR_ASSERT(success); + GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); current_data = state->read_bytes % 256; - read_bytes = count_slices(state->incoming.slices, state->incoming.count, - ¤t_data); + read_bytes = count_and_unref_slices(slices, nslices, ¤t_data); state->read_bytes += read_bytes; gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes, state->target_read_bytes); if (state->read_bytes >= state->target_read_bytes) { - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + /* empty */ } else { - switch (grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb)) { - case GRPC_ENDPOINT_DONE: - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - read_cb(user_data, 1); - break; - case GRPC_ENDPOINT_ERROR: - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - read_cb(user_data, 0); - break; - case GRPC_ENDPOINT_PENDING: - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - break; - } + grpc_endpoint_notify_on_read(state->ep, read_cb, state); } + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } /* Write to a socket, then read from it using the grpc_tcp API. */ @@ -193,19 +181,8 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) { state.ep = ep; state.read_bytes = 0; state.target_read_bytes = written_bytes; - gpr_slice_buffer_init(&state.incoming); - grpc_iomgr_closure_init(&state.read_cb, read_cb, &state); - switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) { - case GRPC_ENDPOINT_DONE: - read_cb(&state, 1); - break; - case GRPC_ENDPOINT_ERROR: - read_cb(&state, 0); - break; - case GRPC_ENDPOINT_PENDING: - break; - } + grpc_endpoint_notify_on_read(ep, read_cb, &state); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (state.read_bytes < state.target_read_bytes) { @@ -216,7 +193,6 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) { GPR_ASSERT(state.read_bytes == state.target_read_bytes); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - gpr_slice_buffer_destroy(&state.incoming); grpc_endpoint_destroy(ep); } @@ -243,19 +219,8 @@ static void large_read_test(ssize_t slice_size) { state.ep = ep; state.read_bytes = 0; state.target_read_bytes = written_bytes; - gpr_slice_buffer_init(&state.incoming); - grpc_iomgr_closure_init(&state.read_cb, read_cb, &state); - switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) { - case GRPC_ENDPOINT_DONE: - read_cb(&state, 1); - break; - case GRPC_ENDPOINT_ERROR: - read_cb(&state, 0); - break; - case GRPC_ENDPOINT_PENDING: - break; - } + grpc_endpoint_notify_on_read(ep, read_cb, &state); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (state.read_bytes < state.target_read_bytes) { @@ -266,7 +231,6 @@ static void large_read_test(ssize_t slice_size) { GPR_ASSERT(state.read_bytes == state.target_read_bytes); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - gpr_slice_buffer_destroy(&state.incoming); grpc_endpoint_destroy(ep); } @@ -298,7 +262,8 @@ static gpr_slice *allocate_blocks(ssize_t num_bytes, ssize_t slice_size, return slices; } -static void write_done(void *user_data /* write_socket_state */, int success) { +static void write_done(void *user_data /* write_socket_state */, + grpc_endpoint_cb_status error) { struct write_socket_state *state = (struct write_socket_state *)user_data; gpr_log(GPR_INFO, "Write done callback called"); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); @@ -374,8 +339,6 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) { size_t num_blocks; gpr_slice *slices; int current_data = 0; - gpr_slice_buffer outgoing; - grpc_iomgr_closure write_done_closure; gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20); gpr_log(GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes, @@ -392,21 +355,74 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) { slices = allocate_blocks(num_bytes, slice_size, &num_blocks, ¤t_data); - gpr_slice_buffer_init(&outgoing); - gpr_slice_buffer_addn(&outgoing, slices, num_blocks); - grpc_iomgr_closure_init(&write_done_closure, write_done, &state); + if (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state) == + GRPC_ENDPOINT_WRITE_DONE) { + /* Write completed immediately */ + read_bytes = drain_socket(sv[0]); + GPR_ASSERT(read_bytes == num_bytes); + } else { + drain_socket_blocking(sv[0], num_bytes, num_bytes); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + for (;;) { + grpc_pollset_worker worker; + if (state.write_done) { + break; + } + grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + deadline); + } + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + } + + grpc_endpoint_destroy(ep); + gpr_free(slices); +} + +static void read_done_for_write_error(void *ud, gpr_slice *slices, + size_t nslices, + grpc_endpoint_cb_status error) { + GPR_ASSERT(error != GRPC_ENDPOINT_CB_OK); + GPR_ASSERT(nslices == 0); +} + +/* Write to a socket using the grpc_tcp API, then drain it directly. + Note that if the write does not complete immediately we need to drain the + socket in parallel with the read. */ +static void write_error_test(ssize_t num_bytes, ssize_t slice_size) { + int sv[2]; + grpc_endpoint *ep; + struct write_socket_state state; + size_t num_blocks; + gpr_slice *slices; + int current_data = 0; + grpc_pollset_worker worker; + gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20); + + gpr_log(GPR_INFO, "Start write error test with %d bytes, slice size %d", + num_bytes, slice_size); + + create_sockets(sv); - switch (grpc_endpoint_write(ep, &outgoing, &write_done_closure)) { - case GRPC_ENDPOINT_DONE: + ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_error_test"), + GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test"); + grpc_endpoint_add_to_pollset(ep, &g_pollset); + + close(sv[0]); + + state.ep = ep; + state.write_done = 0; + + slices = allocate_blocks(num_bytes, slice_size, &num_blocks, ¤t_data); + + switch (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state)) { + case GRPC_ENDPOINT_WRITE_DONE: + case GRPC_ENDPOINT_WRITE_ERROR: /* Write completed immediately */ - read_bytes = drain_socket(sv[0]); - GPR_ASSERT(read_bytes == num_bytes); break; - case GRPC_ENDPOINT_PENDING: - drain_socket_blocking(sv[0], num_bytes, num_bytes); + case GRPC_ENDPOINT_WRITE_PENDING: + grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); for (;;) { - grpc_pollset_worker worker; if (state.write_done) { break; } @@ -415,14 +431,10 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) { } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); break; - case GRPC_ENDPOINT_ERROR: - gpr_log(GPR_ERROR, "endpoint got error"); - abort(); } - gpr_slice_buffer_destroy(&outgoing); grpc_endpoint_destroy(ep); - gpr_free(slices); + free(slices); } void run_tests(void) { @@ -442,6 +454,10 @@ void run_tests(void) { write_test(100000, 137); for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) { + write_error_test(40320, i); + } + + for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) { write_test(40320, i); } } |