author     murgatroid99 <mlumish@google.com>  2017-02-23 14:59:45 -0800
committer  murgatroid99 <mlumish@google.com>  2017-02-23 14:59:45 -0800
commit     98cdf3b3e61c2e5db5b7b945e471f2c898faab33
tree       48bbffb475c61df53a97bbf96e672b80938af1ba /test/core
parent     63852c933b4e1ecc9da364fb85baa91d637ec68e
parent     5eb24eeaed12f96e49171a77939e23ff2af8680b

Merge remote-tracking branch 'upstream/v1.1.x' into merge_1.1.x_master
Diffstat (limited to 'test/core')
 -rw-r--r--  test/core/end2end/goaway_server_test.c               |    6
 -rw-r--r--  test/core/handshake/client_ssl.c                      |   11
 -rw-r--r--  test/core/iomgr/resolve_address_test.c                |   53
 -rw-r--r--  test/core/iomgr/tcp_client_posix_test.c               |   11
 -rw-r--r--  test/core/iomgr/tcp_client_uv_test.c                  |  222
 -rw-r--r--  test/core/iomgr/tcp_server_uv_test.c                  |  339
 -rw-r--r--  test/core/iomgr/timer_list_test.c                     |   11
 -rw-r--r--  test/core/surface/completion_queue_test.c             |  223
 -rw-r--r--  test/core/surface/completion_queue_threading_test.c   |  290
 -rw-r--r--  test/core/util/mock_endpoint.c                        |    6
 -rw-r--r--  test/core/util/passthru_endpoint.c                    |    6
 -rw-r--r--  test/core/util/test_config.c                          |   16

12 files changed, 954 insertions(+), 240 deletions(-)
diff --git a/test/core/end2end/goaway_server_test.c b/test/core/end2end/goaway_server_test.c
index c67c1f145a..a9634bfbae 100644
--- a/test/core/end2end/goaway_server_test.c
+++ b/test/core/end2end/goaway_server_test.c
@@ -31,6 +31,12 @@
  *
  */
 
+/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
+   using that endpoint. Because of various transitive includes in uv.h,
+   including windows.h on Windows, uv.h must be included before other system
+   headers. Therefore, sockaddr.h must always be included first */
+#include "src/core/lib/iomgr/sockaddr.h"
+
 #include <grpc/grpc.h>
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
diff --git a/test/core/handshake/client_ssl.c b/test/core/handshake/client_ssl.c
index 1a06fd6255..5cfe60de4b 100644
--- a/test/core/handshake/client_ssl.c
+++ b/test/core/handshake/client_ssl.c
@@ -31,6 +31,11 @@
  *
  */
 
+#include "src/core/lib/iomgr/port.h"
+
+// This test won't work except with posix sockets enabled
+#ifdef GRPC_POSIX_SOCKET
+
 #include <arpa/inet.h>
 #include <openssl/err.h>
 #include <openssl/ssl.h>
@@ -324,3 +329,9 @@ int main(int argc, char *argv[]) {
   GPR_ASSERT(!client_ssl_test("foo"));
   return 0;
 }
+
+#else /* GRPC_POSIX_SOCKET */
+
+int main(int argc, char **argv) { return 1; }
+
+#endif /* GRPC_POSIX_SOCKET */
diff --git a/test/core/iomgr/resolve_address_test.c b/test/core/iomgr/resolve_address_test.c
index 6a9bb5ae6f..3b79ef4e65 100644
--- a/test/core/iomgr/resolve_address_test.c
+++ b/test/core/iomgr/resolve_address_test.c
@@ -35,7 +35,6 @@
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
 #include <grpc/support/sync.h>
-#include <grpc/support/thd.h>
 #include <grpc/support/time.h>
 #include "src/core/lib/iomgr/executor.h"
 #include "src/core/lib/iomgr/iomgr.h"
@@ -63,6 +62,7 @@ void args_init(grpc_exec_ctx *exec_ctx, args_struct *args) {
   args->pollset_set = grpc_pollset_set_create();
   grpc_pollset_set_add_pollset(exec_ctx, args->pollset_set, args->pollset);
   args->addrs = NULL;
+  gpr_atm_rel_store(&args->done_atm, 0);
 }
 
 void args_finish(grpc_exec_ctx *exec_ctx, args_struct *args) {
@@ -85,8 +85,7 @@ static gpr_timespec n_sec_deadline(int seconds) {
       gpr_time_from_seconds(seconds, GPR_TIMESPAN));
 }
 
-static void actually_poll(void *argsp) {
-  args_struct *args = argsp;
+static void poll_pollset_until_request_done(args_struct *args) {
   gpr_timespec deadline = n_sec_deadline(10);
   while (true) {
     bool done = gpr_atm_acq_load(&args->done_atm) != 0;
@@ -111,12 +110,6 @@
   gpr_event_set(&args->ev, (void *)1);
 }
 
-static void poll_pollset_until_request_done(args_struct *args) {
-  gpr_atm_rel_store(&args->done_atm, 0);
-  gpr_thd_id id;
-  gpr_thd_new(&id, actually_poll, args, NULL);
-}
-
 static void must_succeed(grpc_exec_ctx *exec_ctx, void *argsp,
                          grpc_error *err) {
   args_struct *args = argsp;
@@ -124,23 +117,30 @@ static void must_succeed(grpc_exec_ctx *exec_ctx, void *argsp,
   GPR_ASSERT(args->addrs != NULL);
   GPR_ASSERT(args->addrs->naddrs > 0);
   gpr_atm_rel_store(&args->done_atm, 1);
+  gpr_mu_lock(args->mu);
+  GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, NULL));
+  gpr_mu_unlock(args->mu);
 }
 
 static void must_fail(grpc_exec_ctx *exec_ctx, void *argsp, grpc_error *err) {
   args_struct *args = argsp;
   GPR_ASSERT(err != GRPC_ERROR_NONE);
   gpr_atm_rel_store(&args->done_atm, 1);
+  gpr_mu_lock(args->mu);
+  GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, NULL));
+  gpr_mu_unlock(args->mu);
 }
 
 static void test_localhost(void) {
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   args_struct args;
   args_init(&exec_ctx, &args);
-  poll_pollset_until_request_done(&args);
   grpc_resolve_address(
       &exec_ctx, "localhost:1", NULL, args.pollset_set,
       grpc_closure_create(must_succeed, &args, grpc_schedule_on_exec_ctx),
       &args.addrs);
+  grpc_exec_ctx_flush(&exec_ctx);
+  poll_pollset_until_request_done(&args);
   args_finish(&exec_ctx, &args);
   grpc_exec_ctx_finish(&exec_ctx);
 }
@@ -149,24 +149,40 @@ static void test_default_port(void) {
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   args_struct args;
   args_init(&exec_ctx, &args);
-  poll_pollset_until_request_done(&args);
   grpc_resolve_address(
       &exec_ctx, "localhost", "1", args.pollset_set,
       grpc_closure_create(must_succeed, &args, grpc_schedule_on_exec_ctx),
       &args.addrs);
+  grpc_exec_ctx_flush(&exec_ctx);
+  poll_pollset_until_request_done(&args);
   args_finish(&exec_ctx, &args);
   grpc_exec_ctx_finish(&exec_ctx);
 }
 
-static void test_missing_default_port(void) {
+static void test_non_numeric_default_port(void) {
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   args_struct args;
   args_init(&exec_ctx, &args);
+  grpc_resolve_address(
+      &exec_ctx, "localhost", "https", args.pollset_set,
+      grpc_closure_create(must_succeed, &args, grpc_schedule_on_exec_ctx),
+      &args.addrs);
+  grpc_exec_ctx_flush(&exec_ctx);
   poll_pollset_until_request_done(&args);
+  args_finish(&exec_ctx, &args);
+  grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_missing_default_port(void) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  args_struct args;
+  args_init(&exec_ctx, &args);
   grpc_resolve_address(
       &exec_ctx, "localhost", NULL, args.pollset_set,
       grpc_closure_create(must_fail, &args, grpc_schedule_on_exec_ctx),
       &args.addrs);
+  grpc_exec_ctx_flush(&exec_ctx);
+  poll_pollset_until_request_done(&args);
   args_finish(&exec_ctx, &args);
   grpc_exec_ctx_finish(&exec_ctx);
 }
@@ -175,11 +191,12 @@ static void test_ipv6_with_port(void) {
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   args_struct args;
   args_init(&exec_ctx, &args);
-  poll_pollset_until_request_done(&args);
   grpc_resolve_address(
       &exec_ctx, "[2001:db8::1]:1", NULL, args.pollset_set,
       grpc_closure_create(must_succeed, &args, grpc_schedule_on_exec_ctx),
       &args.addrs);
+  grpc_exec_ctx_flush(&exec_ctx);
+  poll_pollset_until_request_done(&args);
   args_finish(&exec_ctx, &args);
   grpc_exec_ctx_finish(&exec_ctx);
 }
@@ -193,11 +210,12 @@ static void test_ipv6_without_port(void) {
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   args_struct args;
   args_init(&exec_ctx, &args);
-  poll_pollset_until_request_done(&args);
   grpc_resolve_address(
       &exec_ctx, kCases[i], "80", args.pollset_set,
       grpc_closure_create(must_succeed, &args, grpc_schedule_on_exec_ctx),
       &args.addrs);
+  grpc_exec_ctx_flush(&exec_ctx);
+  poll_pollset_until_request_done(&args);
   args_finish(&exec_ctx, &args);
   grpc_exec_ctx_finish(&exec_ctx);
 }
@@ -212,11 +230,12 @@ static void test_invalid_ip_addresses(void) {
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   args_struct args;
   args_init(&exec_ctx, &args);
-  poll_pollset_until_request_done(&args);
   grpc_resolve_address(
       &exec_ctx, kCases[i], NULL, args.pollset_set,
       grpc_closure_create(must_fail, &args, grpc_schedule_on_exec_ctx),
       &args.addrs);
+  grpc_exec_ctx_flush(&exec_ctx);
+  poll_pollset_until_request_done(&args);
   args_finish(&exec_ctx, &args);
   grpc_exec_ctx_finish(&exec_ctx);
 }
@@ -231,11 +250,12 @@ static void test_unparseable_hostports(void) {
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   args_struct args;
   args_init(&exec_ctx, &args);
-  poll_pollset_until_request_done(&args);
   grpc_resolve_address(
       &exec_ctx, kCases[i], "1", args.pollset_set,
       grpc_closure_create(must_fail, &args, grpc_schedule_on_exec_ctx),
       &args.addrs);
+  grpc_exec_ctx_flush(&exec_ctx);
+  poll_pollset_until_request_done(&args);
   args_finish(&exec_ctx, &args);
   grpc_exec_ctx_finish(&exec_ctx);
 }
@@ -247,6 +267,7 @@ int main(int argc, char **argv) {
   grpc_iomgr_init();
   test_localhost();
   test_default_port();
+  test_non_numeric_default_port();
   test_missing_default_port();
   test_ipv6_with_port();
   test_ipv6_without_port();
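Note on the resolve_address_test change above: it swaps a detached watcher thread for a synchronous wait, which only works because each completion callback kicks the pollset after publishing the done flag. Below is a condensed sketch of the two halves of that pattern, using the same gRPC calls as the diff; on_resolved and wait_until_done are hypothetical names for this sketch (the diff's versions are must_succeed/must_fail and poll_pollset_until_request_done, which also flush the exec_ctx outside the lock and enforce a 10-second deadline, elided here).

/* Completion side: publish the result, then wake whoever is polling. */
static void on_resolved(grpc_exec_ctx *exec_ctx, void *argsp,
                        grpc_error *err) {
  args_struct *args = argsp;
  gpr_atm_rel_store(&args->done_atm, 1); /* release store: flag set first */
  gpr_mu_lock(args->mu);
  /* interrupts grpc_pollset_work so the waiter re-checks done_atm promptly */
  GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, NULL));
  gpr_mu_unlock(args->mu);
}

/* Waiting side: poll until the flag flips; without the kick above, this
   loop would only notice completion when a poll deadline expired. */
static void wait_until_done(grpc_exec_ctx *exec_ctx, args_struct *args) {
  gpr_mu_lock(args->mu);
  while (gpr_atm_acq_load(&args->done_atm) == 0) {
    grpc_pollset_worker *worker = NULL;
    GRPC_LOG_IF_ERROR("pollset_work",
                      grpc_pollset_work(exec_ctx, args->pollset, &worker,
                                        gpr_now(GPR_CLOCK_MONOTONIC),
                                        n_sec_deadline(1)));
  }
  gpr_mu_unlock(args->mu);
}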
diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c
index c9b514a024..c767f63f0c 100644
--- a/test/core/iomgr/tcp_client_posix_test.c
+++ b/test/core/iomgr/tcp_client_posix_test.c
@@ -31,6 +31,11 @@
  *
  */
 
+#include "src/core/lib/iomgr/port.h"
+
+// This test won't work except with posix sockets enabled
+#ifdef GRPC_POSIX_SOCKET
+
 #include "src/core/lib/iomgr/tcp_client.h"
 
 #include <errno.h>
@@ -216,3 +221,9 @@ int main(int argc, char **argv) {
   gpr_free(g_pollset);
   return 0;
 }
+
+#else /* GRPC_POSIX_SOCKET */
+
+int main(int argc, char **argv) { return 1; }
+
+#endif /* GRPC_POSIX_SOCKET */
diff --git a/test/core/iomgr/tcp_client_uv_test.c b/test/core/iomgr/tcp_client_uv_test.c
new file mode 100644
index 0000000000..f8938d0abb
--- /dev/null
+++ b/test/core/iomgr/tcp_client_uv_test.c
@@ -0,0 +1,222 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+// This test won't work except with libuv
+#ifdef GRPC_UV
+
+#include <uv.h>
+
+#include <string.h>
+
+#include "src/core/lib/iomgr/tcp_client.h"
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/time.h>
+
+#include "src/core/lib/iomgr/iomgr.h"
+#include "src/core/lib/iomgr/pollset.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "test/core/util/test_config.h"
+
+static gpr_mu *g_mu;
+static grpc_pollset *g_pollset;
+static int g_connections_complete = 0;
+static grpc_endpoint *g_connecting = NULL;
+
+static gpr_timespec test_deadline(void) {
+  return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
+}
+
+static void finish_connection() {
+  gpr_mu_lock(g_mu);
+  g_connections_complete++;
+  GPR_ASSERT(
+      GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
+  gpr_mu_unlock(g_mu);
+}
+
+static void must_succeed(grpc_exec_ctx *exec_ctx, void *arg,
+                         grpc_error *error) {
+  GPR_ASSERT(g_connecting != NULL);
+  GPR_ASSERT(error == GRPC_ERROR_NONE);
+  grpc_endpoint_shutdown(exec_ctx, g_connecting);
+  grpc_endpoint_destroy(exec_ctx, g_connecting);
+  g_connecting = NULL;
+  finish_connection();
+}
+
+static void must_fail(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+  GPR_ASSERT(g_connecting == NULL);
+  GPR_ASSERT(error != GRPC_ERROR_NONE);
+  finish_connection();
+}
+
+static void close_cb(uv_handle_t *handle) { gpr_free(handle); }
+
+static void connection_cb(uv_stream_t *server, int status) {
+  uv_tcp_t *client_handle = gpr_malloc(sizeof(uv_tcp_t));
+  GPR_ASSERT(0 == status);
+  GPR_ASSERT(0 == uv_tcp_init(uv_default_loop(), client_handle));
+  GPR_ASSERT(0 == uv_accept(server, (uv_stream_t *)client_handle));
+  uv_close((uv_handle_t *)client_handle, close_cb);
+}
+
+void test_succeeds(void) {
+  grpc_resolved_address resolved_addr;
+  struct sockaddr_in *addr = (struct sockaddr_in *)resolved_addr.addr;
+  uv_tcp_t *svr_handle = gpr_malloc(sizeof(uv_tcp_t));
+  int connections_complete_before;
+  grpc_closure done;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
+  gpr_log(GPR_DEBUG, "test_succeeds");
+
+  memset(&resolved_addr, 0, sizeof(resolved_addr));
+  resolved_addr.len = sizeof(struct sockaddr_in);
+  addr->sin_family = AF_INET;
+
+  /* create a dummy server */
+  GPR_ASSERT(0 == uv_tcp_init(uv_default_loop(), svr_handle));
+  GPR_ASSERT(0 == uv_tcp_bind(svr_handle, (struct sockaddr *)addr, 0));
+  GPR_ASSERT(0 == uv_listen((uv_stream_t *)svr_handle, 1, connection_cb));
+
+  gpr_mu_lock(g_mu);
+  connections_complete_before = g_connections_complete;
+  gpr_mu_unlock(g_mu);
+
+  /* connect to it */
+  GPR_ASSERT(uv_tcp_getsockname(svr_handle, (struct sockaddr *)addr,
+                                (int *)&resolved_addr.len) == 0);
+  grpc_closure_init(&done, must_succeed, NULL, grpc_schedule_on_exec_ctx);
+  grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, NULL, NULL,
+                          &resolved_addr, gpr_inf_future(GPR_CLOCK_REALTIME));
+
+  gpr_mu_lock(g_mu);
+
+  while (g_connections_complete == connections_complete_before) {
+    grpc_pollset_worker *worker = NULL;
+    GPR_ASSERT(GRPC_LOG_IF_ERROR(
+        "pollset_work",
+        grpc_pollset_work(&exec_ctx, g_pollset, &worker,
+                          gpr_now(GPR_CLOCK_MONOTONIC),
+                          GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5))));
+    gpr_mu_unlock(g_mu);
+    grpc_exec_ctx_flush(&exec_ctx);
+    gpr_mu_lock(g_mu);
+  }
+
+  // This will get cleaned up when the pollset runs again or gets shutdown
+  uv_close((uv_handle_t *)svr_handle, close_cb);
+
+  gpr_mu_unlock(g_mu);
+
+  grpc_exec_ctx_finish(&exec_ctx);
+}
+
+void test_fails(void) {
+  grpc_resolved_address resolved_addr;
+  struct sockaddr_in *addr = (struct sockaddr_in *)resolved_addr.addr;
+  int connections_complete_before;
+  grpc_closure done;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
+  gpr_log(GPR_DEBUG, "test_fails");
+
+  memset(&resolved_addr, 0, sizeof(resolved_addr));
+  resolved_addr.len = sizeof(struct sockaddr_in);
+  addr->sin_family = AF_INET;
+
+  gpr_mu_lock(g_mu);
+  connections_complete_before = g_connections_complete;
+  gpr_mu_unlock(g_mu);
+
+  /* connect to a broken address */
+  grpc_closure_init(&done, must_fail, NULL, grpc_schedule_on_exec_ctx);
+  grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, NULL, NULL,
+                          &resolved_addr, gpr_inf_future(GPR_CLOCK_REALTIME));
+
+  gpr_mu_lock(g_mu);
+
+  /* wait for the connection callback to finish */
+  while (g_connections_complete == connections_complete_before) {
+    grpc_pollset_worker *worker = NULL;
+    gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+    gpr_timespec polling_deadline = test_deadline();
+    if (!grpc_timer_check(&exec_ctx, now, &polling_deadline)) {
+      GPR_ASSERT(GRPC_LOG_IF_ERROR(
+          "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker, now,
+                                            polling_deadline)));
+    }
+    gpr_mu_unlock(g_mu);
+    grpc_exec_ctx_flush(&exec_ctx);
+    gpr_mu_lock(g_mu);
+  }
+
+  gpr_mu_unlock(g_mu);
+  grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p,
+                            grpc_error *error) {
+  grpc_pollset_destroy(p);
+}
+
+int main(int argc, char **argv) {
+  grpc_closure destroyed;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_test_init(argc, argv);
+  grpc_init();
+  g_pollset = gpr_malloc(grpc_pollset_size());
+  grpc_pollset_init(g_pollset, &g_mu);
+  grpc_exec_ctx_finish(&exec_ctx);
+  test_succeeds();
+  gpr_log(GPR_ERROR, "End of first test");
+  test_fails();
+  grpc_closure_init(&destroyed, destroy_pollset, g_pollset,
+                    grpc_schedule_on_exec_ctx);
+  grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
+  grpc_exec_ctx_finish(&exec_ctx);
+  grpc_shutdown();
+  gpr_free(g_pollset);
+  return 0;
+}
+
+#else /* GRPC_UV */
+
+int main(int argc, char **argv) { return 1; }
+
+#endif /* GRPC_UV */
diff --git a/test/core/iomgr/tcp_server_uv_test.c b/test/core/iomgr/tcp_server_uv_test.c
new file mode 100644
index 0000000000..7b458c90f3
--- /dev/null
+++ b/test/core/iomgr/tcp_server_uv_test.c
@@ -0,0 +1,339 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+// This test won't work except with libuv
+#ifdef GRPC_UV
+
+#include <uv.h>
+
+#include "src/core/lib/iomgr/tcp_server.h"
+
+#include <string.h>
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/time.h>
+
+#include "src/core/lib/iomgr/iomgr.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+
+#define LOG_TEST(x) gpr_log(GPR_INFO, "%s", #x)
+
+static gpr_mu *g_mu;
+static grpc_pollset *g_pollset;
+static int g_nconnects = 0;
+
+typedef struct on_connect_result {
+  /* Owns a ref to server. */
+  grpc_tcp_server *server;
+  unsigned port_index;
+  unsigned fd_index;
+} on_connect_result;
+
+typedef struct server_weak_ref {
+  grpc_tcp_server *server;
+
+  /* arg is this server_weak_ref. */
+  grpc_closure server_shutdown;
+} server_weak_ref;
+
+static on_connect_result g_result = {NULL, 0, 0};
+
+static void on_connect_result_init(on_connect_result *result) {
+  result->server = NULL;
+  result->port_index = 0;
+  result->fd_index = 0;
+}
+
+static void on_connect_result_set(on_connect_result *result,
+                                  const grpc_tcp_server_acceptor *acceptor) {
+  result->server = grpc_tcp_server_ref(acceptor->from_server);
+  result->port_index = acceptor->port_index;
+  result->fd_index = acceptor->fd_index;
+}
+
+static void server_weak_ref_shutdown(grpc_exec_ctx *exec_ctx, void *arg,
+                                     grpc_error *error) {
+  server_weak_ref *weak_ref = arg;
+  weak_ref->server = NULL;
+}
+
+static void server_weak_ref_init(server_weak_ref *weak_ref) {
+  weak_ref->server = NULL;
+  grpc_closure_init(&weak_ref->server_shutdown, server_weak_ref_shutdown,
+                    weak_ref, grpc_schedule_on_exec_ctx);
+}
+
+/* Make weak_ref->server_shutdown a shutdown_starting cb on server.
+   grpc_tcp_server promises that the server object will live until
+   weak_ref->server_shutdown has returned. A strong ref on grpc_tcp_server
+   should be held until server_weak_ref_set() returns to avoid a race where
+   the server is deleted before the shutdown_starting cb is added. */
+static void server_weak_ref_set(server_weak_ref *weak_ref,
+                                grpc_tcp_server *server) {
+  grpc_tcp_server_shutdown_starting_add(server, &weak_ref->server_shutdown);
+  weak_ref->server = server;
+}
+
+static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
+                       grpc_pollset *pollset,
+                       grpc_tcp_server_acceptor *acceptor) {
+  grpc_endpoint_shutdown(exec_ctx, tcp);
+  grpc_endpoint_destroy(exec_ctx, tcp);
+
+  on_connect_result temp_result;
+  on_connect_result_set(&temp_result, acceptor);
+  gpr_free(acceptor);
+
+  gpr_mu_lock(g_mu);
+  g_result = temp_result;
+  g_nconnects++;
+  GPR_ASSERT(
+      GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
+  gpr_mu_unlock(g_mu);
+}
+
+static void test_no_op(void) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_tcp_server *s;
+  GPR_ASSERT(GRPC_ERROR_NONE ==
+             grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s));
+  grpc_tcp_server_unref(&exec_ctx, s);
+  grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_no_op_with_start(void) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_tcp_server *s;
+  GPR_ASSERT(GRPC_ERROR_NONE ==
+             grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s));
+  LOG_TEST("test_no_op_with_start");
+  grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL);
+  grpc_tcp_server_unref(&exec_ctx, s);
+  grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_no_op_with_port(void) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_resolved_address resolved_addr;
+  struct sockaddr_in *addr = (struct sockaddr_in *)resolved_addr.addr;
+  grpc_tcp_server *s;
+  GPR_ASSERT(GRPC_ERROR_NONE ==
+             grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s));
+  LOG_TEST("test_no_op_with_port");
+
+  memset(&resolved_addr, 0, sizeof(resolved_addr));
+  resolved_addr.len = sizeof(struct sockaddr_in);
+  addr->sin_family = AF_INET;
+  int port;
+  GPR_ASSERT(grpc_tcp_server_add_port(s, &resolved_addr, &port) ==
+                 GRPC_ERROR_NONE &&
+             port > 0);
+
+  grpc_tcp_server_unref(&exec_ctx, s);
+  grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_no_op_with_port_and_start(void) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_resolved_address resolved_addr;
+  struct sockaddr_in *addr = (struct sockaddr_in *)resolved_addr.addr;
+  grpc_tcp_server *s;
+  GPR_ASSERT(GRPC_ERROR_NONE ==
+             grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s));
+  LOG_TEST("test_no_op_with_port_and_start");
+  int port;
+
+  memset(&resolved_addr, 0, sizeof(resolved_addr));
+  resolved_addr.len = sizeof(struct sockaddr_in);
+  addr->sin_family = AF_INET;
+  GPR_ASSERT(grpc_tcp_server_add_port(s, &resolved_addr, &port) ==
+                 GRPC_ERROR_NONE &&
+             port > 0);
+
+  grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL);
+
+  grpc_tcp_server_unref(&exec_ctx, s);
+  grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void connect_cb(uv_connect_t *req, int status) {
+  GPR_ASSERT(status == 0);
+  gpr_free(req);
+}
+
+static void close_cb(uv_handle_t *handle) { gpr_free(handle); }
+
+static void tcp_connect(grpc_exec_ctx *exec_ctx, const struct sockaddr *remote,
+                        socklen_t remote_len, on_connect_result *result) {
+  gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
+  uv_tcp_t *client_handle = gpr_malloc(sizeof(uv_tcp_t));
+  uv_connect_t *req = gpr_malloc(sizeof(uv_connect_t));
+  int nconnects_before;
+
+  gpr_mu_lock(g_mu);
+  nconnects_before = g_nconnects;
+  on_connect_result_init(&g_result);
+  GPR_ASSERT(uv_tcp_init(uv_default_loop(), client_handle) == 0);
+  gpr_log(GPR_DEBUG, "start connect");
+  GPR_ASSERT(uv_tcp_connect(req, client_handle, remote, connect_cb) == 0);
+  gpr_log(GPR_DEBUG, "wait");
+  while (g_nconnects == nconnects_before &&
+         gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
+    grpc_pollset_worker *worker = NULL;
+    GPR_ASSERT(GRPC_LOG_IF_ERROR(
+        "pollset_work",
+        grpc_pollset_work(exec_ctx, g_pollset, &worker,
+                          gpr_now(GPR_CLOCK_MONOTONIC), deadline)));
+    gpr_mu_unlock(g_mu);
+    grpc_exec_ctx_finish(exec_ctx);
+    gpr_mu_lock(g_mu);
+  }
+  gpr_log(GPR_DEBUG, "wait done");
+  GPR_ASSERT(g_nconnects == nconnects_before + 1);
+  uv_close((uv_handle_t *)client_handle, close_cb);
+  *result = g_result;
+
+  gpr_mu_unlock(g_mu);
+}
+
+/* Tests a tcp server with multiple ports. */
+static void test_connect(unsigned n) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_resolved_address resolved_addr;
+  grpc_resolved_address resolved_addr1;
+  struct sockaddr_storage *addr =
+      (struct sockaddr_storage *)resolved_addr.addr;
+  struct sockaddr_storage *addr1 =
+      (struct sockaddr_storage *)resolved_addr1.addr;
+  int svr_port;
+  int svr1_port;
+  grpc_tcp_server *s;
+  GPR_ASSERT(GRPC_ERROR_NONE ==
+             grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s));
+  unsigned i;
+  server_weak_ref weak_ref;
+  server_weak_ref_init(&weak_ref);
+  LOG_TEST("test_connect");
+  gpr_log(GPR_INFO, "clients=%d", n);
+  memset(&resolved_addr, 0, sizeof(resolved_addr));
+  memset(&resolved_addr1, 0, sizeof(resolved_addr1));
+  resolved_addr.len = sizeof(struct sockaddr_storage);
+  resolved_addr1.len = sizeof(struct sockaddr_storage);
+  addr->ss_family = addr1->ss_family = AF_INET;
+  GPR_ASSERT(GRPC_ERROR_NONE ==
+             grpc_tcp_server_add_port(s, &resolved_addr, &svr_port));
+  GPR_ASSERT(svr_port > 0);
+  GPR_ASSERT(uv_ip6_addr("::", svr_port, (struct sockaddr_in6 *)addr) == 0);
+  /* Cannot use wildcard (port==0), because add_port() will try to reuse the
+     same port as a previous add_port(). */
+  svr1_port = grpc_pick_unused_port_or_die();
+  grpc_sockaddr_set_port(&resolved_addr1, svr1_port);
+  GPR_ASSERT(grpc_tcp_server_add_port(s, &resolved_addr1, &svr_port) ==
+                 GRPC_ERROR_NONE &&
+             svr_port == svr1_port);
+
+  grpc_tcp_server_start(&exec_ctx, s, &g_pollset, 1, on_connect, NULL);
+
+  GPR_ASSERT(uv_ip6_addr("::", svr_port, (struct sockaddr_in6 *)addr1) == 0);
+
+  for (i = 0; i < n; i++) {
+    on_connect_result result;
+    on_connect_result_init(&result);
+    tcp_connect(&exec_ctx, (struct sockaddr *)addr,
+                (socklen_t)resolved_addr.len, &result);
+    GPR_ASSERT(result.port_index == 0);
+    GPR_ASSERT(result.server == s);
+    if (weak_ref.server == NULL) {
+      server_weak_ref_set(&weak_ref, result.server);
+    }
+    grpc_tcp_server_unref(&exec_ctx, result.server);
+
+    on_connect_result_init(&result);
+    tcp_connect(&exec_ctx, (struct sockaddr *)addr1,
+                (socklen_t)resolved_addr1.len, &result);
+    GPR_ASSERT(result.port_index == 1);
+    GPR_ASSERT(result.server == s);
+    grpc_tcp_server_unref(&exec_ctx, result.server);
+  }
+
+  /* Weak ref to server valid until final unref. */
+  GPR_ASSERT(weak_ref.server != NULL);
+
+  grpc_tcp_server_unref(&exec_ctx, s);
+  grpc_exec_ctx_finish(&exec_ctx);
+
+  /* Weak ref lost. */
+  GPR_ASSERT(weak_ref.server == NULL);
+}
+
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p,
+                            grpc_error *error) {
+  grpc_pollset_destroy(p);
+}
+
+int main(int argc, char **argv) {
+  grpc_closure destroyed;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_test_init(argc, argv);
+  grpc_init();
+  g_pollset = gpr_malloc(grpc_pollset_size());
+  grpc_pollset_init(g_pollset, &g_mu);
+
+  test_no_op();
+  test_no_op_with_start();
+  test_no_op_with_port();
+  test_no_op_with_port_and_start();
+  test_connect(1);
+  test_connect(10);
+
+  grpc_closure_init(&destroyed, destroy_pollset, g_pollset,
+                    grpc_schedule_on_exec_ctx);
+  grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
+  grpc_exec_ctx_finish(&exec_ctx);
+  grpc_shutdown();
+  gpr_free(g_pollset);
+  return 0;
+}
+
+#else /* GRPC_UV */
+
+int main(int argc, char **argv) { return 1; }
+
+#endif /* GRPC_UV */
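A note on the server_weak_ref helper in the new test above: grpc_tcp_server has no built-in weak references, so the test fakes one by registering a shutdown_starting closure whose only job is to clear the cached pointer. Stripped to its core (same gRPC calls as the diff, test scaffolding elided):

typedef struct {
  grpc_tcp_server *server;      /* non-NULL only while the server is alive */
  grpc_closure server_shutdown; /* runs when server shutdown starts */
} server_weak_ref;

static void server_weak_ref_shutdown(grpc_exec_ctx *exec_ctx, void *arg,
                                     grpc_error *error) {
  ((server_weak_ref *)arg)->server = NULL; /* invalidate the weak pointer */
}

static void server_weak_ref_init(server_weak_ref *weak_ref) {
  weak_ref->server = NULL;
  grpc_closure_init(&weak_ref->server_shutdown, server_weak_ref_shutdown,
                    weak_ref, grpc_schedule_on_exec_ctx);
}

/* The caller must hold a strong ref across this call; otherwise the server
   could be deleted before the shutdown_starting closure is registered. */
static void server_weak_ref_set(server_weak_ref *weak_ref,
                                grpc_tcp_server *server) {
  grpc_tcp_server_shutdown_starting_add(server, &weak_ref->server_shutdown);
  weak_ref->server = server;
}

This is why test_connect asserts weak_ref.server != NULL before the final unref and == NULL after it: the shutdown_starting closure fires exactly once, when the last strong ref is dropped.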
diff --git a/test/core/iomgr/timer_list_test.c b/test/core/iomgr/timer_list_test.c
index 8d7ac3fdaa..85ad5277cc 100644
--- a/test/core/iomgr/timer_list_test.c
+++ b/test/core/iomgr/timer_list_test.c
@@ -31,6 +31,11 @@
  *
  */
 
+#include "src/core/lib/iomgr/port.h"
+
+// This test only works with the generic timer implementation
+#ifdef GRPC_TIMER_USE_GENERIC
+
 #include "src/core/lib/iomgr/timer.h"
 
 #include <string.h>
@@ -169,3 +174,9 @@ int main(int argc, char **argv) {
   destruction_test();
   return 0;
 }
+
+#else /* GRPC_TIMER_USE_GENERIC */
+
+int main(int argc, char **argv) { return 1; }
+
+#endif /* GRPC_TIMER_USE_GENERIC */
diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c
index b8ea90921a..07f6a9869b 100644
--- a/test/core/surface/completion_queue_test.c
+++ b/test/core/surface/completion_queue_test.c
@@ -35,7 +35,6 @@
 
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
-#include <grpc/support/thd.h>
 #include <grpc/support/time.h>
 #include <grpc/support/useful.h>
 #include "src/core/lib/iomgr/iomgr.h"
@@ -194,223 +193,6 @@ static void test_pluck_after_shutdown(void) {
   grpc_completion_queue_destroy(cc);
 }
 
-struct thread_state {
-  grpc_completion_queue *cc;
-  void *tag;
-};
-
-static void pluck_one(void *arg) {
-  struct thread_state *state = arg;
-  grpc_completion_queue_pluck(state->cc, state->tag,
-                              gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
-}
-
-static void test_too_many_plucks(void) {
-  grpc_event ev;
-  grpc_completion_queue *cc;
-  void *tags[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
-  grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
-  gpr_thd_id thread_ids[GPR_ARRAY_SIZE(tags)];
-  struct thread_state thread_states[GPR_ARRAY_SIZE(tags)];
-  gpr_thd_options thread_options = gpr_thd_options_default();
-  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-  unsigned i, j;
-
-  LOG_TEST("test_too_many_plucks");
-
-  cc = grpc_completion_queue_create(NULL);
-  gpr_thd_options_set_joinable(&thread_options);
-
-  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
-    tags[i] = create_test_tag();
-    for (j = 0; j < i; j++) {
-      GPR_ASSERT(tags[i] != tags[j]);
-    }
-    thread_states[i].cc = cc;
-    thread_states[i].tag = tags[i];
-    gpr_thd_new(thread_ids + i, pluck_one, thread_states + i, &thread_options);
-  }
-
-  /* wait until all other threads are plucking */
-  gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1000));
-
-  ev = grpc_completion_queue_pluck(cc, create_test_tag(),
-                                   gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
-  GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
-
-  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
-    grpc_cq_begin_op(cc, tags[i]);
-    grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE,
-                   do_nothing_end_completion, NULL, &completions[i]);
-  }
-
-  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
-    gpr_thd_join(thread_ids[i]);
-  }
-
-  shutdown_and_destroy(cc);
-  grpc_exec_ctx_finish(&exec_ctx);
-}
-
-#define TEST_THREAD_EVENTS 10000
-
-typedef struct test_thread_options {
-  gpr_event on_started;
-  gpr_event *phase1;
-  gpr_event on_phase1_done;
-  gpr_event *phase2;
-  gpr_event on_finished;
-  size_t events_triggered;
-  int id;
-  grpc_completion_queue *cc;
-} test_thread_options;
-
-gpr_timespec ten_seconds_time(void) {
-  return grpc_timeout_seconds_to_deadline(10);
-}
-
-static void free_completion(grpc_exec_ctx *exec_ctx, void *arg,
-                            grpc_cq_completion *completion) {
-  gpr_free(completion);
-}
-
-static void producer_thread(void *arg) {
-  test_thread_options *opt = arg;
-  int i;
-  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-
-  gpr_log(GPR_INFO, "producer %d started", opt->id);
-  gpr_event_set(&opt->on_started, (void *)(intptr_t)1);
-  GPR_ASSERT(gpr_event_wait(opt->phase1, ten_seconds_time()));
-
-  gpr_log(GPR_INFO, "producer %d phase 1", opt->id);
-  for (i = 0; i < TEST_THREAD_EVENTS; i++) {
-    grpc_cq_begin_op(opt->cc, (void *)(intptr_t)1);
-  }
-
-  gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id);
-  gpr_event_set(&opt->on_phase1_done, (void *)(intptr_t)1);
-  GPR_ASSERT(gpr_event_wait(opt->phase2, ten_seconds_time()));
-
-  gpr_log(GPR_INFO, "producer %d phase 2", opt->id);
-  for (i = 0; i < TEST_THREAD_EVENTS; i++) {
-    grpc_cq_end_op(&exec_ctx, opt->cc, (void *)(intptr_t)1, GRPC_ERROR_NONE,
-                   free_completion, NULL,
-                   gpr_malloc(sizeof(grpc_cq_completion)));
-    opt->events_triggered++;
-    grpc_exec_ctx_finish(&exec_ctx);
-  }
-
-  gpr_log(GPR_INFO, "producer %d phase 2 done", opt->id);
-  gpr_event_set(&opt->on_finished, (void *)(intptr_t)1);
-  grpc_exec_ctx_finish(&exec_ctx);
-}
-
-static void consumer_thread(void *arg) {
-  test_thread_options *opt = arg;
-  grpc_event ev;
-
-  gpr_log(GPR_INFO, "consumer %d started", opt->id);
-  gpr_event_set(&opt->on_started, (void *)(intptr_t)1);
-  GPR_ASSERT(gpr_event_wait(opt->phase1, ten_seconds_time()));
-
-  gpr_log(GPR_INFO, "consumer %d phase 1", opt->id);
-
-  gpr_log(GPR_INFO, "consumer %d phase 1 done", opt->id);
-  gpr_event_set(&opt->on_phase1_done, (void *)(intptr_t)1);
-  GPR_ASSERT(gpr_event_wait(opt->phase2, ten_seconds_time()));
-
-  gpr_log(GPR_INFO, "consumer %d phase 2", opt->id);
-  for (;;) {
-    ev = grpc_completion_queue_next(opt->cc, ten_seconds_time(), NULL);
-    switch (ev.type) {
-      case GRPC_OP_COMPLETE:
-        GPR_ASSERT(ev.success);
-        opt->events_triggered++;
-        break;
-      case GRPC_QUEUE_SHUTDOWN:
-        gpr_log(GPR_INFO, "consumer %d phase 2 done", opt->id);
-        gpr_event_set(&opt->on_finished, (void *)(intptr_t)1);
-        return;
-      case GRPC_QUEUE_TIMEOUT:
-        gpr_log(GPR_ERROR, "Invalid timeout received");
-        abort();
-    }
-  }
-}
-
-static void test_threading(size_t producers, size_t consumers) {
-  test_thread_options *options =
-      gpr_malloc((producers + consumers) * sizeof(test_thread_options));
-  gpr_event phase1 = GPR_EVENT_INIT;
-  gpr_event phase2 = GPR_EVENT_INIT;
-  grpc_completion_queue *cc = grpc_completion_queue_create(NULL);
-  size_t i;
-  size_t total_consumed = 0;
-  static int optid = 101;
-
-  gpr_log(GPR_INFO, "%s: %" PRIuPTR " producers, %" PRIuPTR " consumers",
-          "test_threading", producers, consumers);
-
-  /* start all threads: they will wait for phase1 */
-  for (i = 0; i < producers + consumers; i++) {
-    gpr_thd_id id;
-    gpr_event_init(&options[i].on_started);
-    gpr_event_init(&options[i].on_phase1_done);
-    gpr_event_init(&options[i].on_finished);
-    options[i].phase1 = &phase1;
-    options[i].phase2 = &phase2;
-    options[i].events_triggered = 0;
-    options[i].cc = cc;
-    options[i].id = optid++;
-    GPR_ASSERT(gpr_thd_new(&id,
-                           i < producers ? producer_thread : consumer_thread,
-                           options + i, NULL));
-    gpr_event_wait(&options[i].on_started, ten_seconds_time());
-  }
-
-  /* start phase1: producers will pre-declare all operations they will
-     complete */
-  gpr_log(GPR_INFO, "start phase 1");
-  gpr_event_set(&phase1, (void *)(intptr_t)1);
-
-  gpr_log(GPR_INFO, "wait phase 1");
-  for (i = 0; i < producers + consumers; i++) {
-    GPR_ASSERT(gpr_event_wait(&options[i].on_phase1_done, ten_seconds_time()));
-  }
-  gpr_log(GPR_INFO, "done phase 1");
-
-  /* start phase2: operations will complete, and consumers will consume them */
-  gpr_log(GPR_INFO, "start phase 2");
-  gpr_event_set(&phase2, (void *)(intptr_t)1);
-
-  /* in parallel, we shutdown the completion channel - all events should still
-     be consumed */
-  grpc_completion_queue_shutdown(cc);
-
-  /* join all threads */
-  gpr_log(GPR_INFO, "wait phase 2");
-  for (i = 0; i < producers + consumers; i++) {
-    GPR_ASSERT(gpr_event_wait(&options[i].on_finished, ten_seconds_time()));
-  }
-  gpr_log(GPR_INFO, "done phase 2");
-
-  /* destroy the completion channel */
-  grpc_completion_queue_destroy(cc);
-
-  /* verify that everything was produced and consumed */
-  for (i = 0; i < producers + consumers; i++) {
-    if (i < producers) {
-      GPR_ASSERT(options[i].events_triggered == TEST_THREAD_EVENTS);
-    } else {
-      total_consumed += options[i].events_triggered;
-    }
-  }
-  GPR_ASSERT(total_consumed == producers * TEST_THREAD_EVENTS);
-
-  gpr_free(options);
-}
-
 int main(int argc, char **argv) {
   grpc_test_init(argc, argv);
   grpc_init();
@@ -422,11 +204,6 @@ int main(int argc, char **argv) {
   test_cq_end_op();
   test_pluck();
   test_pluck_after_shutdown();
-  test_too_many_plucks();
-  test_threading(1, 1);
-  test_threading(1, 10);
-  test_threading(10, 1);
-  test_threading(10, 10);
   grpc_shutdown();
   return 0;
 }
diff --git a/test/core/surface/completion_queue_threading_test.c b/test/core/surface/completion_queue_threading_test.c
new file mode 100644
index 0000000000..2d55ead843
--- /dev/null
+++ b/test/core/surface/completion_queue_threading_test.c
@@ -0,0 +1,290 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/surface/completion_queue.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+#include "src/core/lib/iomgr/iomgr.h"
+#include "test/core/util/test_config.h"
+
+#define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
+
+static void *create_test_tag(void) {
+  static intptr_t i = 0;
+  return (void *)(++i);
+}
+
+/* helper for tests to shutdown correctly and tersely */
+static void shutdown_and_destroy(grpc_completion_queue *cc) {
+  grpc_event ev;
+  grpc_completion_queue_shutdown(cc);
+  ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL);
+  GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
+  grpc_completion_queue_destroy(cc);
+}
+
+static void do_nothing_end_completion(grpc_exec_ctx *exec_ctx, void *arg,
+                                      grpc_cq_completion *c) {}
+
+struct thread_state {
+  grpc_completion_queue *cc;
+  void *tag;
+};
+
+static void pluck_one(void *arg) {
+  struct thread_state *state = arg;
+  grpc_completion_queue_pluck(state->cc, state->tag,
+                              gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+}
+
+static void test_too_many_plucks(void) {
+  grpc_event ev;
+  grpc_completion_queue *cc;
+  void *tags[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
+  grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
+  gpr_thd_id thread_ids[GPR_ARRAY_SIZE(tags)];
+  struct thread_state thread_states[GPR_ARRAY_SIZE(tags)];
+  gpr_thd_options thread_options = gpr_thd_options_default();
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  unsigned i, j;
+
+  LOG_TEST("test_too_many_plucks");
+
+  cc = grpc_completion_queue_create(NULL);
+  gpr_thd_options_set_joinable(&thread_options);
+
+  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
+    tags[i] = create_test_tag();
+    for (j = 0; j < i; j++) {
+      GPR_ASSERT(tags[i] != tags[j]);
+    }
+    thread_states[i].cc = cc;
+    thread_states[i].tag = tags[i];
+    gpr_thd_new(thread_ids + i, pluck_one, thread_states + i, &thread_options);
+  }
+
+  /* wait until all other threads are plucking */
+  gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1000));
+
+  ev = grpc_completion_queue_pluck(cc, create_test_tag(),
+                                   gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+  GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
+
+  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
+    grpc_cq_begin_op(cc, tags[i]);
+    grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE,
+                   do_nothing_end_completion, NULL, &completions[i]);
+  }
+
+  for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
+    gpr_thd_join(thread_ids[i]);
+  }
+
+  shutdown_and_destroy(cc);
+  grpc_exec_ctx_finish(&exec_ctx);
+}
+
+#define TEST_THREAD_EVENTS 10000
+
+typedef struct test_thread_options {
+  gpr_event on_started;
+  gpr_event *phase1;
+  gpr_event on_phase1_done;
+  gpr_event *phase2;
+  gpr_event on_finished;
+  size_t events_triggered;
+  int id;
+  grpc_completion_queue *cc;
+} test_thread_options;
+
+gpr_timespec ten_seconds_time(void) {
+  return grpc_timeout_seconds_to_deadline(10);
+}
+
+static void free_completion(grpc_exec_ctx *exec_ctx, void *arg,
+                            grpc_cq_completion *completion) {
+  gpr_free(completion);
+}
+
+static void producer_thread(void *arg) {
+  test_thread_options *opt = arg;
+  int i;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
+  gpr_log(GPR_INFO, "producer %d started", opt->id);
+  gpr_event_set(&opt->on_started, (void *)(intptr_t)1);
+  GPR_ASSERT(gpr_event_wait(opt->phase1, ten_seconds_time()));
+
+  gpr_log(GPR_INFO, "producer %d phase 1", opt->id);
+  for (i = 0; i < TEST_THREAD_EVENTS; i++) {
+    grpc_cq_begin_op(opt->cc, (void *)(intptr_t)1);
+  }
+
+  gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id);
+  gpr_event_set(&opt->on_phase1_done, (void *)(intptr_t)1);
+  GPR_ASSERT(gpr_event_wait(opt->phase2, ten_seconds_time()));
+
+  gpr_log(GPR_INFO, "producer %d phase 2", opt->id);
+  for (i = 0; i < TEST_THREAD_EVENTS; i++) {
+    grpc_cq_end_op(&exec_ctx, opt->cc, (void *)(intptr_t)1, GRPC_ERROR_NONE,
+                   free_completion, NULL,
+                   gpr_malloc(sizeof(grpc_cq_completion)));
+    opt->events_triggered++;
+    grpc_exec_ctx_finish(&exec_ctx);
+  }
+
+  gpr_log(GPR_INFO, "producer %d phase 2 done", opt->id);
+  gpr_event_set(&opt->on_finished, (void *)(intptr_t)1);
+  grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void consumer_thread(void *arg) {
+  test_thread_options *opt = arg;
+  grpc_event ev;
+
+  gpr_log(GPR_INFO, "consumer %d started", opt->id);
+  gpr_event_set(&opt->on_started, (void *)(intptr_t)1);
+  GPR_ASSERT(gpr_event_wait(opt->phase1, ten_seconds_time()));
+
+  gpr_log(GPR_INFO, "consumer %d phase 1", opt->id);
+
+  gpr_log(GPR_INFO, "consumer %d phase 1 done", opt->id);
+  gpr_event_set(&opt->on_phase1_done, (void *)(intptr_t)1);
+  GPR_ASSERT(gpr_event_wait(opt->phase2, ten_seconds_time()));
+
+  gpr_log(GPR_INFO, "consumer %d phase 2", opt->id);
+  for (;;) {
+    ev = grpc_completion_queue_next(opt->cc, ten_seconds_time(), NULL);
+    switch (ev.type) {
+      case GRPC_OP_COMPLETE:
+        GPR_ASSERT(ev.success);
+        opt->events_triggered++;
+        break;
+      case GRPC_QUEUE_SHUTDOWN:
+        gpr_log(GPR_INFO, "consumer %d phase 2 done", opt->id);
+        gpr_event_set(&opt->on_finished, (void *)(intptr_t)1);
+        return;
+      case GRPC_QUEUE_TIMEOUT:
+        gpr_log(GPR_ERROR, "Invalid timeout received");
+        abort();
+    }
+  }
+}
+
+static void test_threading(size_t producers, size_t consumers) {
+  test_thread_options *options =
+      gpr_malloc((producers + consumers) * sizeof(test_thread_options));
+  gpr_event phase1 = GPR_EVENT_INIT;
+  gpr_event phase2 = GPR_EVENT_INIT;
+  grpc_completion_queue *cc = grpc_completion_queue_create(NULL);
+  size_t i;
+  size_t total_consumed = 0;
+  static int optid = 101;
+
+  gpr_log(GPR_INFO, "%s: %" PRIuPTR " producers, %" PRIuPTR " consumers",
+          "test_threading", producers, consumers);
+
+  /* start all threads: they will wait for phase1 */
+  for (i = 0; i < producers + consumers; i++) {
+    gpr_thd_id id;
+    gpr_event_init(&options[i].on_started);
+    gpr_event_init(&options[i].on_phase1_done);
+    gpr_event_init(&options[i].on_finished);
+    options[i].phase1 = &phase1;
+    options[i].phase2 = &phase2;
+    options[i].events_triggered = 0;
+    options[i].cc = cc;
+    options[i].id = optid++;
+    GPR_ASSERT(gpr_thd_new(&id,
+                           i < producers ? producer_thread : consumer_thread,
+                           options + i, NULL));
+    gpr_event_wait(&options[i].on_started, ten_seconds_time());
+  }
+
+  /* start phase1: producers will pre-declare all operations they will
+     complete */
+  gpr_log(GPR_INFO, "start phase 1");
+  gpr_event_set(&phase1, (void *)(intptr_t)1);
+
+  gpr_log(GPR_INFO, "wait phase 1");
+  for (i = 0; i < producers + consumers; i++) {
+    GPR_ASSERT(gpr_event_wait(&options[i].on_phase1_done, ten_seconds_time()));
+  }
+  gpr_log(GPR_INFO, "done phase 1");
+
+  /* start phase2: operations will complete, and consumers will consume them */
+  gpr_log(GPR_INFO, "start phase 2");
+  gpr_event_set(&phase2, (void *)(intptr_t)1);
+
+  /* in parallel, we shutdown the completion channel - all events should still
+     be consumed */
+  grpc_completion_queue_shutdown(cc);
+
+  /* join all threads */
+  gpr_log(GPR_INFO, "wait phase 2");
+  for (i = 0; i < producers + consumers; i++) {
+    GPR_ASSERT(gpr_event_wait(&options[i].on_finished, ten_seconds_time()));
+  }
+  gpr_log(GPR_INFO, "done phase 2");
+
+  /* destroy the completion channel */
+  grpc_completion_queue_destroy(cc);
+
+  /* verify that everything was produced and consumed */
+  for (i = 0; i < producers + consumers; i++) {
+    if (i < producers) {
+      GPR_ASSERT(options[i].events_triggered == TEST_THREAD_EVENTS);
+    } else {
+      total_consumed += options[i].events_triggered;
+    }
+  }
+  GPR_ASSERT(total_consumed == producers * TEST_THREAD_EVENTS);
+
+  gpr_free(options);
+}
+
+int main(int argc, char **argv) {
+  grpc_test_init(argc, argv);
+  grpc_init();
+  test_too_many_plucks();
+  test_threading(1, 1);
+  test_threading(1, 10);
+  test_threading(10, 1);
+  test_threading(10, 10);
+  grpc_shutdown();
+  return 0;
+}
diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c
index d531ec6031..b8fed7e14b 100644
--- a/test/core/util/mock_endpoint.c
+++ b/test/core/util/mock_endpoint.c
@@ -31,6 +31,12 @@
  *
  */
 
+/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
+   using that endpoint. Because of various transitive includes in uv.h,
+   including windows.h on Windows, uv.h must be included before other system
+   headers. Therefore, sockaddr.h must always be included first */
+#include "src/core/lib/iomgr/sockaddr.h"
+
 #include "test/core/util/mock_endpoint.h"
 
 #include <inttypes.h>
diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c
index 1e82c737c6..5f27f9ae73 100644
--- a/test/core/util/passthru_endpoint.c
+++ b/test/core/util/passthru_endpoint.c
@@ -31,6 +31,12 @@
  *
  */
 
+/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
+   using that endpoint. Because of various transitive includes in uv.h,
+   including windows.h on Windows, uv.h must be included before other system
+   headers. Therefore, sockaddr.h must always be included first */
+#include "src/core/lib/iomgr/sockaddr.h"
+
 #include "test/core/util/passthru_endpoint.h"
 
 #include <inttypes.h>
diff --git a/test/core/util/test_config.c b/test/core/util/test_config.c
index 94aab27253..0180d6f08d 100644
--- a/test/core/util/test_config.c
+++ b/test/core/util/test_config.c
@@ -215,6 +215,16 @@ static void install_crash_handler() {
 #include <stdio.h>
 #include <string.h>
 
+#define SIGNAL_NAMES_LENGTH 32
+
+static const char *const signal_names[] = {
+    NULL,      "SIGHUP",  "SIGINT",    "SIGQUIT", "SIGILL",    "SIGTRAP",
+    "SIGABRT", "SIGBUS",  "SIGFPE",    "SIGKILL", "SIGUSR1",   "SIGSEGV",
+    "SIGUSR2", "SIGPIPE", "SIGALRM",   "SIGTERM", "SIGSTKFLT", "SIGCHLD",
+    "SIGCONT", "SIGSTOP", "SIGTSTP",   "SIGTTIN", "SIGTTOU",   "SIGURG",
+    "SIGXCPU", "SIGXFSZ", "SIGVTALRM", "SIGPROF", "SIGWINCH",  "SIGIO",
+    "SIGPWR",  "SIGSYS"};
+
 static char g_alt_stack[GPR_MAX(MINSIGSTKSZ, 65536)];
 
 #define MAX_FRAMES 32
@@ -240,7 +250,11 @@ static void crash_handler(int signum, siginfo_t *info, void *data) {
   int addrlen;
   output_string("\n\n\n*******************************\nCaught signal ");
-  output_num(signum);
+  if (signum > 0 && signum < SIGNAL_NAMES_LENGTH) {
+    output_string(signal_names[signum]);
+  } else {
+    output_num(signum);
+  }
   output_string("\n");
   addrlen = backtrace(addrlist, GPR_ARRAY_SIZE(addrlist));
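The test_config.c change above prints a signal's name instead of its number. The reason for indexing a static table rather than calling strsignal() is that the crash handler must stay async-signal-safe: the table lookup does no allocation or locale access (note that SIGSTKFLT and SIGPWR in the table are Linux-specific names). A standalone sketch of the same idea, with hypothetical names, using raw write() the way the handler's output_string() ultimately does:

#include <signal.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical trimmed-down name table; gRPC's covers signals 1-31. */
static const char *const kSignalNames[] = {NULL,      "SIGHUP",  "SIGINT",
                                           "SIGQUIT", "SIGILL",  "SIGTRAP",
                                           "SIGABRT", "SIGBUS",  "SIGFPE",
                                           "SIGKILL", "SIGUSR1", "SIGSEGV"};

static void crash_report(int signum) {
  int known = signum > 0 &&
              signum < (int)(sizeof(kSignalNames) / sizeof(kSignalNames[0]));
  const char *name = known ? kSignalNames[signum] : "unknown signal";
  /* write() is async-signal-safe; stdio and strsignal() are not */
  (void)write(STDERR_FILENO, "Caught ", 7);
  (void)write(STDERR_FILENO, name, strlen(name));
  (void)write(STDERR_FILENO, "\n", 1);
  _exit(1);
}

int main(void) {
  /* minimal registration for the sketch; the real handler is installed via
     sigaction() with an alternate stack so it survives stack overflows */
  signal(SIGSEGV, crash_report);
  raise(SIGSEGV);
  return 0;
}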