aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar murgatroid99 <mlumish@google.com>2017-02-08 11:56:52 -0800
committerGravatar murgatroid99 <mlumish@google.com>2017-02-17 12:10:26 -0800
commit1191b7202d86f01857f74d0709792fb74eac5a37 (patch)
tree6161f0d0a94e5308c4d38ed87589e92d3c905f8d /test
parentfa301e3674a1cc786eb4dd4253a0e677f2eb68e3 (diff)
Improve Node and libuv testing and test coverage
Allow Node tests to run with or without UV, change default version to 7, add some portability tests. Also make some more core tests work with libuv
Diffstat (limited to 'test')
-rw-r--r--test/core/end2end/goaway_server_test.c6
-rw-r--r--test/core/handshake/client_ssl.c11
-rw-r--r--test/core/iomgr/resolve_address_test.c53
-rw-r--r--test/core/iomgr/tcp_client_posix_test.c11
-rw-r--r--test/core/iomgr/tcp_client_uv_test.c224
-rw-r--r--test/core/iomgr/tcp_server_uv_test.c341
-rw-r--r--test/core/iomgr/timer_list_test.c11
-rw-r--r--test/core/surface/completion_queue_test.c223
-rw-r--r--test/core/surface/completion_queue_threading_test.c290
-rw-r--r--test/core/util/mock_endpoint.c6
-rw-r--r--test/core/util/passthru_endpoint.c6
-rw-r--r--test/core/util/test_config.c19
12 files changed, 961 insertions, 240 deletions
diff --git a/test/core/end2end/goaway_server_test.c b/test/core/end2end/goaway_server_test.c
index cd68b390bb..8e90464d02 100644
--- a/test/core/end2end/goaway_server_test.c
+++ b/test/core/end2end/goaway_server_test.c
@@ -31,6 +31,12 @@
*
*/
+/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
+ using that endpoint. Because of various transitive includes in uv.h,
+ including windows.h on Windows, uv.h must be included before other system
+ headers. Therefore, sockaddr.h must always be included first */
+#include "src/core/lib/iomgr/sockaddr.h"
+
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
diff --git a/test/core/handshake/client_ssl.c b/test/core/handshake/client_ssl.c
index 613251b835..1d853af555 100644
--- a/test/core/handshake/client_ssl.c
+++ b/test/core/handshake/client_ssl.c
@@ -31,6 +31,11 @@
*
*/
+#include "src/core/lib/iomgr/port.h"
+
+// This test won't work except with posix sockets enabled
+#ifdef GRPC_POSIX_SOCKET
+
#include <arpa/inet.h>
#include <openssl/err.h>
#include <openssl/ssl.h>
@@ -311,3 +316,9 @@ int main(int argc, char *argv[]) {
GPR_ASSERT(!client_ssl_test("foo"));
return 0;
}
+
+#else /* GRPC_POSIX_SOCKET */
+
+int main(int argc, char **argv) { return 1; }
+
+#endif /* GRPC_POSIX_SOCKET */
diff --git a/test/core/iomgr/resolve_address_test.c b/test/core/iomgr/resolve_address_test.c
index 84f4ed4c90..71e9fe5ba7 100644
--- a/test/core/iomgr/resolve_address_test.c
+++ b/test/core/iomgr/resolve_address_test.c
@@ -35,7 +35,6 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
-#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
@@ -63,6 +62,7 @@ void args_init(grpc_exec_ctx *exec_ctx, args_struct *args) {
args->pollset_set = grpc_pollset_set_create();
grpc_pollset_set_add_pollset(exec_ctx, args->pollset_set, args->pollset);
args->addrs = NULL;
+ gpr_atm_rel_store(&args->done_atm, 0);
}
void args_finish(grpc_exec_ctx *exec_ctx, args_struct *args) {
@@ -85,8 +85,7 @@ static gpr_timespec n_sec_deadline(int seconds) {
gpr_time_from_seconds(seconds, GPR_TIMESPAN));
}
-static void actually_poll(void *argsp) {
- args_struct *args = argsp;
+static void poll_pollset_until_request_done(args_struct *args) {
gpr_timespec deadline = n_sec_deadline(10);
while (true) {
bool done = gpr_atm_acq_load(&args->done_atm) != 0;
@@ -111,12 +110,6 @@ static void actually_poll(void *argsp) {
gpr_event_set(&args->ev, (void *)1);
}
-static void poll_pollset_until_request_done(args_struct *args) {
- gpr_atm_rel_store(&args->done_atm, 0);
- gpr_thd_id id;
- gpr_thd_new(&id, actually_poll, args, NULL);
-}
-
static void must_succeed(grpc_exec_ctx *exec_ctx, void *argsp,
grpc_error *err) {
args_struct *args = argsp;
@@ -124,23 +117,30 @@ static void must_succeed(grpc_exec_ctx *exec_ctx, void *argsp,
GPR_ASSERT(args->addrs != NULL);
GPR_ASSERT(args->addrs->naddrs > 0);
gpr_atm_rel_store(&args->done_atm, 1);
+ gpr_mu_lock(args->mu);
+ GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, NULL));
+ gpr_mu_unlock(args->mu);
}
static void must_fail(grpc_exec_ctx *exec_ctx, void *argsp, grpc_error *err) {
args_struct *args = argsp;
GPR_ASSERT(err != GRPC_ERROR_NONE);
gpr_atm_rel_store(&args->done_atm, 1);
+ gpr_mu_lock(args->mu);
+ GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, NULL));
+ gpr_mu_unlock(args->mu);
}
static void test_localhost(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
args_struct args;
args_init(&exec_ctx, &args);
- poll_pollset_until_request_done(&args);
grpc_resolve_address(
&exec_ctx, "localhost:1", NULL, args.pollset_set,
grpc_closure_create(must_succeed, &args, grpc_schedule_on_exec_ctx),
&args.addrs);
+ grpc_exec_ctx_flush(&exec_ctx);
+ poll_pollset_until_request_done(&args);
args_finish(&exec_ctx, &args);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -149,24 +149,40 @@ static void test_default_port(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
args_struct args;
args_init(&exec_ctx, &args);
- poll_pollset_until_request_done(&args);
grpc_resolve_address(
&exec_ctx, "localhost", "1", args.pollset_set,
grpc_closure_create(must_succeed, &args, grpc_schedule_on_exec_ctx),
&args.addrs);
+ grpc_exec_ctx_flush(&exec_ctx);
+ poll_pollset_until_request_done(&args);
args_finish(&exec_ctx, &args);
grpc_exec_ctx_finish(&exec_ctx);
}
-static void test_missing_default_port(void) {
+static void test_non_numeric_default_port(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
args_struct args;
args_init(&exec_ctx, &args);
+ grpc_resolve_address(
+ &exec_ctx, "localhost", "https", args.pollset_set,
+ grpc_closure_create(must_succeed, &args, grpc_schedule_on_exec_ctx),
+ &args.addrs);
+ grpc_exec_ctx_flush(&exec_ctx);
poll_pollset_until_request_done(&args);
+ args_finish(&exec_ctx, &args);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_missing_default_port(void) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ args_struct args;
+ args_init(&exec_ctx, &args);
grpc_resolve_address(
&exec_ctx, "localhost", NULL, args.pollset_set,
grpc_closure_create(must_fail, &args, grpc_schedule_on_exec_ctx),
&args.addrs);
+ grpc_exec_ctx_flush(&exec_ctx);
+ poll_pollset_until_request_done(&args);
args_finish(&exec_ctx, &args);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -175,11 +191,12 @@ static void test_ipv6_with_port(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
args_struct args;
args_init(&exec_ctx, &args);
- poll_pollset_until_request_done(&args);
grpc_resolve_address(
&exec_ctx, "[2001:db8::1]:1", NULL, args.pollset_set,
grpc_closure_create(must_succeed, &args, grpc_schedule_on_exec_ctx),
&args.addrs);
+ grpc_exec_ctx_flush(&exec_ctx);
+ poll_pollset_until_request_done(&args);
args_finish(&exec_ctx, &args);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -193,11 +210,12 @@ static void test_ipv6_without_port(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
args_struct args;
args_init(&exec_ctx, &args);
- poll_pollset_until_request_done(&args);
grpc_resolve_address(
&exec_ctx, kCases[i], "80", args.pollset_set,
grpc_closure_create(must_succeed, &args, grpc_schedule_on_exec_ctx),
&args.addrs);
+ grpc_exec_ctx_flush(&exec_ctx);
+ poll_pollset_until_request_done(&args);
args_finish(&exec_ctx, &args);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -212,11 +230,12 @@ static void test_invalid_ip_addresses(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
args_struct args;
args_init(&exec_ctx, &args);
- poll_pollset_until_request_done(&args);
grpc_resolve_address(
&exec_ctx, kCases[i], NULL, args.pollset_set,
grpc_closure_create(must_fail, &args, grpc_schedule_on_exec_ctx),
&args.addrs);
+ grpc_exec_ctx_flush(&exec_ctx);
+ poll_pollset_until_request_done(&args);
args_finish(&exec_ctx, &args);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -231,11 +250,12 @@ static void test_unparseable_hostports(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
args_struct args;
args_init(&exec_ctx, &args);
- poll_pollset_until_request_done(&args);
grpc_resolve_address(
&exec_ctx, kCases[i], "1", args.pollset_set,
grpc_closure_create(must_fail, &args, grpc_schedule_on_exec_ctx),
&args.addrs);
+ grpc_exec_ctx_flush(&exec_ctx);
+ poll_pollset_until_request_done(&args);
args_finish(&exec_ctx, &args);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -247,6 +267,7 @@ int main(int argc, char **argv) {
grpc_iomgr_init();
test_localhost();
test_default_port();
+ test_non_numeric_default_port();
test_missing_default_port();
test_ipv6_with_port();
test_ipv6_without_port();
diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c
index 0ea7a000eb..e813cdbf2b 100644
--- a/test/core/iomgr/tcp_client_posix_test.c
+++ b/test/core/iomgr/tcp_client_posix_test.c
@@ -31,6 +31,11 @@
*
*/
+#include "src/core/lib/iomgr/port.h"
+
+// This test won't work except with posix sockets enabled
+#ifdef GRPC_POSIX_SOCKET
+
#include "src/core/lib/iomgr/tcp_client.h"
#include <errno.h>
@@ -215,3 +220,9 @@ int main(int argc, char **argv) {
gpr_free(g_pollset);
return 0;
}
+
+#else /* GRPC_POSIX_SOCKET */
+
+int main(int argc, char **argv) { return 1; }
+
+#endif /* GRPC_POSIX_SOCKET */
diff --git a/test/core/iomgr/tcp_client_uv_test.c b/test/core/iomgr/tcp_client_uv_test.c
new file mode 100644
index 0000000000..fc2360b436
--- /dev/null
+++ b/test/core/iomgr/tcp_client_uv_test.c
@@ -0,0 +1,224 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+// This test won't work except with libuv
+#ifdef GRPC_UV
+
+#include <uv.h>
+
+#include <string.h>
+
+#include "src/core/lib/iomgr/tcp_client.h"
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/time.h>
+
+#include "src/core/lib/iomgr/iomgr.h"
+#include "src/core/lib/iomgr/pollset.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "test/core/util/test_config.h"
+
+static gpr_mu *g_mu;
+static grpc_pollset *g_pollset;
+static int g_connections_complete = 0;
+static grpc_endpoint *g_connecting = NULL;
+
+static gpr_timespec test_deadline(void) {
+ return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
+}
+
+static void finish_connection() {
+ gpr_mu_lock(g_mu);
+ g_connections_complete++;
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
+ gpr_mu_unlock(g_mu);
+}
+
+static void must_succeed(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ GPR_ASSERT(g_connecting != NULL);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ grpc_endpoint_shutdown(exec_ctx, g_connecting);
+ grpc_endpoint_destroy(exec_ctx, g_connecting);
+ g_connecting = NULL;
+ finish_connection();
+}
+
+static void must_fail(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ GPR_ASSERT(g_connecting == NULL);
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
+ finish_connection();
+}
+
+static void close_cb(uv_handle_t *handle) {
+ gpr_free(handle);
+}
+
+static void connection_cb(uv_stream_t *server, int status) {
+ uv_tcp_t *client_handle = gpr_malloc(sizeof(uv_tcp_t));
+ GPR_ASSERT(0 == status);
+ GPR_ASSERT(0 == uv_tcp_init(uv_default_loop(), client_handle));
+ GPR_ASSERT(0 == uv_accept(server, (uv_stream_t *)client_handle));
+ uv_close((uv_handle_t *)client_handle, close_cb);
+}
+
+void test_succeeds(void) {
+ grpc_resolved_address resolved_addr;
+ struct sockaddr_in *addr = (struct sockaddr_in *)resolved_addr.addr;
+ uv_tcp_t *svr_handle = gpr_malloc(sizeof(uv_tcp_t));
+ int connections_complete_before;
+ grpc_closure done;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
+ gpr_log(GPR_DEBUG, "test_succeeds");
+
+ memset(&resolved_addr, 0, sizeof(resolved_addr));
+ resolved_addr.len = sizeof(struct sockaddr_in);
+ addr->sin_family = AF_INET;
+
+ /* create a dummy server */
+ GPR_ASSERT(0 == uv_tcp_init(uv_default_loop(), svr_handle));
+ GPR_ASSERT(0 == uv_tcp_bind(svr_handle, (struct sockaddr *)addr, 0));
+ GPR_ASSERT(0 == uv_listen((uv_stream_t *)svr_handle, 1, connection_cb));
+
+ gpr_mu_lock(g_mu);
+ connections_complete_before = g_connections_complete;
+ gpr_mu_unlock(g_mu);
+
+ /* connect to it */
+ GPR_ASSERT(uv_tcp_getsockname(svr_handle, (struct sockaddr *)addr,
+ (int *)&resolved_addr.len) == 0);
+ grpc_closure_init(&done, must_succeed, NULL, grpc_schedule_on_exec_ctx);
+ grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, NULL, NULL,
+ &resolved_addr, gpr_inf_future(GPR_CLOCK_REALTIME));
+
+ gpr_mu_lock(g_mu);
+
+ while (g_connections_complete == connections_complete_before) {
+ grpc_pollset_worker *worker = NULL;
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC),
+ GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5))));
+ gpr_mu_unlock(g_mu);
+ grpc_exec_ctx_flush(&exec_ctx);
+ gpr_mu_lock(g_mu);
+ }
+
+ // This will get cleaned up when the pollset runs again or gets shutdown
+ uv_close((uv_handle_t*)svr_handle, close_cb);
+
+ gpr_mu_unlock(g_mu);
+
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+void test_fails(void) {
+ grpc_resolved_address resolved_addr;
+ struct sockaddr_in *addr = (struct sockaddr_in *)resolved_addr.addr;
+ int connections_complete_before;
+ grpc_closure done;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
+ gpr_log(GPR_DEBUG, "test_fails");
+
+ memset(&resolved_addr, 0, sizeof(resolved_addr));
+ resolved_addr.len = sizeof(struct sockaddr_in);
+ addr->sin_family = AF_INET;
+
+ gpr_mu_lock(g_mu);
+ connections_complete_before = g_connections_complete;
+ gpr_mu_unlock(g_mu);
+
+ /* connect to a broken address */
+ grpc_closure_init(&done, must_fail, NULL, grpc_schedule_on_exec_ctx);
+ grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, NULL, NULL,
+ &resolved_addr, gpr_inf_future(GPR_CLOCK_REALTIME));
+
+ gpr_mu_lock(g_mu);
+
+ /* wait for the connection callback to finish */
+ while (g_connections_complete == connections_complete_before) {
+ grpc_pollset_worker *worker = NULL;
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ gpr_timespec polling_deadline = test_deadline();
+ if (!grpc_timer_check(&exec_ctx, now, &polling_deadline)) {
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker, now,
+ polling_deadline)));
+ }
+ gpr_mu_unlock(g_mu);
+ grpc_exec_ctx_flush(&exec_ctx);
+ gpr_mu_lock(g_mu);
+ }
+
+ gpr_mu_unlock(g_mu);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p,
+ grpc_error *error) {
+ grpc_pollset_destroy(p);
+}
+
+int main(int argc, char **argv) {
+ grpc_closure destroyed;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_test_init(argc, argv);
+ grpc_init();
+ g_pollset = gpr_malloc(grpc_pollset_size());
+ grpc_pollset_init(g_pollset, &g_mu);
+ grpc_exec_ctx_finish(&exec_ctx);
+ test_succeeds();
+ gpr_log(GPR_ERROR, "End of first test");
+ test_fails();
+ grpc_closure_init(&destroyed, destroy_pollset, g_pollset,
+ grpc_schedule_on_exec_ctx);
+ grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
+ grpc_exec_ctx_finish(&exec_ctx);
+ grpc_shutdown();
+ gpr_free(g_pollset);
+ return 0;
+}
+
+#else /* GRPC_UV */
+
+int main(int argc, char **argv) { return 1; }
+
+#endif /* GRPC_UV */
diff --git a/test/core/iomgr/tcp_server_uv_test.c b/test/core/iomgr/tcp_server_uv_test.c
new file mode 100644
index 0000000000..07d09af127
--- /dev/null
+++ b/test/core/iomgr/tcp_server_uv_test.c
@@ -0,0 +1,341 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+// This test won't work except with libuv
+#ifdef GRPC_UV
+
+#include <uv.h>
+
+#include "src/core/lib/iomgr/tcp_server.h"
+
+#include <string.h>
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/time.h>
+
+#include "src/core/lib/iomgr/iomgr.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+
+#define LOG_TEST(x) gpr_log(GPR_INFO, "%s", #x)
+
+static gpr_mu *g_mu;
+static grpc_pollset *g_pollset;
+static int g_nconnects = 0;
+
+typedef struct on_connect_result {
+ /* Owns a ref to server. */
+ grpc_tcp_server *server;
+ unsigned port_index;
+ unsigned fd_index;
+} on_connect_result;
+
+typedef struct server_weak_ref {
+ grpc_tcp_server *server;
+
+ /* arg is this server_weak_ref. */
+ grpc_closure server_shutdown;
+} server_weak_ref;
+
+static on_connect_result g_result = {NULL, 0, 0};
+
+static void on_connect_result_init(on_connect_result *result) {
+ result->server = NULL;
+ result->port_index = 0;
+ result->fd_index = 0;
+}
+
+static void on_connect_result_set(on_connect_result *result,
+ const grpc_tcp_server_acceptor *acceptor) {
+ result->server = grpc_tcp_server_ref(acceptor->from_server);
+ result->port_index = acceptor->port_index;
+ result->fd_index = acceptor->fd_index;
+}
+
+static void server_weak_ref_shutdown(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ server_weak_ref *weak_ref = arg;
+ weak_ref->server = NULL;
+}
+
+static void server_weak_ref_init(server_weak_ref *weak_ref) {
+ weak_ref->server = NULL;
+ grpc_closure_init(&weak_ref->server_shutdown, server_weak_ref_shutdown,
+ weak_ref, grpc_schedule_on_exec_ctx);
+}
+
+/* Make weak_ref->server_shutdown a shutdown_starting cb on server.
+ grpc_tcp_server promises that the server object will live until
+ weak_ref->server_shutdown has returned. A strong ref on grpc_tcp_server
+ should be held until server_weak_ref_set() returns to avoid a race where the
+ server is deleted before the shutdown_starting cb is added. */
+static void server_weak_ref_set(server_weak_ref *weak_ref,
+ grpc_tcp_server *server) {
+ grpc_tcp_server_shutdown_starting_add(server, &weak_ref->server_shutdown);
+ weak_ref->server = server;
+}
+
+static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
+ grpc_pollset *pollset,
+ grpc_tcp_server_acceptor *acceptor) {
+ grpc_endpoint_shutdown(exec_ctx, tcp);
+ grpc_endpoint_destroy(exec_ctx, tcp);
+
+ on_connect_result temp_result;
+ on_connect_result_set(&temp_result, acceptor);
+ gpr_free(acceptor);
+
+ gpr_mu_lock(g_mu);
+ g_result = temp_result;
+ g_nconnects++;
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
+ gpr_mu_unlock(g_mu);
+}
+
+static void test_no_op(void) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_tcp_server *s;
+ GPR_ASSERT(GRPC_ERROR_NONE ==
+ grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s));
+ grpc_tcp_server_unref(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_no_op_with_start(void) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_tcp_server *s;
+ GPR_ASSERT(GRPC_ERROR_NONE ==
+ grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s));
+ LOG_TEST("test_no_op_with_start");
+ grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL);
+ grpc_tcp_server_unref(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_no_op_with_port(void) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resolved_address resolved_addr;
+ struct sockaddr_in *addr = (struct sockaddr_in *)resolved_addr.addr;
+ grpc_tcp_server *s;
+ GPR_ASSERT(GRPC_ERROR_NONE ==
+ grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s));
+ LOG_TEST("test_no_op_with_port");
+
+ memset(&resolved_addr, 0, sizeof(resolved_addr));
+ resolved_addr.len = sizeof(struct sockaddr_in);
+ addr->sin_family = AF_INET;
+ int port;
+ GPR_ASSERT(grpc_tcp_server_add_port(s, &resolved_addr, &port) ==
+ GRPC_ERROR_NONE &&
+ port > 0);
+
+ grpc_tcp_server_unref(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_no_op_with_port_and_start(void) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resolved_address resolved_addr;
+ struct sockaddr_in *addr = (struct sockaddr_in *)resolved_addr.addr;
+ grpc_tcp_server *s;
+ GPR_ASSERT(GRPC_ERROR_NONE ==
+ grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s));
+ LOG_TEST("test_no_op_with_port_and_start");
+ int port;
+
+ memset(&resolved_addr, 0, sizeof(resolved_addr));
+ resolved_addr.len = sizeof(struct sockaddr_in);
+ addr->sin_family = AF_INET;
+ GPR_ASSERT(grpc_tcp_server_add_port(s, &resolved_addr, &port) ==
+ GRPC_ERROR_NONE &&
+ port > 0);
+
+ grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL);
+
+ grpc_tcp_server_unref(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void connect_cb(uv_connect_t *req, int status) {
+ GPR_ASSERT(status == 0);
+ gpr_free(req);
+}
+
+static void close_cb(uv_handle_t *handle) {
+ gpr_free(handle);
+}
+
+static void tcp_connect(grpc_exec_ctx *exec_ctx, const struct sockaddr *remote,
+ socklen_t remote_len, on_connect_result *result) {
+ gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
+ uv_tcp_t *client_handle = gpr_malloc(sizeof(uv_tcp_t));
+ uv_connect_t *req = gpr_malloc(sizeof(uv_connect_t));
+ int nconnects_before;
+
+ gpr_mu_lock(g_mu);
+ nconnects_before = g_nconnects;
+ on_connect_result_init(&g_result);
+ GPR_ASSERT(uv_tcp_init(uv_default_loop(), client_handle) == 0);
+ gpr_log(GPR_DEBUG, "start connect");
+ GPR_ASSERT(uv_tcp_connect(req, client_handle, remote, connect_cb) == 0);
+ gpr_log(GPR_DEBUG, "wait");
+ while (g_nconnects == nconnects_before &&
+ gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
+ grpc_pollset_worker *worker = NULL;
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_work",
+ grpc_pollset_work(exec_ctx, g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC), deadline)));
+ gpr_mu_unlock(g_mu);
+ grpc_exec_ctx_finish(exec_ctx);
+ gpr_mu_lock(g_mu);
+ }
+ gpr_log(GPR_DEBUG, "wait done");
+ GPR_ASSERT(g_nconnects == nconnects_before + 1);
+ uv_close((uv_handle_t *)client_handle, close_cb);
+ *result = g_result;
+
+ gpr_mu_unlock(g_mu);
+}
+
+/* Tests a tcp server with multiple ports. */
+static void test_connect(unsigned n) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resolved_address resolved_addr;
+ grpc_resolved_address resolved_addr1;
+ struct sockaddr_storage *addr = (struct sockaddr_storage *)resolved_addr.addr;
+ struct sockaddr_storage *addr1 =
+ (struct sockaddr_storage *)resolved_addr1.addr;
+ int svr_port;
+ int svr1_port;
+ grpc_tcp_server *s;
+ GPR_ASSERT(GRPC_ERROR_NONE ==
+ grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s));
+ unsigned i;
+ server_weak_ref weak_ref;
+ server_weak_ref_init(&weak_ref);
+ LOG_TEST("test_connect");
+ gpr_log(GPR_INFO, "clients=%d", n);
+ memset(&resolved_addr, 0, sizeof(resolved_addr));
+ memset(&resolved_addr1, 0, sizeof(resolved_addr1));
+ resolved_addr.len = sizeof(struct sockaddr_storage);
+ resolved_addr1.len = sizeof(struct sockaddr_storage);
+ addr->ss_family = addr1->ss_family = AF_INET;
+ GPR_ASSERT(GRPC_ERROR_NONE ==
+ grpc_tcp_server_add_port(s, &resolved_addr, &svr_port));
+ GPR_ASSERT(svr_port > 0);
+ GPR_ASSERT(uv_ip6_addr("::", svr_port, (struct sockaddr_in6 *)addr) == 0);
+ /* Cannot use wildcard (port==0), because add_port() will try to reuse the
+ same port as a previous add_port(). */
+ svr1_port = grpc_pick_unused_port_or_die();
+ grpc_sockaddr_set_port(&resolved_addr1, svr1_port);
+ GPR_ASSERT(grpc_tcp_server_add_port(s, &resolved_addr1, &svr_port) ==
+ GRPC_ERROR_NONE &&
+ svr_port == svr1_port);
+
+ grpc_tcp_server_start(&exec_ctx, s, &g_pollset, 1, on_connect, NULL);
+
+ GPR_ASSERT(uv_ip6_addr("::", svr_port, (struct sockaddr_in6 *)addr1) == 0);
+
+ for (i = 0; i < n; i++) {
+ on_connect_result result;
+ on_connect_result_init(&result);
+ tcp_connect(&exec_ctx, (struct sockaddr *)addr,
+ (socklen_t)resolved_addr.len, &result);
+ GPR_ASSERT(result.port_index == 0);
+ GPR_ASSERT(result.server == s);
+ if (weak_ref.server == NULL) {
+ server_weak_ref_set(&weak_ref, result.server);
+ }
+ grpc_tcp_server_unref(&exec_ctx, result.server);
+
+ on_connect_result_init(&result);
+ tcp_connect(&exec_ctx, (struct sockaddr *)addr1,
+ (socklen_t)resolved_addr1.len, &result);
+ GPR_ASSERT(result.port_index == 1);
+ GPR_ASSERT(result.server == s);
+ grpc_tcp_server_unref(&exec_ctx, result.server);
+ }
+
+ /* Weak ref to server valid until final unref. */
+ GPR_ASSERT(weak_ref.server != NULL);
+
+ grpc_tcp_server_unref(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
+
+ /* Weak ref lost. */
+ GPR_ASSERT(weak_ref.server == NULL);
+}
+
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p,
+ grpc_error *error) {
+ grpc_pollset_destroy(p);
+}
+
+int main(int argc, char **argv) {
+ grpc_closure destroyed;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_test_init(argc, argv);
+ grpc_init();
+ g_pollset = gpr_malloc(grpc_pollset_size());
+ grpc_pollset_init(g_pollset, &g_mu);
+
+ test_no_op();
+ test_no_op_with_start();
+ test_no_op_with_port();
+ test_no_op_with_port_and_start();
+ test_connect(1);
+ test_connect(10);
+
+ grpc_closure_init(&destroyed, destroy_pollset, g_pollset,
+ grpc_schedule_on_exec_ctx);
+ grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
+ grpc_exec_ctx_finish(&exec_ctx);
+ grpc_shutdown();
+ gpr_free(g_pollset);
+ return 0;
+}
+
+#else /* GRPC_UV */
+
+int main(int argc, char **argv) { return 1; }
+
+#endif /* GRPC_UV */
diff --git a/test/core/iomgr/timer_list_test.c b/test/core/iomgr/timer_list_test.c
index 8d7ac3fdaa..85ad5277cc 100644
--- a/test/core/iomgr/timer_list_test.c
+++ b/test/core/iomgr/timer_list_test.c
@@ -31,6 +31,11 @@
*
*/
+#include "src/core/lib/iomgr/port.h"
+
+// This test only works with the generic timer implementation
+#ifdef GRPC_TIMER_USE_GENERIC
+
#include "src/core/lib/iomgr/timer.h"
#include <string.h>
@@ -169,3 +174,9 @@ int main(int argc, char **argv) {
destruction_test();
return 0;
}
+
+#else /* GRPC_TIMER_USE_GENERIC */
+
+int main(int argc, char **argv) { return 1; }
+
+#endif /* GRPC_TIMER_USE_GENERIC */
diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c
index 1486d2508f..07f6a9869b 100644
--- a/test/core/surface/completion_queue_test.c
+++ b/test/core/surface/completion_queue_test.c
@@ -35,7 +35,6 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/iomgr.h"
@@ -194,223 +193,6 @@ static void test_pluck_after_shutdown(void) {
grpc_completion_queue_destroy(cc);
}
-struct thread_state {
- grpc_completion_queue *cc;
- void *tag;
-};
-
-static void pluck_one(void *arg) {
- struct thread_state *state = arg;
- grpc_completion_queue_pluck(state->cc, state->tag,
- gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
-}
-
-static void test_too_many_plucks(void) {
- grpc_event ev;
- grpc_completion_queue *cc;
- void *tags[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
- grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
- gpr_thd_id thread_ids[GPR_ARRAY_SIZE(tags)];
- struct thread_state thread_states[GPR_ARRAY_SIZE(tags)];
- gpr_thd_options thread_options = gpr_thd_options_default();
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- unsigned i, j;
-
- LOG_TEST("test_too_many_plucks");
-
- cc = grpc_completion_queue_create(NULL);
- gpr_thd_options_set_joinable(&thread_options);
-
- for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
- tags[i] = create_test_tag();
- for (j = 0; j < i; j++) {
- GPR_ASSERT(tags[i] != tags[j]);
- }
- thread_states[i].cc = cc;
- thread_states[i].tag = tags[i];
- gpr_thd_new(thread_ids + i, pluck_one, thread_states + i, &thread_options);
- }
-
- /* wait until all other threads are plucking */
- gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(1000));
-
- ev = grpc_completion_queue_pluck(cc, create_test_tag(),
- gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
- GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
-
- for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
- grpc_cq_begin_op(cc, tags[i]);
- grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE,
- do_nothing_end_completion, NULL, &completions[i]);
- }
-
- for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
- gpr_thd_join(thread_ids[i]);
- }
-
- shutdown_and_destroy(cc);
- grpc_exec_ctx_finish(&exec_ctx);
-}
-
-#define TEST_THREAD_EVENTS 10000
-
-typedef struct test_thread_options {
- gpr_event on_started;
- gpr_event *phase1;
- gpr_event on_phase1_done;
- gpr_event *phase2;
- gpr_event on_finished;
- size_t events_triggered;
- int id;
- grpc_completion_queue *cc;
-} test_thread_options;
-
-gpr_timespec ten_seconds_time(void) {
- return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
-}
-
-static void free_completion(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_cq_completion *completion) {
- gpr_free(completion);
-}
-
-static void producer_thread(void *arg) {
- test_thread_options *opt = arg;
- int i;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-
- gpr_log(GPR_INFO, "producer %d started", opt->id);
- gpr_event_set(&opt->on_started, (void *)(intptr_t)1);
- GPR_ASSERT(gpr_event_wait(opt->phase1, ten_seconds_time()));
-
- gpr_log(GPR_INFO, "producer %d phase 1", opt->id);
- for (i = 0; i < TEST_THREAD_EVENTS; i++) {
- grpc_cq_begin_op(opt->cc, (void *)(intptr_t)1);
- }
-
- gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id);
- gpr_event_set(&opt->on_phase1_done, (void *)(intptr_t)1);
- GPR_ASSERT(gpr_event_wait(opt->phase2, ten_seconds_time()));
-
- gpr_log(GPR_INFO, "producer %d phase 2", opt->id);
- for (i = 0; i < TEST_THREAD_EVENTS; i++) {
- grpc_cq_end_op(&exec_ctx, opt->cc, (void *)(intptr_t)1, GRPC_ERROR_NONE,
- free_completion, NULL,
- gpr_malloc(sizeof(grpc_cq_completion)));
- opt->events_triggered++;
- grpc_exec_ctx_finish(&exec_ctx);
- }
-
- gpr_log(GPR_INFO, "producer %d phase 2 done", opt->id);
- gpr_event_set(&opt->on_finished, (void *)(intptr_t)1);
- grpc_exec_ctx_finish(&exec_ctx);
-}
-
-static void consumer_thread(void *arg) {
- test_thread_options *opt = arg;
- grpc_event ev;
-
- gpr_log(GPR_INFO, "consumer %d started", opt->id);
- gpr_event_set(&opt->on_started, (void *)(intptr_t)1);
- GPR_ASSERT(gpr_event_wait(opt->phase1, ten_seconds_time()));
-
- gpr_log(GPR_INFO, "consumer %d phase 1", opt->id);
-
- gpr_log(GPR_INFO, "consumer %d phase 1 done", opt->id);
- gpr_event_set(&opt->on_phase1_done, (void *)(intptr_t)1);
- GPR_ASSERT(gpr_event_wait(opt->phase2, ten_seconds_time()));
-
- gpr_log(GPR_INFO, "consumer %d phase 2", opt->id);
- for (;;) {
- ev = grpc_completion_queue_next(opt->cc, ten_seconds_time(), NULL);
- switch (ev.type) {
- case GRPC_OP_COMPLETE:
- GPR_ASSERT(ev.success);
- opt->events_triggered++;
- break;
- case GRPC_QUEUE_SHUTDOWN:
- gpr_log(GPR_INFO, "consumer %d phase 2 done", opt->id);
- gpr_event_set(&opt->on_finished, (void *)(intptr_t)1);
- return;
- case GRPC_QUEUE_TIMEOUT:
- gpr_log(GPR_ERROR, "Invalid timeout received");
- abort();
- }
- }
-}
-
-static void test_threading(size_t producers, size_t consumers) {
- test_thread_options *options =
- gpr_malloc((producers + consumers) * sizeof(test_thread_options));
- gpr_event phase1 = GPR_EVENT_INIT;
- gpr_event phase2 = GPR_EVENT_INIT;
- grpc_completion_queue *cc = grpc_completion_queue_create(NULL);
- size_t i;
- size_t total_consumed = 0;
- static int optid = 101;
-
- gpr_log(GPR_INFO, "%s: %" PRIuPTR " producers, %" PRIuPTR " consumers",
- "test_threading", producers, consumers);
-
- /* start all threads: they will wait for phase1 */
- for (i = 0; i < producers + consumers; i++) {
- gpr_thd_id id;
- gpr_event_init(&options[i].on_started);
- gpr_event_init(&options[i].on_phase1_done);
- gpr_event_init(&options[i].on_finished);
- options[i].phase1 = &phase1;
- options[i].phase2 = &phase2;
- options[i].events_triggered = 0;
- options[i].cc = cc;
- options[i].id = optid++;
- GPR_ASSERT(gpr_thd_new(&id,
- i < producers ? producer_thread : consumer_thread,
- options + i, NULL));
- gpr_event_wait(&options[i].on_started, ten_seconds_time());
- }
-
- /* start phase1: producers will pre-declare all operations they will
- complete */
- gpr_log(GPR_INFO, "start phase 1");
- gpr_event_set(&phase1, (void *)(intptr_t)1);
-
- gpr_log(GPR_INFO, "wait phase 1");
- for (i = 0; i < producers + consumers; i++) {
- GPR_ASSERT(gpr_event_wait(&options[i].on_phase1_done, ten_seconds_time()));
- }
- gpr_log(GPR_INFO, "done phase 1");
-
- /* start phase2: operations will complete, and consumers will consume them */
- gpr_log(GPR_INFO, "start phase 2");
- gpr_event_set(&phase2, (void *)(intptr_t)1);
-
- /* in parallel, we shutdown the completion channel - all events should still
- be consumed */
- grpc_completion_queue_shutdown(cc);
-
- /* join all threads */
- gpr_log(GPR_INFO, "wait phase 2");
- for (i = 0; i < producers + consumers; i++) {
- GPR_ASSERT(gpr_event_wait(&options[i].on_finished, ten_seconds_time()));
- }
- gpr_log(GPR_INFO, "done phase 2");
-
- /* destroy the completion channel */
- grpc_completion_queue_destroy(cc);
-
- /* verify that everything was produced and consumed */
- for (i = 0; i < producers + consumers; i++) {
- if (i < producers) {
- GPR_ASSERT(options[i].events_triggered == TEST_THREAD_EVENTS);
- } else {
- total_consumed += options[i].events_triggered;
- }
- }
- GPR_ASSERT(total_consumed == producers * TEST_THREAD_EVENTS);
-
- gpr_free(options);
-}
-
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_init();
@@ -422,11 +204,6 @@ int main(int argc, char **argv) {
test_cq_end_op();
test_pluck();
test_pluck_after_shutdown();
- test_too_many_plucks();
- test_threading(1, 1);
- test_threading(1, 10);
- test_threading(10, 1);
- test_threading(10, 10);
grpc_shutdown();
return 0;
}
diff --git a/test/core/surface/completion_queue_threading_test.c b/test/core/surface/completion_queue_threading_test.c
new file mode 100644
index 0000000000..c41ee41e24
--- /dev/null
+++ b/test/core/surface/completion_queue_threading_test.c
@@ -0,0 +1,290 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/surface/completion_queue.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+#include "src/core/lib/iomgr/iomgr.h"
+#include "test/core/util/test_config.h"
+
+#define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
+
+static void *create_test_tag(void) {
+ static intptr_t i = 0;
+ return (void *)(++i);
+}
+
+/* helper for tests to shutdown correctly and tersely */
+static void shutdown_and_destroy(grpc_completion_queue *cc) {
+ grpc_event ev;
+ grpc_completion_queue_shutdown(cc);
+ ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL);
+ GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
+ grpc_completion_queue_destroy(cc);
+}
+
+static void do_nothing_end_completion(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_cq_completion *c) {}
+
+struct thread_state {
+ grpc_completion_queue *cc;
+ void *tag;
+};
+
+static void pluck_one(void *arg) {
+ struct thread_state *state = arg;
+ grpc_completion_queue_pluck(state->cc, state->tag,
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+}
+
+static void test_too_many_plucks(void) {
+ grpc_event ev;
+ grpc_completion_queue *cc;
+ void *tags[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
+ grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
+ gpr_thd_id thread_ids[GPR_ARRAY_SIZE(tags)];
+ struct thread_state thread_states[GPR_ARRAY_SIZE(tags)];
+ gpr_thd_options thread_options = gpr_thd_options_default();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ unsigned i, j;
+
+ LOG_TEST("test_too_many_plucks");
+
+ cc = grpc_completion_queue_create(NULL);
+ gpr_thd_options_set_joinable(&thread_options);
+
+ for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
+ tags[i] = create_test_tag();
+ for (j = 0; j < i; j++) {
+ GPR_ASSERT(tags[i] != tags[j]);
+ }
+ thread_states[i].cc = cc;
+ thread_states[i].tag = tags[i];
+ gpr_thd_new(thread_ids + i, pluck_one, thread_states + i, &thread_options);
+ }
+
+ /* wait until all other threads are plucking */
+ gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(1000));
+
+ ev = grpc_completion_queue_pluck(cc, create_test_tag(),
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
+
+ for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
+ grpc_cq_begin_op(cc, tags[i]);
+ grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE,
+ do_nothing_end_completion, NULL, &completions[i]);
+ }
+
+ for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
+ gpr_thd_join(thread_ids[i]);
+ }
+
+ shutdown_and_destroy(cc);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+#define TEST_THREAD_EVENTS 10000
+
+typedef struct test_thread_options {
+ gpr_event on_started;
+ gpr_event *phase1;
+ gpr_event on_phase1_done;
+ gpr_event *phase2;
+ gpr_event on_finished;
+ size_t events_triggered;
+ int id;
+ grpc_completion_queue *cc;
+} test_thread_options;
+
+gpr_timespec ten_seconds_time(void) {
+ return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
+}
+
+static void free_completion(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_cq_completion *completion) {
+ gpr_free(completion);
+}
+
+static void producer_thread(void *arg) {
+ test_thread_options *opt = arg;
+ int i;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
+ gpr_log(GPR_INFO, "producer %d started", opt->id);
+ gpr_event_set(&opt->on_started, (void *)(intptr_t)1);
+ GPR_ASSERT(gpr_event_wait(opt->phase1, ten_seconds_time()));
+
+ gpr_log(GPR_INFO, "producer %d phase 1", opt->id);
+ for (i = 0; i < TEST_THREAD_EVENTS; i++) {
+ grpc_cq_begin_op(opt->cc, (void *)(intptr_t)1);
+ }
+
+ gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id);
+ gpr_event_set(&opt->on_phase1_done, (void *)(intptr_t)1);
+ GPR_ASSERT(gpr_event_wait(opt->phase2, ten_seconds_time()));
+
+ gpr_log(GPR_INFO, "producer %d phase 2", opt->id);
+ for (i = 0; i < TEST_THREAD_EVENTS; i++) {
+ grpc_cq_end_op(&exec_ctx, opt->cc, (void *)(intptr_t)1, GRPC_ERROR_NONE,
+ free_completion, NULL,
+ gpr_malloc(sizeof(grpc_cq_completion)));
+ opt->events_triggered++;
+ grpc_exec_ctx_finish(&exec_ctx);
+ }
+
+ gpr_log(GPR_INFO, "producer %d phase 2 done", opt->id);
+ gpr_event_set(&opt->on_finished, (void *)(intptr_t)1);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void consumer_thread(void *arg) {
+ test_thread_options *opt = arg;
+ grpc_event ev;
+
+ gpr_log(GPR_INFO, "consumer %d started", opt->id);
+ gpr_event_set(&opt->on_started, (void *)(intptr_t)1);
+ GPR_ASSERT(gpr_event_wait(opt->phase1, ten_seconds_time()));
+
+ gpr_log(GPR_INFO, "consumer %d phase 1", opt->id);
+
+ gpr_log(GPR_INFO, "consumer %d phase 1 done", opt->id);
+ gpr_event_set(&opt->on_phase1_done, (void *)(intptr_t)1);
+ GPR_ASSERT(gpr_event_wait(opt->phase2, ten_seconds_time()));
+
+ gpr_log(GPR_INFO, "consumer %d phase 2", opt->id);
+ for (;;) {
+ ev = grpc_completion_queue_next(opt->cc, ten_seconds_time(), NULL);
+ switch (ev.type) {
+ case GRPC_OP_COMPLETE:
+ GPR_ASSERT(ev.success);
+ opt->events_triggered++;
+ break;
+ case GRPC_QUEUE_SHUTDOWN:
+ gpr_log(GPR_INFO, "consumer %d phase 2 done", opt->id);
+ gpr_event_set(&opt->on_finished, (void *)(intptr_t)1);
+ return;
+ case GRPC_QUEUE_TIMEOUT:
+ gpr_log(GPR_ERROR, "Invalid timeout received");
+ abort();
+ }
+ }
+}
+
+static void test_threading(size_t producers, size_t consumers) {
+ test_thread_options *options =
+ gpr_malloc((producers + consumers) * sizeof(test_thread_options));
+ gpr_event phase1 = GPR_EVENT_INIT;
+ gpr_event phase2 = GPR_EVENT_INIT;
+ grpc_completion_queue *cc = grpc_completion_queue_create(NULL);
+ size_t i;
+ size_t total_consumed = 0;
+ static int optid = 101;
+
+ gpr_log(GPR_INFO, "%s: %" PRIuPTR " producers, %" PRIuPTR " consumers",
+ "test_threading", producers, consumers);
+
+ /* start all threads: they will wait for phase1 */
+ for (i = 0; i < producers + consumers; i++) {
+ gpr_thd_id id;
+ gpr_event_init(&options[i].on_started);
+ gpr_event_init(&options[i].on_phase1_done);
+ gpr_event_init(&options[i].on_finished);
+ options[i].phase1 = &phase1;
+ options[i].phase2 = &phase2;
+ options[i].events_triggered = 0;
+ options[i].cc = cc;
+ options[i].id = optid++;
+ GPR_ASSERT(gpr_thd_new(&id,
+ i < producers ? producer_thread : consumer_thread,
+ options + i, NULL));
+ gpr_event_wait(&options[i].on_started, ten_seconds_time());
+ }
+
+ /* start phase1: producers will pre-declare all operations they will
+ complete */
+ gpr_log(GPR_INFO, "start phase 1");
+ gpr_event_set(&phase1, (void *)(intptr_t)1);
+
+ gpr_log(GPR_INFO, "wait phase 1");
+ for (i = 0; i < producers + consumers; i++) {
+ GPR_ASSERT(gpr_event_wait(&options[i].on_phase1_done, ten_seconds_time()));
+ }
+ gpr_log(GPR_INFO, "done phase 1");
+
+ /* start phase2: operations will complete, and consumers will consume them */
+ gpr_log(GPR_INFO, "start phase 2");
+ gpr_event_set(&phase2, (void *)(intptr_t)1);
+
+ /* in parallel, we shutdown the completion channel - all events should still
+ be consumed */
+ grpc_completion_queue_shutdown(cc);
+
+ /* join all threads */
+ gpr_log(GPR_INFO, "wait phase 2");
+ for (i = 0; i < producers + consumers; i++) {
+ GPR_ASSERT(gpr_event_wait(&options[i].on_finished, ten_seconds_time()));
+ }
+ gpr_log(GPR_INFO, "done phase 2");
+
+ /* destroy the completion channel */
+ grpc_completion_queue_destroy(cc);
+
+ /* verify that everything was produced and consumed */
+ for (i = 0; i < producers + consumers; i++) {
+ if (i < producers) {
+ GPR_ASSERT(options[i].events_triggered == TEST_THREAD_EVENTS);
+ } else {
+ total_consumed += options[i].events_triggered;
+ }
+ }
+ GPR_ASSERT(total_consumed == producers * TEST_THREAD_EVENTS);
+
+ gpr_free(options);
+}
+
+int main(int argc, char **argv) {
+ grpc_test_init(argc, argv);
+ grpc_init();
+ test_too_many_plucks();
+ test_threading(1, 1);
+ test_threading(1, 10);
+ test_threading(10, 1);
+ test_threading(10, 10);
+ grpc_shutdown();
+ return 0;
+}
diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c
index 04793bceab..29ccbd270d 100644
--- a/test/core/util/mock_endpoint.c
+++ b/test/core/util/mock_endpoint.c
@@ -31,6 +31,12 @@
*
*/
+/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
+ using that endpoint. Because of various transitive includes in uv.h,
+ including windows.h on Windows, uv.h must be included before other system
+ headers. Therefore, sockaddr.h must always be included first */
+#include "src/core/lib/iomgr/sockaddr.h"
+
#include "test/core/util/mock_endpoint.h"
#include <inttypes.h>
diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c
index d42ec7f9e8..4a1447554b 100644
--- a/test/core/util/passthru_endpoint.c
+++ b/test/core/util/passthru_endpoint.c
@@ -31,6 +31,12 @@
*
*/
+/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
+ using that endpoint. Because of various transitive includes in uv.h,
+ including windows.h on Windows, uv.h must be included before other system
+ headers. Therefore, sockaddr.h must always be included first */
+#include "src/core/lib/iomgr/sockaddr.h"
+
#include "test/core/util/passthru_endpoint.h"
#include <inttypes.h>
diff --git a/test/core/util/test_config.c b/test/core/util/test_config.c
index d86ed94637..986bbc9569 100644
--- a/test/core/util/test_config.c
+++ b/test/core/util/test_config.c
@@ -215,6 +215,19 @@ static void install_crash_handler() {
#include <stdio.h>
#include <string.h>
+#define SIGNAL_NAMES_LENGTH 32
+
+static const char * const signal_names[] = {
+ NULL,
+ "SIGHUP", "SIGINT", "SIGQUIT", "SIGILL", "SIGTRAP",
+ "SIGABRT", "SIGBUS", "SIGFPE", "SIGKILL", "SIGUSR1",
+ "SIGSEGV", "SIGUSR2", "SIGPIPE", "SIGALRM", "SIGTERM",
+ "SIGSTKFLT", "SIGCHLD", "SIGCONT", "SIGSTOP", "SIGTSTP",
+ "SIGTTIN", "SIGTTOU", "SIGURG", "SIGXCPU", "SIGXFSZ",
+ "SIGVTALRM", "SIGPROF", "SIGWINCH", "SIGIO", "SIGPWR",
+ "SIGSYS"
+};
+
static char g_alt_stack[GPR_MAX(MINSIGSTKSZ, 65536)];
#define MAX_FRAMES 32
@@ -240,7 +253,11 @@ static void crash_handler(int signum, siginfo_t *info, void *data) {
int addrlen;
output_string("\n\n\n*******************************\nCaught signal ");
- output_num(signum);
+ if (signum > 0 && signum < SIGNAL_NAMES_LENGTH) {
+ output_string(signal_names[signum]);
+ } else {
+ output_num(signum);
+ }
output_string("\n");
addrlen = backtrace(addrlist, GPR_ARRAY_SIZE(addrlist));