aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-06-27 14:01:25 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-06-27 14:01:25 -0700
commit8551a709a1fb62a2e748d230b52b530df55b24eb (patch)
treee5f221af5a7d644c934edd3fccbc487d08f31025 /src
parenta6e022f98f157ff938fbfb90b5666eab83579cd2 (diff)
parenta9df5d1029b303a305481c4bb765c6bed2e8ebbb (diff)
Merge github.com:grpc/grpc into %s
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.c3
-rw-r--r--src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c3
-rw-r--r--src/core/lib/iomgr/network_status_tracker.c121
-rw-r--r--src/core/lib/iomgr/network_status_tracker.h41
-rw-r--r--src/core/lib/iomgr/socket_utils_common_posix.c22
-rw-r--r--src/core/lib/iomgr/socket_utils_posix.h3
-rw-r--r--src/core/lib/iomgr/tcp_posix.c4
-rw-r--r--src/core/lib/iomgr/tcp_server.h3
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c123
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.c1
-rw-r--r--src/core/lib/iomgr/tcp_windows.c5
-rw-r--r--src/core/lib/iomgr/udp_server.c4
-rw-r--r--src/node/ext/node_grpc.cc4
-rw-r--r--src/python/grpcio/grpc/_channel.py65
-rw-r--r--src/python/grpcio/grpc/beta/_server_adaptations.py38
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py1
-rw-r--r--src/python/grpcio/tests/tests.json1
-rw-r--r--src/python/grpcio/tests/unit/_exit_scenarios.py249
-rw-r--r--src/python/grpcio/tests/unit/_exit_test.py185
19 files changed, 820 insertions, 56 deletions
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index 9bae3a94f9..e5c987925c 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -97,7 +97,8 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
goto error;
}
- err = grpc_tcp_server_create(NULL, &tcp);
+ err =
+ grpc_tcp_server_create(NULL, grpc_server_get_channel_args(server), &tcp);
if (err != GRPC_ERROR_NONE) {
goto error;
}
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index ead8a4d566..c42810e913 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -216,7 +216,8 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
state = gpr_malloc(sizeof(*state));
memset(state, 0, sizeof(*state));
grpc_closure_init(&state->destroy_closure, destroy_done, state);
- err = grpc_tcp_server_create(&state->destroy_closure, &tcp);
+ err = grpc_tcp_server_create(&state->destroy_closure,
+ grpc_server_get_channel_args(server), &tcp);
if (err != GRPC_ERROR_NONE) {
goto error;
}
diff --git a/src/core/lib/iomgr/network_status_tracker.c b/src/core/lib/iomgr/network_status_tracker.c
new file mode 100644
index 0000000000..38a1c9b7d4
--- /dev/null
+++ b/src/core/lib/iomgr/network_status_tracker.c
@@ -0,0 +1,121 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include "src/core/lib/iomgr/endpoint.h"
+
+typedef struct endpoint_ll_node {
+ grpc_endpoint *ep;
+ struct endpoint_ll_node *next;
+} endpoint_ll_node;
+
+static endpoint_ll_node *head = NULL;
+static gpr_mu g_endpoint_mutex;
+static bool g_init_done = false;
+
+void grpc_initialize_network_status_monitor() {
+ g_init_done = true;
+ gpr_mu_init(&g_endpoint_mutex);
+ // TODO(makarandd): Install callback with OS to monitor network status.
+}
+
+void grpc_destroy_network_status_monitor() {
+ for (endpoint_ll_node *curr = head; curr != NULL;) {
+ endpoint_ll_node *next = curr->next;
+ gpr_free(curr);
+ curr = next;
+ }
+ gpr_mu_destroy(&g_endpoint_mutex);
+}
+
+void grpc_network_status_register_endpoint(grpc_endpoint *ep) {
+ if (!g_init_done) {
+ grpc_initialize_network_status_monitor();
+ }
+ gpr_mu_lock(&g_endpoint_mutex);
+ if (head == NULL) {
+ head = (endpoint_ll_node *)gpr_malloc(sizeof(endpoint_ll_node));
+ head->ep = ep;
+ head->next = NULL;
+ } else {
+ endpoint_ll_node *prev_head = head;
+ head = (endpoint_ll_node *)gpr_malloc(sizeof(endpoint_ll_node));
+ head->ep = ep;
+ head->next = prev_head;
+ }
+ gpr_mu_unlock(&g_endpoint_mutex);
+}
+
+void grpc_network_status_unregister_endpoint(grpc_endpoint *ep) {
+ gpr_mu_lock(&g_endpoint_mutex);
+ GPR_ASSERT(head);
+ bool found = false;
+ endpoint_ll_node *prev = head;
+ // if we're unregistering the head, just move head to the next
+ if (ep == head->ep) {
+ head = head->next;
+ gpr_free(prev);
+ found = true;
+ } else {
+ for (endpoint_ll_node *curr = head->next; curr != NULL; curr = curr->next) {
+ if (ep == curr->ep) {
+ prev->next = curr->next;
+ gpr_free(curr);
+ found = true;
+ break;
+ }
+ prev = curr;
+ }
+ }
+ gpr_mu_unlock(&g_endpoint_mutex);
+ GPR_ASSERT(found);
+}
+
+// Walk the linked-list from head and execute shutdown. It is possible that
+// other threads might be in the process of shutdown as well, but that has
+// no side effect since endpoint shutdown is idempotent.
+void grpc_network_status_shutdown_all_endpoints() {
+ gpr_mu_lock(&g_endpoint_mutex);
+ if (head == NULL) {
+ gpr_mu_unlock(&g_endpoint_mutex);
+ return;
+ }
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
+ for (endpoint_ll_node *curr = head; curr != NULL; curr = curr->next) {
+ curr->ep->vtable->shutdown(&exec_ctx, curr->ep);
+ }
+ gpr_mu_unlock(&g_endpoint_mutex);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
diff --git a/src/core/lib/iomgr/network_status_tracker.h b/src/core/lib/iomgr/network_status_tracker.h
new file mode 100644
index 0000000000..74a1aa8135
--- /dev/null
+++ b/src/core/lib/iomgr/network_status_tracker.h
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H
+#define GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H
+#include "src/core/lib/iomgr/endpoint.h"
+
+void grpc_network_status_register_endpoint(grpc_endpoint *ep);
+void grpc_network_status_unregister_endpoint(grpc_endpoint *ep);
+void grpc_network_status_shutdown_all_endpoints();
+#endif /* GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H */
diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c
index 3a1371617e..d2f6261e2a 100644
--- a/src/core/lib/iomgr/socket_utils_common_posix.c
+++ b/src/core/lib/iomgr/socket_utils_common_posix.c
@@ -169,6 +169,28 @@ grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse) {
return GRPC_ERROR_NONE;
}
+/* set a socket to reuse old addresses */
+grpc_error *grpc_set_socket_reuse_port(int fd, int reuse) {
+#ifndef SO_REUSEPORT
+ return GRPC_ERROR_CREATE("SO_REUSEPORT unavailable on compiling system");
+#else
+ int val = (reuse != 0);
+ int newval;
+ socklen_t intlen = sizeof(newval);
+ if (0 != setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val))) {
+ return GRPC_OS_ERROR(errno, "setsockopt(SO_REUSEPORT)");
+ }
+ if (0 != getsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &newval, &intlen)) {
+ return GRPC_OS_ERROR(errno, "getsockopt(SO_REUSEPORT)");
+ }
+ if ((newval != 0) != val) {
+ return GRPC_ERROR_CREATE("Failed to set SO_REUSEPORT");
+ }
+
+ return GRPC_ERROR_NONE;
+#endif
+}
+
/* disable nagle */
grpc_error *grpc_set_socket_low_latency(int fd, int low_latency) {
int val = (low_latency != 0);
diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h
index 30ff39dfa3..7bcc2219ae 100644
--- a/src/core/lib/iomgr/socket_utils_posix.h
+++ b/src/core/lib/iomgr/socket_utils_posix.h
@@ -55,6 +55,9 @@ grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse);
/* disable nagle */
grpc_error *grpc_set_socket_low_latency(int fd, int low_latency);
+/* set SO_REUSEPORT */
+grpc_error *grpc_set_socket_reuse_port(int fd, int reuse);
+
/* Returns true if this system can create AF_INET6 sockets bound to ::1.
The value is probed once, and cached for the life of the process.
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 1046b60bcc..2ab45e33ce 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -35,6 +35,7 @@
#ifdef GPR_POSIX_SOCKET
+#include "src/core/lib/iomgr/network_status_tracker.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include <errno.h>
@@ -152,6 +153,7 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+ grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
TCP_UNREF(exec_ctx, tcp, "destroy");
}
@@ -474,6 +476,8 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
tcp->write_closure.cb = tcp_handle_write;
tcp->write_closure.cb_arg = tcp;
gpr_slice_buffer_init(&tcp->last_read_buffer);
+ /* Tell network status tracker about new endpoint */
+ grpc_network_status_register_endpoint(&tcp->base);
return &tcp->base;
}
diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h
index 1d6e70dbe9..875a6ca14a 100644
--- a/src/core/lib/iomgr/tcp_server.h
+++ b/src/core/lib/iomgr/tcp_server.h
@@ -34,6 +34,8 @@
#ifndef GRPC_CORE_LIB_IOMGR_TCP_SERVER_H
#define GRPC_CORE_LIB_IOMGR_TCP_SERVER_H
+#include <grpc/grpc.h>
+
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
@@ -59,6 +61,7 @@ typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
If shutdown_complete is not NULL, it will be used by
grpc_tcp_server_unref() when the ref count reaches zero. */
grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+ const grpc_channel_args *args,
grpc_tcp_server **server);
/* Start listening to bound ports */
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 6fb547bb36..a1a463550a 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -112,8 +112,10 @@ struct grpc_tcp_server {
/* destroyed port count: how many ports are completely destroyed */
size_t destroyed_ports;
- /* is this server shutting down? (boolean) */
- int shutdown;
+ /* is this server shutting down? */
+ bool shutdown;
+ /* use SO_REUSEPORT */
+ bool so_reuseport;
/* linked list of server ports */
grpc_tcp_listener *head;
@@ -135,14 +137,42 @@ struct grpc_tcp_server {
size_t next_pollset_to_assign;
};
+static gpr_once check_init = GPR_ONCE_INIT;
+static bool has_so_reuseport;
+
+static void init(void) {
+ int s = socket(AF_INET, SOCK_STREAM, 0);
+ if (s >= 0) {
+ has_so_reuseport = GRPC_LOG_IF_ERROR("check for SO_REUSEPORT",
+ grpc_set_socket_reuse_port(s, 1));
+ close(s);
+ }
+}
+
grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+ const grpc_channel_args *args,
grpc_tcp_server **server) {
+ gpr_once_init(&check_init, init);
+
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
+ s->so_reuseport = has_so_reuseport;
+ for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
+ if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) {
+ if (args->args[i].type == GRPC_ARG_INTEGER) {
+ s->so_reuseport =
+ has_so_reuseport && (args->args[i].value.integer != 0);
+ } else {
+ gpr_free(s);
+ return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT
+ " must be an integer");
+ }
+ }
+ }
gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu);
s->active_ports = 0;
s->destroyed_ports = 0;
- s->shutdown = 0;
+ s->shutdown = false;
s->shutdown_starting.head = NULL;
s->shutdown_starting.tail = NULL;
s->shutdown_complete = shutdown_complete;
@@ -218,7 +248,7 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->shutdown);
- s->shutdown = 1;
+ s->shutdown = true;
/* shutdown all fd's */
if (s->active_ports) {
@@ -268,13 +298,19 @@ static int get_max_accept_queue_size(void) {
/* Prepare a recently-created socket for listening. */
static grpc_error *prepare_socket(int fd, const struct sockaddr *addr,
- size_t addr_len, int *port) {
+ size_t addr_len, bool so_reuseport,
+ int *port) {
struct sockaddr_storage sockname_temp;
socklen_t sockname_len;
grpc_error *err = GRPC_ERROR_NONE;
GPR_ASSERT(fd >= 0);
+ if (so_reuseport) {
+ err = grpc_set_socket_reuse_port(fd, 1);
+ if (err != GRPC_ERROR_NONE) goto error;
+ }
+
err = grpc_set_socket_nonblocking(fd, 1);
if (err != GRPC_ERROR_NONE) goto error;
err = grpc_set_socket_cloexec(fd, 1);
@@ -407,7 +443,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd,
char *addr_str;
char *name;
- grpc_error *err = prepare_socket(fd, addr, addr_len, &port);
+ grpc_error *err = prepare_socket(fd, addr, addr_len, s->so_reuseport, &port);
if (err == GRPC_ERROR_NONE) {
GPR_ASSERT(port > 0);
grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
@@ -443,6 +479,52 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd,
return err;
}
+static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) {
+ grpc_tcp_listener *sp = NULL;
+ char *addr_str;
+ char *name;
+ grpc_error *err;
+
+ for (grpc_tcp_listener *l = listener->next; l && l->is_sibling; l = l->next) {
+ l->fd_index += count;
+ }
+
+ for (unsigned i = 0; i < count; i++) {
+ int fd, port;
+ grpc_dualstack_mode dsmode;
+ err = grpc_create_dualstack_socket(&listener->addr.sockaddr, SOCK_STREAM, 0,
+ &dsmode, &fd);
+ if (err != GRPC_ERROR_NONE) return err;
+ err = prepare_socket(fd, &listener->addr.sockaddr, listener->addr_len, true,
+ &port);
+ if (err != GRPC_ERROR_NONE) return err;
+ listener->server->nports++;
+ grpc_sockaddr_to_string(&addr_str, &listener->addr.sockaddr, 1);
+ gpr_asprintf(&name, "tcp-server-listener:%s/clone-%d", addr_str, i);
+ sp = gpr_malloc(sizeof(grpc_tcp_listener));
+ sp->next = listener->next;
+ listener->next = sp;
+ sp->server = listener->server;
+ sp->fd = fd;
+ sp->emfd = grpc_fd_create(fd, name);
+ memcpy(sp->addr.untyped, listener->addr.untyped, listener->addr_len);
+ sp->addr_len = listener->addr_len;
+ sp->port = port;
+ sp->port_index = listener->port_index;
+ sp->fd_index = listener->fd_index + count - i;
+ sp->is_sibling = 1;
+ sp->sibling = listener->is_sibling ? listener->sibling : listener;
+ GPR_ASSERT(sp->emfd);
+ while (listener->server->tail->next != NULL) {
+ listener->server->tail = listener->server->tail->next;
+ }
+ gpr_free(addr_str);
+ gpr_free(name);
+ }
+
+ return GRPC_ERROR_NONE;
+}
+
grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
size_t addr_len, int *out_port) {
grpc_tcp_listener *sp;
@@ -599,14 +681,29 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
s->on_accept_cb_arg = on_accept_cb_arg;
s->pollsets = pollsets;
s->pollset_count = pollset_count;
- for (sp = s->head; sp; sp = sp->next) {
- for (i = 0; i < pollset_count; i++) {
- grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
+ sp = s->head;
+ while (sp != NULL) {
+ if (s->so_reuseport && pollset_count > 1) {
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "clone_port", clone_port(sp, (unsigned)(pollset_count - 1))));
+ for (i = 0; i < pollset_count; i++) {
+ grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
+ sp->read_closure.cb = on_read;
+ sp->read_closure.cb_arg = sp;
+ grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+ s->active_ports++;
+ sp = sp->next;
+ }
+ } else {
+ for (i = 0; i < pollset_count; i++) {
+ grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
+ }
+ sp->read_closure.cb = on_read;
+ sp->read_closure.cb_arg = sp;
+ grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+ s->active_ports++;
+ sp = sp->next;
}
- sp->read_closure.cb = on_read;
- sp->read_closure.cb_arg = sp;
- grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
- s->active_ports++;
}
gpr_mu_unlock(&s->mu);
}
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index 86982bc183..7b0966704c 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -103,6 +103,7 @@ struct grpc_tcp_server {
/* Public function. Allocates the proper data structures to hold a
grpc_tcp_server. */
grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+ const grpc_channel_args *args,
grpc_tcp_server **server) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
gpr_ref_init(&s->refs, 1);
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index b2af8030aa..37ab59021e 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -37,6 +37,7 @@
#include <limits.h>
+#include "src/core/lib/iomgr/network_status_tracker.h"
#include "src/core/lib/iomgr/sockaddr_windows.h"
#include <grpc/support/alloc.h>
@@ -378,6 +379,7 @@ static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
}
static void win_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+ grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
TCP_UNREF(tcp, "destroy");
}
@@ -401,6 +403,9 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
grpc_closure_init(&tcp->on_read, on_read, tcp);
grpc_closure_init(&tcp->on_write, on_write, tcp);
tcp->peer_string = gpr_strdup(peer_string);
+ /* Tell network status tracking code about the new endpoint */
+ grpc_network_status_register_endpoint(&tcp->base);
+
return &tcp->base;
}
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 16150687d3..1ebccf2ee2 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -243,13 +243,13 @@ static int prepare_socket(int fd, const struct sockaddr *addr,
if (!grpc_set_socket_sndbuf(fd, buffer_size_bytes)) {
gpr_log(GPR_ERROR, "Failed to set send buffer size to %d bytes",
- buf_size_bytes);
+ buffer_size_bytes);
goto error;
}
if (!grpc_set_socket_rcvbuf(fd, buffer_size_bytes)) {
gpr_log(GPR_ERROR, "Failed to set receive buffer size to %d bytes",
- buf_size_bytes);
+ buffer_size_bytes);
goto error;
}
diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc
index ce988f9dfa..745b5023d5 100644
--- a/src/node/ext/node_grpc.cc
+++ b/src/node/ext/node_grpc.cc
@@ -265,8 +265,8 @@ void InitLogConstants(Local<Object> exports) {
Nan::Set(log_verbosity, Nan::New("DEBUG").ToLocalChecked(), DEBUG);
Local<Value> INFO(Nan::New<Uint32, uint32_t>(GPR_LOG_SEVERITY_INFO));
Nan::Set(log_verbosity, Nan::New("INFO").ToLocalChecked(), INFO);
- Local<Value> ERROR(Nan::New<Uint32, uint32_t>(GPR_LOG_SEVERITY_ERROR));
- Nan::Set(log_verbosity, Nan::New("ERROR").ToLocalChecked(), ERROR);
+ Local<Value> LOG_ERROR(Nan::New<Uint32, uint32_t>(GPR_LOG_SEVERITY_ERROR));
+ Nan::Set(log_verbosity, Nan::New("ERROR").ToLocalChecked(), LOG_ERROR);
}
NAN_METHOD(MetadataKeyIsLegal) {
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index 1f34beeb2c..cf6175d031 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -179,6 +179,7 @@ def _event_handler(state, call, response_deserializer):
def _consume_request_iterator(
request_iterator, state, call, request_serializer):
event_handler = _event_handler(state, call, None)
+
def consume_request_iterator():
for request in request_iterator:
serialized_request = _common.serialize(request, request_serializer)
@@ -212,8 +213,18 @@ def _consume_request_iterator(
)
call.start_batch(cygrpc.Operations(operations), event_handler)
state.due.add(cygrpc.OperationType.send_close_from_client)
- thread = threading.Thread(target=consume_request_iterator)
- thread.start()
+
+ def stop_consumption_thread(timeout):
+ with state.condition:
+ if state.code is None:
+ call.cancel()
+ state.cancelled = True
+ _abort(state, grpc.StatusCode.CANCELLED, 'Cancelled!')
+ state.condition.notify_all()
+
+ consumption_thread = _common.CleanupThread(
+ stop_consumption_thread, target=consume_request_iterator)
+ consumption_thread.start()
class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
@@ -652,16 +663,27 @@ class _ChannelCallState(object):
self.managed_calls = None
-def _call_spin(state):
- while True:
- event = state.completion_queue.poll()
- completed_call = event.tag(event)
- if completed_call is not None:
- with state.lock:
- state.managed_calls.remove(completed_call)
- if not state.managed_calls:
- state.managed_calls = None
- return
+def _run_channel_spin_thread(state):
+ def channel_spin():
+ while True:
+ event = state.completion_queue.poll()
+ completed_call = event.tag(event)
+ if completed_call is not None:
+ with state.lock:
+ state.managed_calls.remove(completed_call)
+ if not state.managed_calls:
+ state.managed_calls = None
+ return
+
+ def stop_channel_spin(timeout):
+ with state.lock:
+ if state.managed_calls is not None:
+ for call in state.managed_calls:
+ call.cancel()
+
+ channel_spin_thread = _common.CleanupThread(
+ stop_channel_spin, target=channel_spin)
+ channel_spin_thread.start()
def _create_channel_managed_call(state):
@@ -690,8 +712,7 @@ def _create_channel_managed_call(state):
parent, flags, state.completion_queue, method, host, deadline)
if state.managed_calls is None:
state.managed_calls = set((call,))
- spin_thread = threading.Thread(target=_call_spin, args=(state,))
- spin_thread.start()
+ _run_channel_spin_thread(state)
else:
state.managed_calls.add(call)
return call
@@ -784,11 +805,18 @@ def _poll_connectivity(state, channel, initial_try_to_connect):
_spawn_delivery(state, callbacks)
+def _moot(state):
+ with state.lock:
+ del state.callbacks_and_connectivities[:]
+
+
def _subscribe(state, callback, try_to_connect):
with state.lock:
if not state.callbacks_and_connectivities and not state.polling:
- polling_thread = threading.Thread(
- target=_poll_connectivity,
+ def cancel_all_subscriptions(timeout):
+ _moot(state)
+ polling_thread = _common.CleanupThread(
+ cancel_all_subscriptions, target=_poll_connectivity,
args=(state, state.channel, bool(try_to_connect)))
polling_thread.start()
state.polling = True
@@ -812,11 +840,6 @@ def _unsubscribe(state, callback):
break
-def _moot(state):
- with state.lock:
- del state.callbacks_and_connectivities[:]
-
-
def _options(options):
if options is None:
pairs = ((cygrpc.ChannelArgKey.primary_user_agent_string, _USER_AGENT),)
diff --git a/src/python/grpcio/grpc/beta/_server_adaptations.py b/src/python/grpcio/grpc/beta/_server_adaptations.py
index 79e6ca87eb..c695434dac 100644
--- a/src/python/grpcio/grpc/beta/_server_adaptations.py
+++ b/src/python/grpcio/grpc/beta/_server_adaptations.py
@@ -161,14 +161,24 @@ class _Callback(stream.Consumer):
self._condition.wait()
-def _pipe_requests(request_iterator, request_consumer, servicer_context):
- for request in request_iterator:
- if not servicer_context.is_active():
- return
- request_consumer.consume(request)
- if not servicer_context.is_active():
- return
- request_consumer.terminate()
+def _run_request_pipe_thread(request_iterator, request_consumer,
+ servicer_context):
+ thread_joined = threading.Event()
+ def pipe_requests():
+ for request in request_iterator:
+ if not servicer_context.is_active() or thread_joined.is_set():
+ return
+ request_consumer.consume(request)
+ if not servicer_context.is_active() or thread_joined.is_set():
+ return
+ request_consumer.terminate()
+
+ def stop_request_pipe(timeout):
+ thread_joined.set()
+
+ request_pipe_thread = _common.CleanupThread(
+ stop_request_pipe, target=pipe_requests)
+ request_pipe_thread.start()
def _adapt_unary_unary_event(unary_unary_event):
@@ -206,10 +216,8 @@ def _adapt_stream_unary_event(stream_unary_event):
raise abandonment.Abandoned()
request_consumer = stream_unary_event(
callback.consume_and_terminate, _FaceServicerContext(servicer_context))
- request_pipe_thread = threading.Thread(
- target=_pipe_requests,
- args=(request_iterator, request_consumer, servicer_context,))
- request_pipe_thread.start()
+ _run_request_pipe_thread(
+ request_iterator, request_consumer, servicer_context)
return callback.draw_all_values()[0]
return adaptation
@@ -221,10 +229,8 @@ def _adapt_stream_stream_event(stream_stream_event):
raise abandonment.Abandoned()
request_consumer = stream_stream_event(
callback, _FaceServicerContext(servicer_context))
- request_pipe_thread = threading.Thread(
- target=_pipe_requests,
- args=(request_iterator, request_consumer, servicer_context,))
- request_pipe_thread.start()
+ _run_request_pipe_thread(
+ request_iterator, request_consumer, servicer_context)
while True:
response = callback.draw_one_value()
if response is None:
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index ea4d17351c..b37e27c27e 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -105,6 +105,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/iomgr_posix.c',
'src/core/lib/iomgr/iomgr_windows.c',
'src/core/lib/iomgr/load_file.c',
+ 'src/core/lib/iomgr/network_status_tracker.c',
'src/core/lib/iomgr/polling_entity.c',
'src/core/lib/iomgr/pollset_set_windows.c',
'src/core/lib/iomgr/pollset_windows.c',
diff --git a/src/python/grpcio/tests/tests.json b/src/python/grpcio/tests/tests.json
index 128f85b8b0..9c118ef601 100644
--- a/src/python/grpcio/tests/tests.json
+++ b/src/python/grpcio/tests/tests.json
@@ -14,6 +14,7 @@
"_connectivity_channel_test.ChannelConnectivityTest",
"_connectivity_channel_test.ConnectivityStatesTest",
"_empty_message_test.EmptyMessageTest",
+ "_exit_test.ExitTest",
"_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest",
"_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest",
"_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest",
diff --git a/src/python/grpcio/tests/unit/_exit_scenarios.py b/src/python/grpcio/tests/unit/_exit_scenarios.py
new file mode 100644
index 0000000000..24a2faef85
--- /dev/null
+++ b/src/python/grpcio/tests/unit/_exit_scenarios.py
@@ -0,0 +1,249 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Defines a number of module-scope gRPC scenarios to test clean exit."""
+
+import argparse
+import threading
+import time
+
+import grpc
+
+from tests.unit.framework.common import test_constants
+
+WAIT_TIME = 1000
+
+REQUEST = b'request'
+
+UNSTARTED_SERVER = 'unstarted_server'
+RUNNING_SERVER = 'running_server'
+POLL_CONNECTIVITY_NO_SERVER = 'poll_connectivity_no_server'
+POLL_CONNECTIVITY = 'poll_connectivity'
+IN_FLIGHT_UNARY_UNARY_CALL = 'in_flight_unary_unary_call'
+IN_FLIGHT_UNARY_STREAM_CALL = 'in_flight_unary_stream_call'
+IN_FLIGHT_STREAM_UNARY_CALL = 'in_flight_stream_unary_call'
+IN_FLIGHT_STREAM_STREAM_CALL = 'in_flight_stream_stream_call'
+IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL = 'in_flight_partial_unary_stream_call'
+IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL = 'in_flight_partial_stream_unary_call'
+IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL = 'in_flight_partial_stream_stream_call'
+
+UNARY_UNARY = b'/test/UnaryUnary'
+UNARY_STREAM = b'/test/UnaryStream'
+STREAM_UNARY = b'/test/StreamUnary'
+STREAM_STREAM = b'/test/StreamStream'
+PARTIAL_UNARY_STREAM = b'/test/PartialUnaryStream'
+PARTIAL_STREAM_UNARY = b'/test/PartialStreamUnary'
+PARTIAL_STREAM_STREAM = b'/test/PartialStreamStream'
+
+TEST_TO_METHOD = {
+ IN_FLIGHT_UNARY_UNARY_CALL: UNARY_UNARY,
+ IN_FLIGHT_UNARY_STREAM_CALL: UNARY_STREAM,
+ IN_FLIGHT_STREAM_UNARY_CALL: STREAM_UNARY,
+ IN_FLIGHT_STREAM_STREAM_CALL: STREAM_STREAM,
+ IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL: PARTIAL_UNARY_STREAM,
+ IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL: PARTIAL_STREAM_UNARY,
+ IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL: PARTIAL_STREAM_STREAM,
+}
+
+
+def hang_unary_unary(request, servicer_context):
+ time.sleep(WAIT_TIME)
+
+
+def hang_unary_stream(request, servicer_context):
+ time.sleep(WAIT_TIME)
+
+
+def hang_partial_unary_stream(request, servicer_context):
+ for _ in range(test_constants.STREAM_LENGTH // 2):
+ yield request
+ time.sleep(WAIT_TIME)
+
+
+def hang_stream_unary(request_iterator, servicer_context):
+ time.sleep(WAIT_TIME)
+
+
+def hang_partial_stream_unary(request_iterator, servicer_context):
+ for _ in range(test_constants.STREAM_LENGTH // 2):
+ next(request_iterator)
+ time.sleep(WAIT_TIME)
+
+
+def hang_stream_stream(request_iterator, servicer_context):
+ time.sleep(WAIT_TIME)
+
+
+def hang_partial_stream_stream(request_iterator, servicer_context):
+ for _ in range(test_constants.STREAM_LENGTH // 2):
+ yield next(request_iterator)
+ time.sleep(WAIT_TIME)
+
+
+class MethodHandler(grpc.RpcMethodHandler):
+
+ def __init__(self, request_streaming, response_streaming, partial_hang):
+ self.request_streaming = request_streaming
+ self.response_streaming = response_streaming
+ self.request_deserializer = None
+ self.response_serializer = None
+ self.unary_unary = None
+ self.unary_stream = None
+ self.stream_unary = None
+ self.stream_stream = None
+ if self.request_streaming and self.response_streaming:
+ if partial_hang:
+ self.stream_stream = hang_partial_stream_stream
+ else:
+ self.stream_stream = hang_stream_stream
+ elif self.request_streaming:
+ if partial_hang:
+ self.stream_unary = hang_partial_stream_unary
+ else:
+ self.stream_unary = hang_stream_unary
+ elif self.response_streaming:
+ if partial_hang:
+ self.unary_stream = hang_partial_unary_stream
+ else:
+ self.unary_stream = hang_unary_stream
+ else:
+ self.unary_unary = hang_unary_unary
+
+
+class GenericHandler(grpc.GenericRpcHandler):
+
+ def service(self, handler_call_details):
+ if handler_call_details.method == UNARY_UNARY:
+ return MethodHandler(False, False, False)
+ elif handler_call_details.method == UNARY_STREAM:
+ return MethodHandler(False, True, False)
+ elif handler_call_details.method == STREAM_UNARY:
+ return MethodHandler(True, False, False)
+ elif handler_call_details.method == STREAM_STREAM:
+ return MethodHandler(True, True, False)
+ elif handler_call_details.method == PARTIAL_UNARY_STREAM:
+ return MethodHandler(False, True, True)
+ elif handler_call_details.method == PARTIAL_STREAM_UNARY:
+ return MethodHandler(True, False, True)
+ elif handler_call_details.method == PARTIAL_STREAM_STREAM:
+ return MethodHandler(True, True, True)
+ else:
+ return None
+
+
+# Traditional executors will not exit until all their
+# current jobs complete. Because we submit jobs that will
+# never finish, we don't want to block exit on these jobs.
+class DaemonPool(object):
+
+ def submit(self, fn, *args, **kwargs):
+ thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
+ thread.daemon = True
+ thread.start()
+
+ def shutdown(self, wait=True):
+ pass
+
+
+def infinite_request_iterator():
+ while True:
+ yield REQUEST
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser()
+ parser.add_argument('scenario', type=str)
+ parser.add_argument(
+ '--wait_for_interrupt', dest='wait_for_interrupt', action='store_true')
+ args = parser.parse_args()
+
+ if args.scenario == UNSTARTED_SERVER:
+ server = grpc.server((), DaemonPool())
+ if args.wait_for_interrupt:
+ time.sleep(WAIT_TIME)
+ elif args.scenario == RUNNING_SERVER:
+ server = grpc.server((), DaemonPool())
+ port = server.add_insecure_port('[::]:0')
+ server.start()
+ if args.wait_for_interrupt:
+ time.sleep(WAIT_TIME)
+ elif args.scenario == POLL_CONNECTIVITY_NO_SERVER:
+ channel = grpc.insecure_channel('localhost:12345')
+
+ def connectivity_callback(connectivity):
+ pass
+
+ channel.subscribe(connectivity_callback, try_to_connect=True)
+ if args.wait_for_interrupt:
+ time.sleep(WAIT_TIME)
+ elif args.scenario == POLL_CONNECTIVITY:
+ server = grpc.server((), DaemonPool())
+ port = server.add_insecure_port('[::]:0')
+ server.start()
+ channel = grpc.insecure_channel('localhost:%d' % port)
+
+ def connectivity_callback(connectivity):
+ pass
+
+ channel.subscribe(connectivity_callback, try_to_connect=True)
+ if args.wait_for_interrupt:
+ time.sleep(WAIT_TIME)
+
+ else:
+ handler = GenericHandler()
+ server = grpc.server((), DaemonPool())
+ port = server.add_insecure_port('[::]:0')
+ server.add_generic_rpc_handlers((handler,))
+ server.start()
+ channel = grpc.insecure_channel('localhost:%d' % port)
+
+ method = TEST_TO_METHOD[args.scenario]
+
+ if args.scenario == IN_FLIGHT_UNARY_UNARY_CALL:
+ multi_callable = channel.unary_unary(method)
+ future = multi_callable.future(REQUEST)
+ result, call = multi_callable.with_call(REQUEST)
+ elif (args.scenario == IN_FLIGHT_UNARY_STREAM_CALL or
+ args.scenario == IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL):
+ multi_callable = channel.unary_stream(method)
+ response_iterator = multi_callable(REQUEST)
+ for response in response_iterator:
+ pass
+ elif (args.scenario == IN_FLIGHT_STREAM_UNARY_CALL or
+ args.scenario == IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL):
+ multi_callable = channel.stream_unary(method)
+ future = multi_callable.future(infinite_request_iterator())
+ result, call = multi_callable.with_call(
+ [REQUEST] * test_constants.STREAM_LENGTH)
+ elif (args.scenario == IN_FLIGHT_STREAM_STREAM_CALL or
+ args.scenario == IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL):
+ multi_callable = channel.stream_stream(method)
+ response_iterator = multi_callable(infinite_request_iterator())
+ for response in response_iterator:
+ pass
diff --git a/src/python/grpcio/tests/unit/_exit_test.py b/src/python/grpcio/tests/unit/_exit_test.py
new file mode 100644
index 0000000000..b0d6af73e5
--- /dev/null
+++ b/src/python/grpcio/tests/unit/_exit_test.py
@@ -0,0 +1,185 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests clean exit of server/client on Python Interpreter exit/sigint.
+
+The tests in this module spawn a subprocess for each test case, the
+test is considered successful if it doesn't hang/timeout.
+"""
+
+import atexit
+import os
+import signal
+import six
+import subprocess
+import sys
+import threading
+import time
+import unittest
+
+from tests.unit import _exit_scenarios
+
+SCENARIO_FILE = os.path.abspath(os.path.join(
+ os.path.dirname(os.path.realpath(__file__)), '_exit_scenarios.py'))
+INTERPRETER = sys.executable
+BASE_COMMAND = [INTERPRETER, SCENARIO_FILE]
+BASE_SIGTERM_COMMAND = BASE_COMMAND + ['--wait_for_interrupt']
+
+INIT_TIME = 1.0
+
+
+processes = []
+process_lock = threading.Lock()
+
+
+# Make sure we attempt to clean up any
+# processes we may have left running
+def cleanup_processes():
+ with process_lock:
+ for process in processes:
+ try:
+ process.kill()
+ except Exception:
+ pass
+atexit.register(cleanup_processes)
+
+
+def interrupt_and_wait(process):
+ with process_lock:
+ processes.append(process)
+ time.sleep(INIT_TIME)
+ os.kill(process.pid, signal.SIGINT)
+ process.wait()
+
+
+def wait(process):
+ with process_lock:
+ processes.append(process)
+ process.wait()
+
+
+class ExitTest(unittest.TestCase):
+
+ def test_unstarted_server(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.UNSTARTED_SERVER],
+ stdout=sys.stdout, stderr=sys.stderr)
+ wait(process)
+
+ def test_unstarted_server_terminate(self):
+ process = subprocess.Popen(
+ BASE_SIGTERM_COMMAND + [_exit_scenarios.UNSTARTED_SERVER],
+ stdout=sys.stdout)
+ interrupt_and_wait(process)
+
+ def test_running_server(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.RUNNING_SERVER],
+ stdout=sys.stdout, stderr=sys.stderr)
+ wait(process)
+
+ def test_running_server_terminate(self):
+ process = subprocess.Popen(
+ BASE_SIGTERM_COMMAND + [_exit_scenarios.RUNNING_SERVER],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+ def test_poll_connectivity_no_server(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.POLL_CONNECTIVITY_NO_SERVER],
+ stdout=sys.stdout, stderr=sys.stderr)
+ wait(process)
+
+ def test_poll_connectivity_no_server_terminate(self):
+ process = subprocess.Popen(
+ BASE_SIGTERM_COMMAND + [_exit_scenarios.POLL_CONNECTIVITY_NO_SERVER],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+ def test_poll_connectivity(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.POLL_CONNECTIVITY],
+ stdout=sys.stdout, stderr=sys.stderr)
+ wait(process)
+
+ def test_poll_connectivity_terminate(self):
+ process = subprocess.Popen(
+ BASE_SIGTERM_COMMAND + [_exit_scenarios.POLL_CONNECTIVITY],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+ def test_in_flight_unary_unary_call(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_UNARY_UNARY_CALL],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+ @unittest.skipIf(six.PY2, 'https://github.com/grpc/grpc/issues/6999')
+ def test_in_flight_unary_stream_call(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_UNARY_STREAM_CALL],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+ def test_in_flight_stream_unary_call(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_STREAM_UNARY_CALL],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+ @unittest.skipIf(six.PY2, 'https://github.com/grpc/grpc/issues/6999')
+ def test_in_flight_stream_stream_call(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_STREAM_STREAM_CALL],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+ @unittest.skipIf(six.PY2, 'https://github.com/grpc/grpc/issues/6999')
+ def test_in_flight_partial_unary_stream_call(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+ def test_in_flight_partial_stream_unary_call(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+ @unittest.skipIf(six.PY2, 'https://github.com/grpc/grpc/issues/6999')
+ def test_in_flight_partial_stream_stream_call(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)