aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2016-06-27 12:38:39 -0700
committerGravatar GitHub <noreply@github.com>2016-06-27 12:38:39 -0700
commita9df5d1029b303a305481c4bb765c6bed2e8ebbb (patch)
tree9a604cac3e9e98a64747ad578c6a8651d264d34c
parentb4028f6675a3c3d24fdbb3bed4e4a3939ccbb434 (diff)
parentf467846b1b6910c2fe6cf5c9802baafd3deb5186 (diff)
Merge pull request #6644 from ctiller/reuse_port
SO_REUSEPORT support
-rw-r--r--include/grpc/impl/codegen/grpc_types.h2
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.c3
-rw-r--r--src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c3
-rw-r--r--src/core/lib/iomgr/socket_utils_common_posix.c22
-rw-r--r--src/core/lib/iomgr/socket_utils_posix.h3
-rw-r--r--src/core/lib/iomgr/tcp_server.h3
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c123
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.c1
-rw-r--r--test/core/iomgr/tcp_server_posix_test.c10
-rw-r--r--test/core/surface/concurrent_connectivity_test.c2
-rw-r--r--test/core/surface/server_chttp2_test.c8
-rw-r--r--test/core/surface/server_test.c10
-rw-r--r--test/core/util/test_tcp_server.c4
-rw-r--r--test/cpp/end2end/server_builder_plugin_test.cc17
14 files changed, 181 insertions, 30 deletions
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index c51ffa4493..c0ed139506 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -156,6 +156,8 @@ typedef struct {
#define GRPC_SSL_TARGET_NAME_OVERRIDE_ARG "grpc.ssl_target_name_override"
/* Maximum metadata size */
#define GRPC_ARG_MAX_METADATA_SIZE "grpc.max_metadata_size"
+/** If non-zero, allow the use of SO_REUSEPORT if it's available (default 1) */
+#define GRPC_ARG_ALLOW_REUSEPORT "grpc.so_reuseport"
/** Result of a grpc call. If the caller satisfies the prerequisites of a
particular operation, the grpc_call_error returned will be GRPC_CALL_OK.
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index 9bae3a94f9..e5c987925c 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -97,7 +97,8 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
goto error;
}
- err = grpc_tcp_server_create(NULL, &tcp);
+ err =
+ grpc_tcp_server_create(NULL, grpc_server_get_channel_args(server), &tcp);
if (err != GRPC_ERROR_NONE) {
goto error;
}
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index ead8a4d566..c42810e913 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -216,7 +216,8 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
state = gpr_malloc(sizeof(*state));
memset(state, 0, sizeof(*state));
grpc_closure_init(&state->destroy_closure, destroy_done, state);
- err = grpc_tcp_server_create(&state->destroy_closure, &tcp);
+ err = grpc_tcp_server_create(&state->destroy_closure,
+ grpc_server_get_channel_args(server), &tcp);
if (err != GRPC_ERROR_NONE) {
goto error;
}
diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c
index 3a1371617e..d2f6261e2a 100644
--- a/src/core/lib/iomgr/socket_utils_common_posix.c
+++ b/src/core/lib/iomgr/socket_utils_common_posix.c
@@ -169,6 +169,28 @@ grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse) {
return GRPC_ERROR_NONE;
}
+/* set a socket to reuse old addresses */
+grpc_error *grpc_set_socket_reuse_port(int fd, int reuse) {
+#ifndef SO_REUSEPORT
+ return GRPC_ERROR_CREATE("SO_REUSEPORT unavailable on compiling system");
+#else
+ int val = (reuse != 0);
+ int newval;
+ socklen_t intlen = sizeof(newval);
+ if (0 != setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val))) {
+ return GRPC_OS_ERROR(errno, "setsockopt(SO_REUSEPORT)");
+ }
+ if (0 != getsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &newval, &intlen)) {
+ return GRPC_OS_ERROR(errno, "getsockopt(SO_REUSEPORT)");
+ }
+ if ((newval != 0) != val) {
+ return GRPC_ERROR_CREATE("Failed to set SO_REUSEPORT");
+ }
+
+ return GRPC_ERROR_NONE;
+#endif
+}
+
/* disable nagle */
grpc_error *grpc_set_socket_low_latency(int fd, int low_latency) {
int val = (low_latency != 0);
diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h
index 30ff39dfa3..7bcc2219ae 100644
--- a/src/core/lib/iomgr/socket_utils_posix.h
+++ b/src/core/lib/iomgr/socket_utils_posix.h
@@ -55,6 +55,9 @@ grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse);
/* disable nagle */
grpc_error *grpc_set_socket_low_latency(int fd, int low_latency);
+/* set SO_REUSEPORT */
+grpc_error *grpc_set_socket_reuse_port(int fd, int reuse);
+
/* Returns true if this system can create AF_INET6 sockets bound to ::1.
The value is probed once, and cached for the life of the process.
diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h
index 1d6e70dbe9..875a6ca14a 100644
--- a/src/core/lib/iomgr/tcp_server.h
+++ b/src/core/lib/iomgr/tcp_server.h
@@ -34,6 +34,8 @@
#ifndef GRPC_CORE_LIB_IOMGR_TCP_SERVER_H
#define GRPC_CORE_LIB_IOMGR_TCP_SERVER_H
+#include <grpc/grpc.h>
+
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
@@ -59,6 +61,7 @@ typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
If shutdown_complete is not NULL, it will be used by
grpc_tcp_server_unref() when the ref count reaches zero. */
grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+ const grpc_channel_args *args,
grpc_tcp_server **server);
/* Start listening to bound ports */
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 6fb547bb36..a1a463550a 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -112,8 +112,10 @@ struct grpc_tcp_server {
/* destroyed port count: how many ports are completely destroyed */
size_t destroyed_ports;
- /* is this server shutting down? (boolean) */
- int shutdown;
+ /* is this server shutting down? */
+ bool shutdown;
+ /* use SO_REUSEPORT */
+ bool so_reuseport;
/* linked list of server ports */
grpc_tcp_listener *head;
@@ -135,14 +137,42 @@ struct grpc_tcp_server {
size_t next_pollset_to_assign;
};
+static gpr_once check_init = GPR_ONCE_INIT;
+static bool has_so_reuseport;
+
+static void init(void) {
+ int s = socket(AF_INET, SOCK_STREAM, 0);
+ if (s >= 0) {
+ has_so_reuseport = GRPC_LOG_IF_ERROR("check for SO_REUSEPORT",
+ grpc_set_socket_reuse_port(s, 1));
+ close(s);
+ }
+}
+
grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+ const grpc_channel_args *args,
grpc_tcp_server **server) {
+ gpr_once_init(&check_init, init);
+
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
+ s->so_reuseport = has_so_reuseport;
+ for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
+ if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) {
+ if (args->args[i].type == GRPC_ARG_INTEGER) {
+ s->so_reuseport =
+ has_so_reuseport && (args->args[i].value.integer != 0);
+ } else {
+ gpr_free(s);
+ return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT
+ " must be an integer");
+ }
+ }
+ }
gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu);
s->active_ports = 0;
s->destroyed_ports = 0;
- s->shutdown = 0;
+ s->shutdown = false;
s->shutdown_starting.head = NULL;
s->shutdown_starting.tail = NULL;
s->shutdown_complete = shutdown_complete;
@@ -218,7 +248,7 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->shutdown);
- s->shutdown = 1;
+ s->shutdown = true;
/* shutdown all fd's */
if (s->active_ports) {
@@ -268,13 +298,19 @@ static int get_max_accept_queue_size(void) {
/* Prepare a recently-created socket for listening. */
static grpc_error *prepare_socket(int fd, const struct sockaddr *addr,
- size_t addr_len, int *port) {
+ size_t addr_len, bool so_reuseport,
+ int *port) {
struct sockaddr_storage sockname_temp;
socklen_t sockname_len;
grpc_error *err = GRPC_ERROR_NONE;
GPR_ASSERT(fd >= 0);
+ if (so_reuseport) {
+ err = grpc_set_socket_reuse_port(fd, 1);
+ if (err != GRPC_ERROR_NONE) goto error;
+ }
+
err = grpc_set_socket_nonblocking(fd, 1);
if (err != GRPC_ERROR_NONE) goto error;
err = grpc_set_socket_cloexec(fd, 1);
@@ -407,7 +443,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd,
char *addr_str;
char *name;
- grpc_error *err = prepare_socket(fd, addr, addr_len, &port);
+ grpc_error *err = prepare_socket(fd, addr, addr_len, s->so_reuseport, &port);
if (err == GRPC_ERROR_NONE) {
GPR_ASSERT(port > 0);
grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
@@ -443,6 +479,52 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd,
return err;
}
+static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) {
+ grpc_tcp_listener *sp = NULL;
+ char *addr_str;
+ char *name;
+ grpc_error *err;
+
+ for (grpc_tcp_listener *l = listener->next; l && l->is_sibling; l = l->next) {
+ l->fd_index += count;
+ }
+
+ for (unsigned i = 0; i < count; i++) {
+ int fd, port;
+ grpc_dualstack_mode dsmode;
+ err = grpc_create_dualstack_socket(&listener->addr.sockaddr, SOCK_STREAM, 0,
+ &dsmode, &fd);
+ if (err != GRPC_ERROR_NONE) return err;
+ err = prepare_socket(fd, &listener->addr.sockaddr, listener->addr_len, true,
+ &port);
+ if (err != GRPC_ERROR_NONE) return err;
+ listener->server->nports++;
+ grpc_sockaddr_to_string(&addr_str, &listener->addr.sockaddr, 1);
+ gpr_asprintf(&name, "tcp-server-listener:%s/clone-%d", addr_str, i);
+ sp = gpr_malloc(sizeof(grpc_tcp_listener));
+ sp->next = listener->next;
+ listener->next = sp;
+ sp->server = listener->server;
+ sp->fd = fd;
+ sp->emfd = grpc_fd_create(fd, name);
+ memcpy(sp->addr.untyped, listener->addr.untyped, listener->addr_len);
+ sp->addr_len = listener->addr_len;
+ sp->port = port;
+ sp->port_index = listener->port_index;
+ sp->fd_index = listener->fd_index + count - i;
+ sp->is_sibling = 1;
+ sp->sibling = listener->is_sibling ? listener->sibling : listener;
+ GPR_ASSERT(sp->emfd);
+ while (listener->server->tail->next != NULL) {
+ listener->server->tail = listener->server->tail->next;
+ }
+ gpr_free(addr_str);
+ gpr_free(name);
+ }
+
+ return GRPC_ERROR_NONE;
+}
+
grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
size_t addr_len, int *out_port) {
grpc_tcp_listener *sp;
@@ -599,14 +681,29 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
s->on_accept_cb_arg = on_accept_cb_arg;
s->pollsets = pollsets;
s->pollset_count = pollset_count;
- for (sp = s->head; sp; sp = sp->next) {
- for (i = 0; i < pollset_count; i++) {
- grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
+ sp = s->head;
+ while (sp != NULL) {
+ if (s->so_reuseport && pollset_count > 1) {
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "clone_port", clone_port(sp, (unsigned)(pollset_count - 1))));
+ for (i = 0; i < pollset_count; i++) {
+ grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
+ sp->read_closure.cb = on_read;
+ sp->read_closure.cb_arg = sp;
+ grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+ s->active_ports++;
+ sp = sp->next;
+ }
+ } else {
+ for (i = 0; i < pollset_count; i++) {
+ grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
+ }
+ sp->read_closure.cb = on_read;
+ sp->read_closure.cb_arg = sp;
+ grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+ s->active_ports++;
+ sp = sp->next;
}
- sp->read_closure.cb = on_read;
- sp->read_closure.cb_arg = sp;
- grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
- s->active_ports++;
}
gpr_mu_unlock(&s->mu);
}
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index 86982bc183..7b0966704c 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -103,6 +103,7 @@ struct grpc_tcp_server {
/* Public function. Allocates the proper data structures to hold a
grpc_tcp_server. */
grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+ const grpc_channel_args *args,
grpc_tcp_server **server) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
gpr_ref_init(&s->refs, 1);
diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c
index eda774fca5..6e2d1d0fc9 100644
--- a/test/core/iomgr/tcp_server_posix_test.c
+++ b/test/core/iomgr/tcp_server_posix_test.c
@@ -129,7 +129,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
static void test_no_op(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp_server *s;
- GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, &s));
+ GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
grpc_tcp_server_unref(&exec_ctx, s);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -137,7 +137,7 @@ static void test_no_op(void) {
static void test_no_op_with_start(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp_server *s;
- GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, &s));
+ GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
LOG_TEST("test_no_op_with_start");
grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL);
grpc_tcp_server_unref(&exec_ctx, s);
@@ -148,7 +148,7 @@ static void test_no_op_with_port(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
struct sockaddr_in addr;
grpc_tcp_server *s;
- GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, &s));
+ GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
LOG_TEST("test_no_op_with_port");
memset(&addr, 0, sizeof(addr));
@@ -166,7 +166,7 @@ static void test_no_op_with_port_and_start(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
struct sockaddr_in addr;
grpc_tcp_server *s;
- GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, &s));
+ GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
LOG_TEST("test_no_op_with_port_and_start");
int port;
@@ -226,7 +226,7 @@ static void test_connect(unsigned n) {
unsigned svr1_fd_count;
int svr1_port;
grpc_tcp_server *s;
- GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, &s));
+ GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
unsigned i;
server_weak_ref weak_ref;
server_weak_ref_init(&weak_ref);
diff --git a/test/core/surface/concurrent_connectivity_test.c b/test/core/surface/concurrent_connectivity_test.c
index f447a68887..f7567f350d 100644
--- a/test/core/surface/concurrent_connectivity_test.c
+++ b/test/core/surface/concurrent_connectivity_test.c
@@ -113,7 +113,7 @@ void bad_server_thread(void *vargs) {
socklen_t addr_len = sizeof(addr);
int port;
grpc_tcp_server *s;
- grpc_error *error = grpc_tcp_server_create(NULL, &s);
+ grpc_error *error = grpc_tcp_server_create(NULL, NULL, &s);
GPR_ASSERT(error == GRPC_ERROR_NONE);
memset(&addr, 0, sizeof(addr));
addr.ss_family = AF_INET;
diff --git a/test/core/surface/server_chttp2_test.c b/test/core/surface/server_chttp2_test.c
index f42ca9f9cd..6310b6f00b 100644
--- a/test/core/surface/server_chttp2_test.c
+++ b/test/core/surface/server_chttp2_test.c
@@ -49,10 +49,16 @@ void test_unparsable_target(void) {
}
void test_add_same_port_twice() {
+ grpc_arg a;
+ a.type = GRPC_ARG_INTEGER;
+ a.key = GRPC_ARG_ALLOW_REUSEPORT;
+ a.value.integer = 0;
+ grpc_channel_args args = {1, &a};
+
int port = grpc_pick_unused_port_or_die();
char *addr = NULL;
grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
- grpc_server *server = grpc_server_create(NULL, NULL);
+ grpc_server *server = grpc_server_create(&args, NULL);
grpc_server_credentials *fake_creds =
grpc_fake_transport_security_server_credentials_create();
gpr_join_host_port(&addr, "localhost", port);
diff --git a/test/core/surface/server_test.c b/test/core/surface/server_test.c
index 02eb432e2d..6dd8a435aa 100644
--- a/test/core/surface/server_test.c
+++ b/test/core/surface/server_test.c
@@ -82,9 +82,15 @@ void test_request_call_on_no_server_cq(void) {
}
void test_bind_server_twice(void) {
+ grpc_arg a;
+ a.type = GRPC_ARG_INTEGER;
+ a.key = GRPC_ARG_ALLOW_REUSEPORT;
+ a.value.integer = 0;
+ grpc_channel_args args = {1, &a};
+
char *addr;
- grpc_server *server1 = grpc_server_create(NULL, NULL);
- grpc_server *server2 = grpc_server_create(NULL, NULL);
+ grpc_server *server1 = grpc_server_create(&args, NULL);
+ grpc_server *server2 = grpc_server_create(&args, NULL);
grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
int port = grpc_pick_unused_port_or_die();
gpr_asprintf(&addr, "[::]:%d", port);
diff --git a/test/core/util/test_tcp_server.c b/test/core/util/test_tcp_server.c
index 27c16fc764..8a0b3932d8 100644
--- a/test/core/util/test_tcp_server.c
+++ b/test/core/util/test_tcp_server.c
@@ -72,8 +72,8 @@ void test_tcp_server_start(test_tcp_server *server, int port) {
addr.sin_port = htons((uint16_t)port);
memset(&addr.sin_addr, 0, sizeof(addr.sin_addr));
- grpc_error *error =
- grpc_tcp_server_create(&server->shutdown_complete, &server->tcp_server);
+ grpc_error *error = grpc_tcp_server_create(&server->shutdown_complete, NULL,
+ &server->tcp_server);
GPR_ASSERT(error == GRPC_ERROR_NONE);
error = grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr),
&port_added);
diff --git a/test/cpp/end2end/server_builder_plugin_test.cc b/test/cpp/end2end/server_builder_plugin_test.cc
index 75f23b64a7..778a2be573 100644
--- a/test/cpp/end2end/server_builder_plugin_test.cc
+++ b/test/cpp/end2end/server_builder_plugin_test.cc
@@ -37,6 +37,7 @@
#include <grpc++/impl/server_builder_option.h>
#include <grpc++/impl/server_builder_plugin.h>
#include <grpc++/impl/server_initializer.h>
+#include <grpc++/impl/thd.h>
#include <grpc++/security/credentials.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
@@ -187,7 +188,10 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> {
void StartServer() {
grpc::string server_address = "localhost:" + to_string(port_);
builder_->AddListeningPort(server_address, InsecureServerCredentials());
+ // we run some tests without a service, and for those we need to supply a
+ // frequently polled completion queue
cq_ = builder_->AddCompletionQueue();
+ cq_thread_ = grpc::thread(std::bind(&ServerBuilderPluginTest::RunCQ, this));
server_ = builder_->BuildAndStart();
EXPECT_TRUE(CheckPresent());
}
@@ -204,11 +208,8 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> {
EXPECT_TRUE(plugin->init_server_is_called());
EXPECT_TRUE(plugin->finish_is_called());
server_->Shutdown();
- void* tag;
- bool ok;
cq_->Shutdown();
- while (cq_->Next(&tag, &ok))
- ;
+ cq_thread_.join();
}
string to_string(const int number) {
@@ -223,6 +224,7 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<ServerCompletionQueue> cq_;
std::unique_ptr<Server> server_;
+ grpc::thread cq_thread_;
TestServiceImpl service_;
int port_;
@@ -238,6 +240,13 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> {
return nullptr;
}
}
+
+ void RunCQ() {
+ void* tag;
+ bool ok;
+ while (cq_->Next(&tag, &ok))
+ ;
+ }
};
TEST_P(ServerBuilderPluginTest, PluginWithoutServiceTest) {