diff options
Diffstat (limited to 'test/core/endpoint/tcp_test.c')
-rw-r--r-- | test/core/endpoint/tcp_test.c | 517 |
1 files changed, 517 insertions, 0 deletions
diff --git a/test/core/endpoint/tcp_test.c b/test/core/endpoint/tcp_test.c new file mode 100644 index 0000000000..7dbc2783e9 --- /dev/null +++ b/test/core/endpoint/tcp_test.c @@ -0,0 +1,517 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/endpoint/tcp.h" + +#include <errno.h> +#include <fcntl.h> +#include <string.h> +#include <signal.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <unistd.h> + +#include "src/core/eventmanager/em.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> +#include <grpc/support/useful.h> +#include "test/core/util/test_config.h" +#include "test/core/endpoint/endpoint_tests.h" + +/* + General test notes: + + All tests which write data into a socket write i%256 into byte i, which is + verified by readers. + + In general there are a few interesting things to vary which may lead to + exercising different codepaths in an implementation: + 1. Total amount of data written to the socket + 2. Size of slice allocations + 3. Amount of data we read from or write to the socket at once + + The tests here tend to parameterize these where applicable. + + */ + +grpc_em g_em; + +static void create_sockets(int sv[2]) { + int flags; + GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); + flags = fcntl(sv[0], F_GETFL, 0); + GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); + flags = fcntl(sv[1], F_GETFL, 0); + GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); +} + +static ssize_t fill_socket(int fd) { + ssize_t write_bytes; + ssize_t total_bytes = 0; + int i; + unsigned char buf[256]; + for (i = 0; i < 256; ++i) { + buf[i] = i; + } + do { + write_bytes = write(fd, buf, 256); + if (write_bytes > 0) { + total_bytes += write_bytes; + } + } while (write_bytes >= 0 || errno == EINTR); + GPR_ASSERT(errno == EAGAIN); + return total_bytes; +} + +static size_t fill_socket_partial(int fd, size_t bytes) { + ssize_t write_bytes; + size_t total_bytes = 0; + unsigned char *buf = malloc(bytes); + int i; + for (i = 0; i < bytes; ++i) { + buf[i] = i % 256; + } + + do { + write_bytes = write(fd, buf, bytes - total_bytes); + if (write_bytes > 0) { + total_bytes += write_bytes; + } + } while ((write_bytes >= 0 || errno == EINTR) && bytes > total_bytes); + + gpr_free(buf); + + return total_bytes; +} + +struct read_socket_state { + grpc_endpoint *ep; + gpr_mu mu; + gpr_cv cv; + size_t read_bytes; + ssize_t target_read_bytes; +}; + +static ssize_t count_and_unref_slices(gpr_slice *slices, size_t nslices, + int *current_data) { + ssize_t num_bytes = 0; + int i; + int j; + unsigned char *buf; + for (i = 0; i < nslices; ++i) { + buf = GPR_SLICE_START_PTR(slices[i]); + for (j = 0; j < GPR_SLICE_LENGTH(slices[i]); ++j) { + GPR_ASSERT(buf[j] == *current_data); + *current_data = (*current_data + 1) % 256; + } + num_bytes += GPR_SLICE_LENGTH(slices[i]); + gpr_slice_unref(slices[i]); + } + return num_bytes; +} + +static void read_cb(void *user_data, gpr_slice *slices, size_t nslices, + grpc_endpoint_cb_status error) { + struct read_socket_state *state = (struct read_socket_state *)user_data; + ssize_t read_bytes; + int current_data = 0; + + GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK); + + gpr_mu_lock(&state->mu); + read_bytes = count_and_unref_slices(slices, nslices, ¤t_data); + state->read_bytes += read_bytes; + gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes, + state->target_read_bytes); + if (state->read_bytes >= state->target_read_bytes) { + gpr_cv_signal(&state->cv); + } else { + grpc_endpoint_notify_on_read(state->ep, read_cb, state, gpr_inf_future); + } + gpr_mu_unlock(&state->mu); +} + +/* Write to a socket, then read from it using the grpc_tcp API. */ +static void read_test(ssize_t num_bytes, ssize_t slice_size) { + int sv[2]; + grpc_em em; + grpc_endpoint *ep; + struct read_socket_state state; + ssize_t written_bytes; + gpr_timespec rel_deadline = {20, 0}; + gpr_timespec deadline = gpr_time_add(gpr_now(), rel_deadline); + + gpr_log(GPR_INFO, "Read test of size %d, slice size %d", num_bytes, + slice_size); + + create_sockets(sv); + grpc_em_init(&em); + + ep = grpc_tcp_create_dbg(sv[1], &em, slice_size); + written_bytes = fill_socket_partial(sv[0], num_bytes); + gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); + + gpr_mu_init(&state.mu); + gpr_cv_init(&state.cv); + state.ep = ep; + state.read_bytes = 0; + state.target_read_bytes = written_bytes; + + grpc_endpoint_notify_on_read(ep, read_cb, &state, gpr_inf_future); + + gpr_mu_lock(&state.mu); + for (;;) { + GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0); + if (state.read_bytes >= state.target_read_bytes) { + break; + } + } + GPR_ASSERT(state.read_bytes == state.target_read_bytes); + gpr_mu_unlock(&state.mu); + + grpc_endpoint_destroy(ep); + + grpc_em_destroy(&em); + gpr_mu_destroy(&state.mu); + gpr_cv_destroy(&state.cv); +} + +/* Write to a socket until it fills up, then read from it using the grpc_tcp + API. */ +static void large_read_test(ssize_t slice_size) { + int sv[2]; + grpc_em em; + grpc_endpoint *ep; + struct read_socket_state state; + ssize_t written_bytes; + gpr_timespec rel_deadline = {20, 0}; + gpr_timespec deadline = gpr_time_add(gpr_now(), rel_deadline); + + gpr_log(GPR_INFO, "Start large read test, slice size %d", slice_size); + + create_sockets(sv); + grpc_em_init(&em); + + ep = grpc_tcp_create_dbg(sv[1], &em, slice_size); + written_bytes = fill_socket(sv[0]); + gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); + + gpr_mu_init(&state.mu); + gpr_cv_init(&state.cv); + state.ep = ep; + state.read_bytes = 0; + state.target_read_bytes = written_bytes; + + grpc_endpoint_notify_on_read(ep, read_cb, &state, gpr_inf_future); + + gpr_mu_lock(&state.mu); + for (;;) { + GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0); + if (state.read_bytes >= state.target_read_bytes) { + break; + } + } + GPR_ASSERT(state.read_bytes == state.target_read_bytes); + gpr_mu_unlock(&state.mu); + + grpc_endpoint_destroy(ep); + + grpc_em_destroy(&em); + gpr_mu_destroy(&state.mu); + gpr_cv_destroy(&state.cv); +} + +struct write_socket_state { + grpc_endpoint *ep; + gpr_mu mu; + gpr_cv cv; + int write_done; +}; + +static gpr_slice *allocate_blocks(ssize_t num_bytes, ssize_t slice_size, + size_t *num_blocks, int *current_data) { + ssize_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1 : 0); + gpr_slice *slices = gpr_malloc(sizeof(gpr_slice) * nslices); + ssize_t num_bytes_left = num_bytes; + int i; + int j; + unsigned char *buf; + *num_blocks = nslices; + + for (i = 0; i < nslices; ++i) { + slices[i] = gpr_slice_malloc(slice_size > num_bytes_left ? num_bytes_left + : slice_size); + num_bytes_left -= GPR_SLICE_LENGTH(slices[i]); + buf = GPR_SLICE_START_PTR(slices[i]); + for (j = 0; j < GPR_SLICE_LENGTH(slices[i]); ++j) { + buf[j] = *current_data; + *current_data = (*current_data + 1) % 256; + } + } + GPR_ASSERT(num_bytes_left == 0); + return slices; +} + +static void write_done(void *user_data /* write_socket_state */, + grpc_endpoint_cb_status error) { + struct write_socket_state *state = (struct write_socket_state *)user_data; + gpr_log(GPR_INFO, "Write done callback called"); + gpr_mu_lock(&state->mu); + gpr_log(GPR_INFO, "Signalling write done"); + state->write_done = 1; + gpr_cv_signal(&state->cv); + gpr_mu_unlock(&state->mu); +} + +void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { + unsigned char *buf = malloc(read_size); + ssize_t bytes_read; + size_t bytes_left = num_bytes; + int flags; + int current = 0; + int i; + + flags = fcntl(fd, F_GETFL, 0); + GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0); + + for (;;) { + do { + bytes_read = + read(fd, buf, bytes_left > read_size ? read_size : bytes_left); + } while (bytes_read < 0 && errno == EINTR); + GPR_ASSERT(bytes_read >= 0); + for (i = 0; i < bytes_read; ++i) { + GPR_ASSERT(buf[i] == current); + current = (current + 1) % 256; + } + bytes_left -= bytes_read; + if (bytes_left == 0) break; + } + flags = fcntl(fd, F_GETFL, 0); + GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0); + + gpr_free(buf); +} + +static ssize_t drain_socket(int fd) { + ssize_t read_bytes; + ssize_t total_bytes = 0; + unsigned char buf[256]; + int current = 0; + int i; + do { + read_bytes = read(fd, buf, 256); + if (read_bytes > 0) { + total_bytes += read_bytes; + for (i = 0; i < read_bytes; ++i) { + GPR_ASSERT(buf[i] == current); + current = (current + 1) % 256; + } + } + } while (read_bytes >= 0 || errno == EINTR); + GPR_ASSERT(errno == EAGAIN); + return total_bytes; +} + +/* Write to a socket using the grpc_tcp API, then drain it directly. + Note that if the write does not complete immediately we need to drain the + socket in parallel with the read. */ +static void write_test(ssize_t num_bytes, ssize_t slice_size) { + int sv[2]; + grpc_em em; + grpc_endpoint *ep; + struct write_socket_state state; + ssize_t read_bytes; + size_t num_blocks; + gpr_slice *slices; + int current_data = 0; + gpr_timespec rel_deadline = {20, 0}; + gpr_timespec deadline = gpr_time_add(gpr_now(), rel_deadline); + + gpr_log(GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes, + slice_size); + + create_sockets(sv); + grpc_em_init(&em); + + ep = grpc_tcp_create(sv[1], &em); + + gpr_mu_init(&state.mu); + gpr_cv_init(&state.cv); + state.ep = ep; + state.write_done = 0; + + slices = allocate_blocks(num_bytes, slice_size, &num_blocks, ¤t_data); + + if (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state, + gpr_inf_future) == GRPC_ENDPOINT_WRITE_DONE) { + /* Write completed immediately */ + read_bytes = drain_socket(sv[0]); + GPR_ASSERT(read_bytes == num_bytes); + } else { + drain_socket_blocking(sv[0], num_bytes, num_bytes); + gpr_mu_lock(&state.mu); + for (;;) { + if (state.write_done) { + break; + } + GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0); + } + gpr_mu_unlock(&state.mu); + } + + grpc_endpoint_destroy(ep); + grpc_em_destroy(&em); + gpr_mu_destroy(&state.mu); + gpr_cv_destroy(&state.cv); + gpr_free(slices); +} + +static void read_done_for_write_error(void *ud, gpr_slice *slices, + size_t nslices, + grpc_endpoint_cb_status error) { + GPR_ASSERT(error != GRPC_ENDPOINT_CB_OK); + GPR_ASSERT(nslices == 0); +} + +/* Write to a socket using the grpc_tcp API, then drain it directly. + Note that if the write does not complete immediately we need to drain the + socket in parallel with the read. */ +static void write_error_test(ssize_t num_bytes, ssize_t slice_size) { + int sv[2]; + grpc_em em; + grpc_endpoint *ep; + struct write_socket_state state; + size_t num_blocks; + gpr_slice *slices; + int current_data = 0; + gpr_timespec rel_deadline = {20, 0}; + gpr_timespec deadline = gpr_time_add(gpr_now(), rel_deadline); + + gpr_log(GPR_INFO, "Start write error test with %d bytes, slice size %d", + num_bytes, slice_size); + + create_sockets(sv); + grpc_em_init(&em); + + ep = grpc_tcp_create(sv[1], &em); + close(sv[0]); + + gpr_mu_init(&state.mu); + gpr_cv_init(&state.cv); + state.ep = ep; + state.write_done = 0; + + slices = allocate_blocks(num_bytes, slice_size, &num_blocks, ¤t_data); + + switch (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state, + gpr_inf_future)) { + case GRPC_ENDPOINT_WRITE_DONE: + case GRPC_ENDPOINT_WRITE_ERROR: + /* Write completed immediately */ + break; + case GRPC_ENDPOINT_WRITE_PENDING: + grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL, + gpr_inf_future); + gpr_mu_lock(&state.mu); + for (;;) { + if (state.write_done) { + break; + } + GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0); + } + gpr_mu_unlock(&state.mu); + break; + } + + grpc_endpoint_destroy(ep); + grpc_em_destroy(&em); + gpr_mu_destroy(&state.mu); + gpr_cv_destroy(&state.cv); + free(slices); +} + +void run_tests() { + int i = 0; + + read_test(100, 8192); + read_test(10000, 8192); + read_test(10000, 137); + read_test(10000, 1); + large_read_test(8192); + large_read_test(1); + + write_test(100, 8192); + write_test(100, 1); + write_test(100000, 8192); + write_test(100000, 1); + write_test(100000, 137); + + for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) { + write_error_test(40320, i); + } + + for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) { + write_test(40320, i); + } +} + +static void clean_up() { grpc_em_destroy(&g_em); } + +static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( + ssize_t slice_size) { + int sv[2]; + grpc_endpoint_test_fixture f; + + create_sockets(sv); + grpc_em_init(&g_em); + f.client_ep = grpc_tcp_create_dbg(sv[0], &g_em, slice_size); + f.server_ep = grpc_tcp_create(sv[1], &g_em); + + return f; +} + +static grpc_endpoint_test_config configs[] = { + {"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up}, +}; + +int main(int argc, char **argv) { + grpc_test_init(argc, argv); + /* disable SIGPIPE */ + signal(SIGPIPE, SIG_IGN); + run_tests(); + grpc_endpoint_tests(configs[0]); + + return 0; +} |