diff options
Diffstat (limited to 'test/core/endpoint')
-rw-r--r-- | test/core/endpoint/endpoint_tests.c | 433 | ||||
-rw-r--r-- | test/core/endpoint/endpoint_tests.h | 57 | ||||
-rw-r--r-- | test/core/endpoint/resolve_address_test.c | 135 | ||||
-rw-r--r-- | test/core/endpoint/secure_endpoint_test.c | 222 | ||||
-rw-r--r-- | test/core/endpoint/tcp_client_test.c | 177 | ||||
-rw-r--r-- | test/core/endpoint/tcp_server_test.c | 168 | ||||
-rw-r--r-- | test/core/endpoint/tcp_test.c | 517 |
7 files changed, 1709 insertions, 0 deletions
diff --git a/test/core/endpoint/endpoint_tests.c b/test/core/endpoint/endpoint_tests.c new file mode 100644 index 0000000000..9cc0eaa959 --- /dev/null +++ b/test/core/endpoint/endpoint_tests.c @@ -0,0 +1,433 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/core/endpoint/endpoint_tests.h" + +#include <sys/types.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/slice.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> + +/* + General test notes: + + All tests which write data into an endpoint write i%256 into byte i, which + is verified by readers. + + In general there are a few interesting things to vary which may lead to + exercising different codepaths in an implementation: + 1. Total amount of data written to the endpoint + 2. Size of slice allocations + 3. Amount of data we read from or write to the endpoint at once + + The tests here tend to parameterize these where applicable. + +*/ + +ssize_t count_and_unref_slices(gpr_slice *slices, size_t nslices, + int *current_data) { + ssize_t num_bytes = 0; + int i; + int j; + unsigned char *buf; + for (i = 0; i < nslices; ++i) { + buf = GPR_SLICE_START_PTR(slices[i]); + for (j = 0; j < GPR_SLICE_LENGTH(slices[i]); ++j) { + GPR_ASSERT(buf[j] == *current_data); + *current_data = (*current_data + 1) % 256; + } + num_bytes += GPR_SLICE_LENGTH(slices[i]); + gpr_slice_unref(slices[i]); + } + return num_bytes; +} + +static grpc_endpoint_test_fixture begin_test(grpc_endpoint_test_config config, + const char *test_name, + ssize_t slice_size) { + gpr_log(GPR_INFO, "%s/%s", test_name, config.name); + return config.create_fixture(slice_size); +} + +static void end_test(grpc_endpoint_test_config config) { config.clean_up(); } + +static gpr_slice *allocate_blocks(ssize_t num_bytes, ssize_t slice_size, + size_t *num_blocks, int *current_data) { + ssize_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1 : 0); + gpr_slice *slices = malloc(sizeof(gpr_slice) * nslices); + ssize_t num_bytes_left = num_bytes; + int i; + int j; + unsigned char *buf; + *num_blocks = nslices; + + for (i = 0; i < nslices; ++i) { + slices[i] = gpr_slice_malloc(slice_size > num_bytes_left ? num_bytes_left + : slice_size); + num_bytes_left -= GPR_SLICE_LENGTH(slices[i]); + buf = GPR_SLICE_START_PTR(slices[i]); + for (j = 0; j < GPR_SLICE_LENGTH(slices[i]); ++j) { + buf[j] = *current_data; + *current_data = (*current_data + 1) % 256; + } + } + GPR_ASSERT(num_bytes_left == 0); + return slices; +} + +struct read_and_write_test_state { + grpc_endpoint *read_ep; + grpc_endpoint *write_ep; + gpr_mu mu; + gpr_cv cv; + ssize_t target_bytes; + ssize_t bytes_read; + ssize_t current_write_size; + ssize_t bytes_written; + int current_read_data; + int current_write_data; + int read_done; + int write_done; +}; + +static void read_and_write_test_read_handler(void *data, gpr_slice *slices, + size_t nslices, + grpc_endpoint_cb_status error) { + struct read_and_write_test_state *state = data; + GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR); + if (error == GRPC_ENDPOINT_CB_SHUTDOWN) { + gpr_log(GPR_INFO, "Read handler shutdown"); + gpr_mu_lock(&state->mu); + state->read_done = 1; + gpr_cv_signal(&state->cv); + gpr_mu_unlock(&state->mu); + return; + } + + state->bytes_read += + count_and_unref_slices(slices, nslices, &state->current_read_data); + if (state->bytes_read == state->target_bytes) { + gpr_log(GPR_INFO, "Read handler done"); + gpr_mu_lock(&state->mu); + state->read_done = 1; + gpr_cv_signal(&state->cv); + gpr_mu_unlock(&state->mu); + } else { + grpc_endpoint_notify_on_read( + state->read_ep, read_and_write_test_read_handler, data, gpr_inf_future); + } +} + +static void read_and_write_test_write_handler(void *data, + grpc_endpoint_cb_status error) { + struct read_and_write_test_state *state = data; + gpr_slice *slices = NULL; + size_t nslices; + grpc_endpoint_write_status write_status; + + GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR); + + if (error == GRPC_ENDPOINT_CB_SHUTDOWN) { + gpr_log(GPR_INFO, "Write handler shutdown"); + gpr_mu_lock(&state->mu); + state->write_done = 1; + gpr_cv_signal(&state->cv); + gpr_mu_unlock(&state->mu); + return; + } + + for (;;) { + /* Need to do inline writes until they don't succeed synchronously or we + finish writing */ + state->bytes_written += state->current_write_size; + if (state->target_bytes - state->bytes_written < + state->current_write_size) { + state->current_write_size = state->target_bytes - state->bytes_written; + } + if (state->current_write_size == 0) { + break; + } + + slices = allocate_blocks(state->current_write_size, 8192, &nslices, + &state->current_write_data); + write_status = grpc_endpoint_write(state->write_ep, slices, nslices, + read_and_write_test_write_handler, state, + gpr_inf_future); + GPR_ASSERT(write_status != GRPC_ENDPOINT_WRITE_ERROR); + free(slices); + if (write_status == GRPC_ENDPOINT_WRITE_PENDING) { + return; + } + } + GPR_ASSERT(state->bytes_written == state->target_bytes); + + gpr_log(GPR_INFO, "Write handler done"); + gpr_mu_lock(&state->mu); + state->write_done = 1; + gpr_cv_signal(&state->cv); + gpr_mu_unlock(&state->mu); +} + +/* Do both reading and writing using the grpc_endpoint API. + + This also includes a test of the shutdown behavior. + */ +static void read_and_write_test(grpc_endpoint_test_config config, + ssize_t num_bytes, ssize_t write_size, + ssize_t slice_size, int shutdown) { + struct read_and_write_test_state state; + gpr_timespec rel_deadline = {20, 0}; + gpr_timespec deadline = gpr_time_add(gpr_now(), rel_deadline); + grpc_endpoint_test_fixture f = begin_test(config, __FUNCTION__, slice_size); + + if (shutdown) { + gpr_log(GPR_INFO, "Start read and write shutdown test"); + } else { + gpr_log(GPR_INFO, "Start read and write test with %d bytes, slice size %d", + num_bytes, slice_size); + } + + gpr_mu_init(&state.mu); + gpr_cv_init(&state.cv); + + state.read_ep = f.client_ep; + state.write_ep = f.server_ep; + state.target_bytes = num_bytes; + state.bytes_read = 0; + state.current_write_size = write_size; + state.bytes_written = 0; + state.read_done = 0; + state.write_done = 0; + state.current_read_data = 0; + state.current_write_data = 0; + + /* Get started by pretending an initial write completed */ + state.bytes_written -= state.current_write_size; + read_and_write_test_write_handler(&state, GRPC_ENDPOINT_CB_OK); + + grpc_endpoint_notify_on_read(state.read_ep, read_and_write_test_read_handler, + &state, gpr_inf_future); + + if (shutdown) { + grpc_endpoint_shutdown(state.read_ep); + grpc_endpoint_shutdown(state.write_ep); + } + + gpr_mu_lock(&state.mu); + while (!state.read_done || !state.write_done) { + GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0); + } + gpr_mu_unlock(&state.mu); + + grpc_endpoint_destroy(state.read_ep); + grpc_endpoint_destroy(state.write_ep); + gpr_mu_destroy(&state.mu); + gpr_cv_destroy(&state.cv); + end_test(config); +} + +struct timeout_test_state { + gpr_event io_done; +}; + +static void read_timeout_test_read_handler(void *data, gpr_slice *slices, + size_t nslices, + grpc_endpoint_cb_status error) { + struct timeout_test_state *state = data; + GPR_ASSERT(error == GRPC_ENDPOINT_CB_TIMED_OUT); + gpr_event_set(&state->io_done, (void *)1); +} + +static void read_timeout_test(grpc_endpoint_test_config config, + ssize_t slice_size) { + gpr_timespec timeout = gpr_time_from_micros(10000); + gpr_timespec read_deadline = gpr_time_add(gpr_now(), timeout); + gpr_timespec test_deadline = + gpr_time_add(gpr_now(), gpr_time_from_micros(2000000)); + struct timeout_test_state state; + grpc_endpoint_test_fixture f = begin_test(config, __FUNCTION__, slice_size); + + gpr_event_init(&state.io_done); + + grpc_endpoint_notify_on_read(f.client_ep, read_timeout_test_read_handler, + &state, read_deadline); + GPR_ASSERT(gpr_event_wait(&state.io_done, test_deadline)); + grpc_endpoint_destroy(f.client_ep); + grpc_endpoint_destroy(f.server_ep); + end_test(config); +} + +static void write_timeout_test_write_handler(void *data, + grpc_endpoint_cb_status error) { + struct timeout_test_state *state = data; + GPR_ASSERT(error == GRPC_ENDPOINT_CB_TIMED_OUT); + gpr_event_set(&state->io_done, (void *)1); +} + +static void write_timeout_test(grpc_endpoint_test_config config, + ssize_t slice_size) { + gpr_timespec timeout = gpr_time_from_micros(10000); + gpr_timespec write_deadline = gpr_time_add(gpr_now(), timeout); + gpr_timespec test_deadline = + gpr_time_add(gpr_now(), gpr_time_from_micros(2000000)); + struct timeout_test_state state; + int current_data = 1; + gpr_slice *slices; + size_t nblocks; + size_t size; + grpc_endpoint_test_fixture f = begin_test(config, __FUNCTION__, slice_size); + + gpr_event_init(&state.io_done); + + /* TODO(klempner): Factor this out with the equivalent code in tcp_test.c */ + for (size = 1;; size *= 2) { + slices = allocate_blocks(size, 1, &nblocks, ¤t_data); + switch (grpc_endpoint_write(f.client_ep, slices, nblocks, + write_timeout_test_write_handler, &state, + write_deadline)) { + case GRPC_ENDPOINT_WRITE_DONE: + break; + case GRPC_ENDPOINT_WRITE_ERROR: + gpr_log(GPR_ERROR, "error writing"); + abort(); + case GRPC_ENDPOINT_WRITE_PENDING: + GPR_ASSERT(gpr_event_wait(&state.io_done, test_deadline)); + gpr_free(slices); + goto exit; + } + gpr_free(slices); + } +exit: + grpc_endpoint_destroy(f.client_ep); + grpc_endpoint_destroy(f.server_ep); + end_test(config); +} + +typedef struct { + gpr_event ev; + grpc_endpoint *ep; +} shutdown_during_write_test_state; + +static void shutdown_during_write_test_read_handler( + void *user_data, gpr_slice *slices, size_t nslices, + grpc_endpoint_cb_status error) { + size_t i; + shutdown_during_write_test_state *st = user_data; + + for (i = 0; i < nslices; i++) { + gpr_slice_unref(slices[i]); + } + + if (error != GRPC_ENDPOINT_CB_OK) { + grpc_endpoint_destroy(st->ep); + gpr_event_set(&st->ev, (void *)(gpr_intptr)error); + } else { + grpc_endpoint_notify_on_read(st->ep, + shutdown_during_write_test_read_handler, + user_data, gpr_inf_future); + } +} + +static void shutdown_during_write_test_write_handler( + void *user_data, grpc_endpoint_cb_status error) { + shutdown_during_write_test_state *st = user_data; + gpr_log(GPR_INFO, "shutdown_during_write_test_write_handler: error = %d", + error); + grpc_endpoint_destroy(st->ep); + gpr_event_set(&st->ev, (void *)(gpr_intptr)error); +} + +static void shutdown_during_write_test(grpc_endpoint_test_config config, + ssize_t slice_size) { + /* test that shutdown with a pending write creates no leaks */ + gpr_timespec deadline; + size_t size; + size_t nblocks; + int current_data = 1; + shutdown_during_write_test_state read_st; + shutdown_during_write_test_state write_st; + gpr_slice *slices; + grpc_endpoint_test_fixture f = begin_test(config, __FUNCTION__, slice_size); + + gpr_log(GPR_INFO, "testing shutdown during a write"); + + read_st.ep = f.client_ep; + write_st.ep = f.server_ep; + gpr_event_init(&read_st.ev); + gpr_event_init(&write_st.ev); + +#if 0 + read_st.ep = grpc_tcp_create(sv[1], &em); + write_st.ep = grpc_tcp_create(sv[0], &em); +#endif + + grpc_endpoint_notify_on_read(read_st.ep, + shutdown_during_write_test_read_handler, + &read_st, gpr_inf_future); + for (size = 1;; size *= 2) { + slices = allocate_blocks(size, 1, &nblocks, ¤t_data); + switch (grpc_endpoint_write(write_st.ep, slices, nblocks, + shutdown_during_write_test_write_handler, + &write_st, gpr_inf_future)) { + case GRPC_ENDPOINT_WRITE_DONE: + break; + case GRPC_ENDPOINT_WRITE_ERROR: + gpr_log(GPR_ERROR, "error writing"); + abort(); + case GRPC_ENDPOINT_WRITE_PENDING: + grpc_endpoint_shutdown(write_st.ep); + deadline = + gpr_time_add(gpr_now(), gpr_time_from_micros(10 * GPR_US_PER_SEC)); + GPR_ASSERT(gpr_event_wait(&write_st.ev, deadline)); + GPR_ASSERT(gpr_event_wait(&read_st.ev, deadline)); + gpr_free(slices); + end_test(config); + return; + } + gpr_free(slices); + } + + gpr_log(GPR_ERROR, "should never reach here"); + abort(); +} + +void grpc_endpoint_tests(grpc_endpoint_test_config config) { + read_and_write_test(config, 10000000, 100000, 8192, 0); + read_and_write_test(config, 1000000, 100000, 1, 0); + read_and_write_test(config, 100000000, 100000, 1, 1); + read_timeout_test(config, 1000); + write_timeout_test(config, 1000); + shutdown_during_write_test(config, 1000); +} diff --git a/test/core/endpoint/endpoint_tests.h b/test/core/endpoint/endpoint_tests.h new file mode 100644 index 0000000000..79ee759b45 --- /dev/null +++ b/test/core/endpoint/endpoint_tests.h @@ -0,0 +1,57 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_TEST_ENDPOINT_ENDPOINT_TESTS_H__ +#define __GRPC_TEST_ENDPOINT_ENDPOINT_TESTS_H__ + +#include <sys/types.h> + +#include "src/core/endpoint/endpoint.h" + +typedef struct grpc_endpoint_test_config grpc_endpoint_test_config; +typedef struct grpc_endpoint_test_fixture grpc_endpoint_test_fixture; + +struct grpc_endpoint_test_fixture { + grpc_endpoint *client_ep; + grpc_endpoint *server_ep; +}; + +struct grpc_endpoint_test_config { + const char *name; + grpc_endpoint_test_fixture (*create_fixture)(ssize_t slice_size); + void (*clean_up)(); +}; + +void grpc_endpoint_tests(grpc_endpoint_test_config config); + +#endif /* __GRPC_TEST_ENDPOINT_ENDPOINT_TESTS_H__ */ diff --git a/test/core/endpoint/resolve_address_test.c b/test/core/endpoint/resolve_address_test.c new file mode 100644 index 0000000000..1e208d3699 --- /dev/null +++ b/test/core/endpoint/resolve_address_test.c @@ -0,0 +1,135 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/endpoint/resolve_address.h" +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/time.h> +#include "test/core/util/test_config.h" + +static gpr_timespec test_deadline() { + return gpr_time_add(gpr_now(), gpr_time_from_micros(100000000)); +} + +static void must_succeed(void* evp, grpc_resolved_addresses* p) { + GPR_ASSERT(p); + GPR_ASSERT(p->naddrs >= 1); + grpc_resolved_addresses_destroy(p); + gpr_event_set(evp, (void*)1); +} + +static void must_fail(void* evp, grpc_resolved_addresses* p) { + GPR_ASSERT(!p); + gpr_event_set(evp, (void*)1); +} + +static void test_localhost() { + gpr_event ev; + gpr_event_init(&ev); + grpc_resolve_address("localhost:1", NULL, must_succeed, &ev); + GPR_ASSERT(gpr_event_wait(&ev, test_deadline())); +} + +static void test_default_port() { + gpr_event ev; + gpr_event_init(&ev); + grpc_resolve_address("localhost", "1", must_succeed, &ev); + GPR_ASSERT(gpr_event_wait(&ev, test_deadline())); +} + +static void test_missing_default_port() { + gpr_event ev; + gpr_event_init(&ev); + grpc_resolve_address("localhost", NULL, must_fail, &ev); + GPR_ASSERT(gpr_event_wait(&ev, test_deadline())); +} + +static void test_ipv6_with_port() { + gpr_event ev; + gpr_event_init(&ev); + grpc_resolve_address("[2001:db8::1]:1", NULL, must_succeed, &ev); + GPR_ASSERT(gpr_event_wait(&ev, test_deadline())); +} + +static void test_ipv6_without_port() { + const char* const kCases[] = { + "2001:db8::1", "2001:db8::1.2.3.4", "[2001:db8::1]", + }; + int i; + for (i = 0; i < sizeof(kCases) / sizeof(*kCases); i++) { + gpr_event ev; + gpr_event_init(&ev); + grpc_resolve_address(kCases[i], "80", must_succeed, &ev); + GPR_ASSERT(gpr_event_wait(&ev, test_deadline())); + } +} + +static void test_invalid_ip_addresses() { + const char* const kCases[] = { + "293.283.1238.3:1", "[2001:db8::11111]:1", + }; + int i; + for (i = 0; i < sizeof(kCases) / sizeof(*kCases); i++) { + gpr_event ev; + gpr_event_init(&ev); + grpc_resolve_address(kCases[i], NULL, must_fail, &ev); + GPR_ASSERT(gpr_event_wait(&ev, test_deadline())); + } +} + +static void test_unparseable_hostports() { + const char* const kCases[] = { + "[", "[::1", "[::1]bad", "[1.2.3.4]", "[localhost]", "[localhost]:1", + }; + int i; + for (i = 0; i < sizeof(kCases) / sizeof(*kCases); i++) { + gpr_event ev; + gpr_event_init(&ev); + grpc_resolve_address(kCases[i], "1", must_fail, &ev); + GPR_ASSERT(gpr_event_wait(&ev, test_deadline())); + } +} + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + + test_localhost(); + test_default_port(); + test_missing_default_port(); + test_ipv6_with_port(); + test_ipv6_without_port(); + test_invalid_ip_addresses(); + test_unparseable_hostports(); + + return 0; +} diff --git a/test/core/endpoint/secure_endpoint_test.c b/test/core/endpoint/secure_endpoint_test.c new file mode 100644 index 0000000000..9fc2511763 --- /dev/null +++ b/test/core/endpoint/secure_endpoint_test.c @@ -0,0 +1,222 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "endpoint_tests.h" + +#include <fcntl.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <unistd.h> + +#include "src/core/endpoint/secure_endpoint.h" +#include "src/core/endpoint/tcp.h" +#include "src/core/eventmanager/em.h" +#include "src/core/tsi/fake_transport_security.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include "test/core/util/test_config.h" + +grpc_em g_em; + +static void create_sockets(int sv[2]) { + int flags; + GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); + flags = fcntl(sv[0], F_GETFL, 0); + GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); + flags = fcntl(sv[1], F_GETFL, 0); + GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); +} + +static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair( + ssize_t slice_size, gpr_slice *leftover_slices, size_t leftover_nslices) { + int sv[2]; + tsi_frame_protector *fake_read_protector = tsi_create_fake_protector(NULL); + tsi_frame_protector *fake_write_protector = tsi_create_fake_protector(NULL); + grpc_endpoint_test_fixture f; + grpc_endpoint *tcp_read; + grpc_endpoint *tcp_write; + + create_sockets(sv); + grpc_em_init(&g_em); + tcp_read = grpc_tcp_create_dbg(sv[0], &g_em, slice_size); + tcp_write = grpc_tcp_create(sv[1], &g_em); + + if (leftover_nslices == 0) { + f.client_ep = + grpc_secure_endpoint_create(fake_read_protector, tcp_read, NULL, 0); + } else { + int i; + tsi_result result; + gpr_uint32 still_pending_size; + size_t total_buffer_size = 8192; + size_t buffer_size = total_buffer_size; + gpr_uint8 *encrypted_buffer = gpr_malloc(buffer_size); + gpr_uint8 *cur = encrypted_buffer; + gpr_slice encrypted_leftover; + for (i = 0; i < leftover_nslices; i++) { + gpr_slice plain = leftover_slices[i]; + gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(plain); + size_t message_size = GPR_SLICE_LENGTH(plain); + while (message_size > 0) { + gpr_uint32 protected_buffer_size_to_send = buffer_size; + gpr_uint32 processed_message_size = message_size; + result = tsi_frame_protector_protect( + fake_write_protector, message_bytes, &processed_message_size, cur, + &protected_buffer_size_to_send); + GPR_ASSERT(result == TSI_OK); + message_bytes += processed_message_size; + message_size -= processed_message_size; + cur += protected_buffer_size_to_send; + buffer_size -= protected_buffer_size_to_send; + + GPR_ASSERT(buffer_size >= 0); + } + gpr_slice_unref(plain); + } + do { + gpr_uint32 protected_buffer_size_to_send = buffer_size; + result = tsi_frame_protector_protect_flush(fake_write_protector, cur, + &protected_buffer_size_to_send, + &still_pending_size); + GPR_ASSERT(result == TSI_OK); + cur += protected_buffer_size_to_send; + buffer_size -= protected_buffer_size_to_send; + GPR_ASSERT(buffer_size >= 0); + } while (still_pending_size > 0); + encrypted_leftover = gpr_slice_from_copied_buffer( + (const char *)encrypted_buffer, total_buffer_size - buffer_size); + f.client_ep = grpc_secure_endpoint_create(fake_read_protector, tcp_read, + &encrypted_leftover, 1); + gpr_slice_unref(encrypted_leftover); + gpr_free(encrypted_buffer); + } + + f.server_ep = + grpc_secure_endpoint_create(fake_write_protector, tcp_write, NULL, 0); + return f; +} + +static grpc_endpoint_test_fixture +secure_endpoint_create_fixture_tcp_socketpair_noleftover(ssize_t slice_size) { + return secure_endpoint_create_fixture_tcp_socketpair(slice_size, NULL, 0); +} + +static grpc_endpoint_test_fixture +secure_endpoint_create_fixture_tcp_socketpair_leftover(ssize_t slice_size) { + gpr_slice s = + gpr_slice_from_copied_string("hello world 12345678900987654321"); + grpc_endpoint_test_fixture f; + + f = secure_endpoint_create_fixture_tcp_socketpair(slice_size, &s, 1); + return f; +} + +static void clean_up() { grpc_em_destroy(&g_em); } + +static grpc_endpoint_test_config configs[] = { + {"secure_ep/tcp_socketpair", + secure_endpoint_create_fixture_tcp_socketpair_noleftover, clean_up}, + {"secure_ep/tcp_socketpair_leftover", + secure_endpoint_create_fixture_tcp_socketpair_leftover, clean_up}, +}; + +static void verify_leftover(void *user_data, gpr_slice *slices, size_t nslices, + grpc_endpoint_cb_status error) { + gpr_slice s = + gpr_slice_from_copied_string("hello world 12345678900987654321"); + + GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK); + GPR_ASSERT(nslices == 1); + + GPR_ASSERT(0 == gpr_slice_cmp(s, slices[0])); + gpr_slice_unref(slices[0]); + gpr_slice_unref(s); + *(int *)user_data = 1; +} + +static void test_leftover(grpc_endpoint_test_config config, + ssize_t slice_size) { + grpc_endpoint_test_fixture f = config.create_fixture(slice_size); + int verified = 0; + gpr_log(GPR_INFO, "Start test left over"); + + grpc_endpoint_notify_on_read(f.client_ep, verify_leftover, &verified, + gpr_inf_future); + GPR_ASSERT(verified == 1); + + grpc_endpoint_shutdown(f.client_ep); + grpc_endpoint_shutdown(f.server_ep); + grpc_endpoint_destroy(f.client_ep); + grpc_endpoint_destroy(f.server_ep); + clean_up(); +} + +static void destroy_early(void *user_data, gpr_slice *slices, size_t nslices, + grpc_endpoint_cb_status error) { + grpc_endpoint_test_fixture *f = user_data; + gpr_slice s = + gpr_slice_from_copied_string("hello world 12345678900987654321"); + + GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK); + GPR_ASSERT(nslices == 1); + + grpc_endpoint_shutdown(f->client_ep); + grpc_endpoint_destroy(f->client_ep); + + GPR_ASSERT(0 == gpr_slice_cmp(s, slices[0])); + gpr_slice_unref(slices[0]); + gpr_slice_unref(s); +} + +/* test which destroys the ep before finishing reading */ +static void test_destroy_ep_early(grpc_endpoint_test_config config, + ssize_t slice_size) { + grpc_endpoint_test_fixture f = config.create_fixture(slice_size); + gpr_log(GPR_INFO, "Start test destroy early"); + + grpc_endpoint_notify_on_read(f.client_ep, destroy_early, &f, gpr_inf_future); + + grpc_endpoint_shutdown(f.server_ep); + grpc_endpoint_destroy(f.server_ep); + clean_up(); +} + +int main(int argc, char **argv) { + grpc_test_init(argc, argv); + + grpc_endpoint_tests(configs[0]); + test_leftover(configs[1], 1); + test_destroy_ep_early(configs[1], 1); + + return 0; +} diff --git a/test/core/endpoint/tcp_client_test.c b/test/core/endpoint/tcp_client_test.c new file mode 100644 index 0000000000..10138e6af5 --- /dev/null +++ b/test/core/endpoint/tcp_client_test.c @@ -0,0 +1,177 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/endpoint/tcp_client.h" + +#include <errno.h> +#include <netinet/in.h> +#include <string.h> +#include <sys/socket.h> +#include <unistd.h> + +#include "src/core/eventmanager/em.h" +#include <grpc/support/log.h> +#include <grpc/support/time.h> + +static grpc_em em; + +static gpr_timespec test_deadline() { + return gpr_time_add(gpr_now(), gpr_time_from_micros(1000000)); +} + +static void must_succeed(void *arg, grpc_endpoint *tcp) { + GPR_ASSERT(tcp); + grpc_endpoint_shutdown(tcp); + grpc_endpoint_destroy(tcp); + gpr_event_set(arg, (void *)1); +} + +static void must_fail(void *arg, grpc_endpoint *tcp) { + GPR_ASSERT(!tcp); + gpr_event_set(arg, (void *)1); +} + +void test_succeeds() { + struct sockaddr_in addr; + socklen_t addr_len = sizeof(addr); + int svr_fd; + int r; + gpr_event ev; + + gpr_event_init(&ev); + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + + /* create a dummy server */ + svr_fd = socket(AF_INET, SOCK_STREAM, 0); + GPR_ASSERT(svr_fd >= 0); + GPR_ASSERT(0 == bind(svr_fd, (struct sockaddr *)&addr, addr_len)); + GPR_ASSERT(0 == listen(svr_fd, 1)); + + /* connect to it */ + GPR_ASSERT(getsockname(svr_fd, (struct sockaddr *)&addr, &addr_len) == 0); + grpc_tcp_client_connect(must_succeed, &ev, &em, (struct sockaddr *)&addr, + addr_len, gpr_inf_future); + + /* await the connection */ + do { + addr_len = sizeof(addr); + r = accept(svr_fd, (struct sockaddr *)&addr, &addr_len); + } while (r == -1 && errno == EINTR); + GPR_ASSERT(r >= 0); + close(r); + + /* wait for the connection callback to finish */ + GPR_ASSERT(gpr_event_wait(&ev, test_deadline())); +} + +void test_fails() { + struct sockaddr_in addr; + socklen_t addr_len = sizeof(addr); + gpr_event ev; + + gpr_event_init(&ev); + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + + /* connect to a broken address */ + grpc_tcp_client_connect(must_fail, &ev, &em, (struct sockaddr *)&addr, + addr_len, gpr_inf_future); + + /* wait for the connection callback to finish */ + GPR_ASSERT(gpr_event_wait(&ev, test_deadline())); +} + +void test_times_out() { + struct sockaddr_in addr; + socklen_t addr_len = sizeof(addr); + int svr_fd; +#define NUM_CLIENT_CONNECTS 10 + int client_fd[NUM_CLIENT_CONNECTS]; + int i; + int r; + gpr_event ev; + gpr_timespec connect_deadline; + + gpr_event_init(&ev); + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + + /* create a dummy server */ + svr_fd = socket(AF_INET, SOCK_STREAM, 0); + GPR_ASSERT(svr_fd >= 0); + GPR_ASSERT(0 == bind(svr_fd, (struct sockaddr *)&addr, addr_len)); + GPR_ASSERT(0 == listen(svr_fd, 1)); + /* Get its address */ + GPR_ASSERT(getsockname(svr_fd, (struct sockaddr *)&addr, &addr_len) == 0); + + /* tie up the listen buffer, which is somewhat arbitrarily sized. */ + for (i = 0; i < NUM_CLIENT_CONNECTS; ++i) { + client_fd[i] = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); + do { + r = connect(client_fd[i], (struct sockaddr *)&addr, addr_len); + } while (r == -1 && errno == EINTR); + GPR_ASSERT(r < 0); + GPR_ASSERT(errno == EWOULDBLOCK || errno == EINPROGRESS); + } + + /* connect to dummy server address */ + + connect_deadline = gpr_time_add(gpr_now(), gpr_time_from_micros(1000000)); + + grpc_tcp_client_connect(must_fail, &ev, &em, (struct sockaddr *)&addr, + addr_len, connect_deadline); + /* Make sure the event doesn't trigger early */ + GPR_ASSERT(!gpr_event_wait( + &ev, gpr_time_add(gpr_now(), gpr_time_from_micros(500000)))); + /* Now wait until it should have triggered */ + sleep(1); + + /* wait for the connection callback to finish */ + GPR_ASSERT(gpr_event_wait(&ev, test_deadline())); + close(svr_fd); + for (i = 0; i < NUM_CLIENT_CONNECTS; ++i) { + close(client_fd[i]); + } +} + +int main(void) { + grpc_em_init(&em); + test_succeeds(); + test_fails(); + test_times_out(); + return 0; +} diff --git a/test/core/endpoint/tcp_server_test.c b/test/core/endpoint/tcp_server_test.c new file mode 100644 index 0000000000..10e2c36df1 --- /dev/null +++ b/test/core/endpoint/tcp_server_test.c @@ -0,0 +1,168 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/endpoint/tcp_server.h" +#include "src/core/eventmanager/em.h" +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/time.h> +#include "test/core/util/test_config.h" + +#include <netinet/in.h> +#include <string.h> +#include <unistd.h> + +#define LOG_TEST() gpr_log(GPR_INFO, "%s", __FUNCTION__) + +static grpc_em em; + +static gpr_mu mu; +static gpr_cv cv; +static int nconnects = 0; + +static void on_connect(void *arg, grpc_endpoint *tcp) { + grpc_endpoint_shutdown(tcp); + grpc_endpoint_destroy(tcp); + + gpr_mu_lock(&mu); + nconnects++; + gpr_cv_broadcast(&cv); + gpr_mu_unlock(&mu); +} + +static void test_no_op() { + grpc_tcp_server *s = grpc_tcp_server_create(&em); + grpc_tcp_server_destroy(s); +} + +static void test_no_op_with_start() { + grpc_tcp_server *s = grpc_tcp_server_create(&em); + LOG_TEST(); + grpc_tcp_server_start(s, on_connect, NULL); + grpc_tcp_server_destroy(s); +} + +static void test_no_op_with_port() { + struct sockaddr_in addr; + grpc_tcp_server *s = grpc_tcp_server_create(&em); + LOG_TEST(); + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + GPR_ASSERT( + grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr)) >= 0); + + grpc_tcp_server_destroy(s); +} + +static void test_no_op_with_port_and_start() { + struct sockaddr_in addr; + grpc_tcp_server *s = grpc_tcp_server_create(&em); + LOG_TEST(); + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + GPR_ASSERT( + grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr)) >= 0); + + grpc_tcp_server_start(s, on_connect, NULL); + + grpc_tcp_server_destroy(s); +} + +static void test_connect(int n) { + struct sockaddr_in addr; + socklen_t addr_len = sizeof(addr); + int svrfd, clifd; + grpc_tcp_server *s = grpc_tcp_server_create(&em); + int nconnects_before; + gpr_timespec deadline; + int i; + LOG_TEST(); + gpr_log(GPR_INFO, "clients=%d", n); + + gpr_mu_lock(&mu); + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + svrfd = grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len); + GPR_ASSERT(svrfd >= 0); + + GPR_ASSERT(getsockname(svrfd, (struct sockaddr *)&addr, &addr_len) == 0); + GPR_ASSERT(addr_len == sizeof(addr)); + + grpc_tcp_server_start(s, on_connect, NULL); + + for (i = 0; i < n; i++) { + deadline = gpr_time_add(gpr_now(), gpr_time_from_micros(10000000)); + + nconnects_before = nconnects; + clifd = socket(AF_INET, SOCK_STREAM, 0); + GPR_ASSERT(clifd >= 0); + GPR_ASSERT(connect(clifd, (struct sockaddr *)&addr, addr_len) == 0); + + while (nconnects == nconnects_before) { + GPR_ASSERT(gpr_cv_wait(&cv, &mu, deadline) == 0); + } + + GPR_ASSERT(nconnects == nconnects_before + 1); + close(clifd); + + if (i != n - 1) { + sleep(1); + } + } + + gpr_mu_unlock(&mu); + + grpc_tcp_server_destroy(s); +} + +int main(int argc, char **argv) { + grpc_test_init(argc, argv); + grpc_em_init(&em); + gpr_mu_init(&mu); + gpr_cv_init(&cv); + + test_no_op(); + test_no_op_with_start(); + test_no_op_with_port(); + test_no_op_with_port_and_start(); + test_connect(1); + test_connect(10); + + grpc_em_destroy(&em); + gpr_mu_destroy(&mu); + gpr_cv_destroy(&cv); + return 0; +} diff --git a/test/core/endpoint/tcp_test.c b/test/core/endpoint/tcp_test.c new file mode 100644 index 0000000000..7dbc2783e9 --- /dev/null +++ b/test/core/endpoint/tcp_test.c @@ -0,0 +1,517 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/endpoint/tcp.h" + +#include <errno.h> +#include <fcntl.h> +#include <string.h> +#include <signal.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <unistd.h> + +#include "src/core/eventmanager/em.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> +#include <grpc/support/useful.h> +#include "test/core/util/test_config.h" +#include "test/core/endpoint/endpoint_tests.h" + +/* + General test notes: + + All tests which write data into a socket write i%256 into byte i, which is + verified by readers. + + In general there are a few interesting things to vary which may lead to + exercising different codepaths in an implementation: + 1. Total amount of data written to the socket + 2. Size of slice allocations + 3. Amount of data we read from or write to the socket at once + + The tests here tend to parameterize these where applicable. + + */ + +grpc_em g_em; + +static void create_sockets(int sv[2]) { + int flags; + GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); + flags = fcntl(sv[0], F_GETFL, 0); + GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); + flags = fcntl(sv[1], F_GETFL, 0); + GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); +} + +static ssize_t fill_socket(int fd) { + ssize_t write_bytes; + ssize_t total_bytes = 0; + int i; + unsigned char buf[256]; + for (i = 0; i < 256; ++i) { + buf[i] = i; + } + do { + write_bytes = write(fd, buf, 256); + if (write_bytes > 0) { + total_bytes += write_bytes; + } + } while (write_bytes >= 0 || errno == EINTR); + GPR_ASSERT(errno == EAGAIN); + return total_bytes; +} + +static size_t fill_socket_partial(int fd, size_t bytes) { + ssize_t write_bytes; + size_t total_bytes = 0; + unsigned char *buf = malloc(bytes); + int i; + for (i = 0; i < bytes; ++i) { + buf[i] = i % 256; + } + + do { + write_bytes = write(fd, buf, bytes - total_bytes); + if (write_bytes > 0) { + total_bytes += write_bytes; + } + } while ((write_bytes >= 0 || errno == EINTR) && bytes > total_bytes); + + gpr_free(buf); + + return total_bytes; +} + +struct read_socket_state { + grpc_endpoint *ep; + gpr_mu mu; + gpr_cv cv; + size_t read_bytes; + ssize_t target_read_bytes; +}; + +static ssize_t count_and_unref_slices(gpr_slice *slices, size_t nslices, + int *current_data) { + ssize_t num_bytes = 0; + int i; + int j; + unsigned char *buf; + for (i = 0; i < nslices; ++i) { + buf = GPR_SLICE_START_PTR(slices[i]); + for (j = 0; j < GPR_SLICE_LENGTH(slices[i]); ++j) { + GPR_ASSERT(buf[j] == *current_data); + *current_data = (*current_data + 1) % 256; + } + num_bytes += GPR_SLICE_LENGTH(slices[i]); + gpr_slice_unref(slices[i]); + } + return num_bytes; +} + +static void read_cb(void *user_data, gpr_slice *slices, size_t nslices, + grpc_endpoint_cb_status error) { + struct read_socket_state *state = (struct read_socket_state *)user_data; + ssize_t read_bytes; + int current_data = 0; + + GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK); + + gpr_mu_lock(&state->mu); + read_bytes = count_and_unref_slices(slices, nslices, ¤t_data); + state->read_bytes += read_bytes; + gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes, + state->target_read_bytes); + if (state->read_bytes >= state->target_read_bytes) { + gpr_cv_signal(&state->cv); + } else { + grpc_endpoint_notify_on_read(state->ep, read_cb, state, gpr_inf_future); + } + gpr_mu_unlock(&state->mu); +} + +/* Write to a socket, then read from it using the grpc_tcp API. */ +static void read_test(ssize_t num_bytes, ssize_t slice_size) { + int sv[2]; + grpc_em em; + grpc_endpoint *ep; + struct read_socket_state state; + ssize_t written_bytes; + gpr_timespec rel_deadline = {20, 0}; + gpr_timespec deadline = gpr_time_add(gpr_now(), rel_deadline); + + gpr_log(GPR_INFO, "Read test of size %d, slice size %d", num_bytes, + slice_size); + + create_sockets(sv); + grpc_em_init(&em); + + ep = grpc_tcp_create_dbg(sv[1], &em, slice_size); + written_bytes = fill_socket_partial(sv[0], num_bytes); + gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); + + gpr_mu_init(&state.mu); + gpr_cv_init(&state.cv); + state.ep = ep; + state.read_bytes = 0; + state.target_read_bytes = written_bytes; + + grpc_endpoint_notify_on_read(ep, read_cb, &state, gpr_inf_future); + + gpr_mu_lock(&state.mu); + for (;;) { + GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0); + if (state.read_bytes >= state.target_read_bytes) { + break; + } + } + GPR_ASSERT(state.read_bytes == state.target_read_bytes); + gpr_mu_unlock(&state.mu); + + grpc_endpoint_destroy(ep); + + grpc_em_destroy(&em); + gpr_mu_destroy(&state.mu); + gpr_cv_destroy(&state.cv); +} + +/* Write to a socket until it fills up, then read from it using the grpc_tcp + API. */ +static void large_read_test(ssize_t slice_size) { + int sv[2]; + grpc_em em; + grpc_endpoint *ep; + struct read_socket_state state; + ssize_t written_bytes; + gpr_timespec rel_deadline = {20, 0}; + gpr_timespec deadline = gpr_time_add(gpr_now(), rel_deadline); + + gpr_log(GPR_INFO, "Start large read test, slice size %d", slice_size); + + create_sockets(sv); + grpc_em_init(&em); + + ep = grpc_tcp_create_dbg(sv[1], &em, slice_size); + written_bytes = fill_socket(sv[0]); + gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); + + gpr_mu_init(&state.mu); + gpr_cv_init(&state.cv); + state.ep = ep; + state.read_bytes = 0; + state.target_read_bytes = written_bytes; + + grpc_endpoint_notify_on_read(ep, read_cb, &state, gpr_inf_future); + + gpr_mu_lock(&state.mu); + for (;;) { + GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0); + if (state.read_bytes >= state.target_read_bytes) { + break; + } + } + GPR_ASSERT(state.read_bytes == state.target_read_bytes); + gpr_mu_unlock(&state.mu); + + grpc_endpoint_destroy(ep); + + grpc_em_destroy(&em); + gpr_mu_destroy(&state.mu); + gpr_cv_destroy(&state.cv); +} + +struct write_socket_state { + grpc_endpoint *ep; + gpr_mu mu; + gpr_cv cv; + int write_done; +}; + +static gpr_slice *allocate_blocks(ssize_t num_bytes, ssize_t slice_size, + size_t *num_blocks, int *current_data) { + ssize_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1 : 0); + gpr_slice *slices = gpr_malloc(sizeof(gpr_slice) * nslices); + ssize_t num_bytes_left = num_bytes; + int i; + int j; + unsigned char *buf; + *num_blocks = nslices; + + for (i = 0; i < nslices; ++i) { + slices[i] = gpr_slice_malloc(slice_size > num_bytes_left ? num_bytes_left + : slice_size); + num_bytes_left -= GPR_SLICE_LENGTH(slices[i]); + buf = GPR_SLICE_START_PTR(slices[i]); + for (j = 0; j < GPR_SLICE_LENGTH(slices[i]); ++j) { + buf[j] = *current_data; + *current_data = (*current_data + 1) % 256; + } + } + GPR_ASSERT(num_bytes_left == 0); + return slices; +} + +static void write_done(void *user_data /* write_socket_state */, + grpc_endpoint_cb_status error) { + struct write_socket_state *state = (struct write_socket_state *)user_data; + gpr_log(GPR_INFO, "Write done callback called"); + gpr_mu_lock(&state->mu); + gpr_log(GPR_INFO, "Signalling write done"); + state->write_done = 1; + gpr_cv_signal(&state->cv); + gpr_mu_unlock(&state->mu); +} + +void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { + unsigned char *buf = malloc(read_size); + ssize_t bytes_read; + size_t bytes_left = num_bytes; + int flags; + int current = 0; + int i; + + flags = fcntl(fd, F_GETFL, 0); + GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0); + + for (;;) { + do { + bytes_read = + read(fd, buf, bytes_left > read_size ? read_size : bytes_left); + } while (bytes_read < 0 && errno == EINTR); + GPR_ASSERT(bytes_read >= 0); + for (i = 0; i < bytes_read; ++i) { + GPR_ASSERT(buf[i] == current); + current = (current + 1) % 256; + } + bytes_left -= bytes_read; + if (bytes_left == 0) break; + } + flags = fcntl(fd, F_GETFL, 0); + GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0); + + gpr_free(buf); +} + +static ssize_t drain_socket(int fd) { + ssize_t read_bytes; + ssize_t total_bytes = 0; + unsigned char buf[256]; + int current = 0; + int i; + do { + read_bytes = read(fd, buf, 256); + if (read_bytes > 0) { + total_bytes += read_bytes; + for (i = 0; i < read_bytes; ++i) { + GPR_ASSERT(buf[i] == current); + current = (current + 1) % 256; + } + } + } while (read_bytes >= 0 || errno == EINTR); + GPR_ASSERT(errno == EAGAIN); + return total_bytes; +} + +/* Write to a socket using the grpc_tcp API, then drain it directly. + Note that if the write does not complete immediately we need to drain the + socket in parallel with the read. */ +static void write_test(ssize_t num_bytes, ssize_t slice_size) { + int sv[2]; + grpc_em em; + grpc_endpoint *ep; + struct write_socket_state state; + ssize_t read_bytes; + size_t num_blocks; + gpr_slice *slices; + int current_data = 0; + gpr_timespec rel_deadline = {20, 0}; + gpr_timespec deadline = gpr_time_add(gpr_now(), rel_deadline); + + gpr_log(GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes, + slice_size); + + create_sockets(sv); + grpc_em_init(&em); + + ep = grpc_tcp_create(sv[1], &em); + + gpr_mu_init(&state.mu); + gpr_cv_init(&state.cv); + state.ep = ep; + state.write_done = 0; + + slices = allocate_blocks(num_bytes, slice_size, &num_blocks, ¤t_data); + + if (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state, + gpr_inf_future) == GRPC_ENDPOINT_WRITE_DONE) { + /* Write completed immediately */ + read_bytes = drain_socket(sv[0]); + GPR_ASSERT(read_bytes == num_bytes); + } else { + drain_socket_blocking(sv[0], num_bytes, num_bytes); + gpr_mu_lock(&state.mu); + for (;;) { + if (state.write_done) { + break; + } + GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0); + } + gpr_mu_unlock(&state.mu); + } + + grpc_endpoint_destroy(ep); + grpc_em_destroy(&em); + gpr_mu_destroy(&state.mu); + gpr_cv_destroy(&state.cv); + gpr_free(slices); +} + +static void read_done_for_write_error(void *ud, gpr_slice *slices, + size_t nslices, + grpc_endpoint_cb_status error) { + GPR_ASSERT(error != GRPC_ENDPOINT_CB_OK); + GPR_ASSERT(nslices == 0); +} + +/* Write to a socket using the grpc_tcp API, then drain it directly. + Note that if the write does not complete immediately we need to drain the + socket in parallel with the read. */ +static void write_error_test(ssize_t num_bytes, ssize_t slice_size) { + int sv[2]; + grpc_em em; + grpc_endpoint *ep; + struct write_socket_state state; + size_t num_blocks; + gpr_slice *slices; + int current_data = 0; + gpr_timespec rel_deadline = {20, 0}; + gpr_timespec deadline = gpr_time_add(gpr_now(), rel_deadline); + + gpr_log(GPR_INFO, "Start write error test with %d bytes, slice size %d", + num_bytes, slice_size); + + create_sockets(sv); + grpc_em_init(&em); + + ep = grpc_tcp_create(sv[1], &em); + close(sv[0]); + + gpr_mu_init(&state.mu); + gpr_cv_init(&state.cv); + state.ep = ep; + state.write_done = 0; + + slices = allocate_blocks(num_bytes, slice_size, &num_blocks, ¤t_data); + + switch (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state, + gpr_inf_future)) { + case GRPC_ENDPOINT_WRITE_DONE: + case GRPC_ENDPOINT_WRITE_ERROR: + /* Write completed immediately */ + break; + case GRPC_ENDPOINT_WRITE_PENDING: + grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL, + gpr_inf_future); + gpr_mu_lock(&state.mu); + for (;;) { + if (state.write_done) { + break; + } + GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0); + } + gpr_mu_unlock(&state.mu); + break; + } + + grpc_endpoint_destroy(ep); + grpc_em_destroy(&em); + gpr_mu_destroy(&state.mu); + gpr_cv_destroy(&state.cv); + free(slices); +} + +void run_tests() { + int i = 0; + + read_test(100, 8192); + read_test(10000, 8192); + read_test(10000, 137); + read_test(10000, 1); + large_read_test(8192); + large_read_test(1); + + write_test(100, 8192); + write_test(100, 1); + write_test(100000, 8192); + write_test(100000, 1); + write_test(100000, 137); + + for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) { + write_error_test(40320, i); + } + + for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) { + write_test(40320, i); + } +} + +static void clean_up() { grpc_em_destroy(&g_em); } + +static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( + ssize_t slice_size) { + int sv[2]; + grpc_endpoint_test_fixture f; + + create_sockets(sv); + grpc_em_init(&g_em); + f.client_ep = grpc_tcp_create_dbg(sv[0], &g_em, slice_size); + f.server_ep = grpc_tcp_create(sv[1], &g_em); + + return f; +} + +static grpc_endpoint_test_config configs[] = { + {"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up}, +}; + +int main(int argc, char **argv) { + grpc_test_init(argc, argv); + /* disable SIGPIPE */ + signal(SIGPIPE, SIG_IGN); + run_tests(); + grpc_endpoint_tests(configs[0]); + + return 0; +} |