author | Craig Tiller <ctiller@google.com> | 2016-06-27 14:01:25 -0700
---|---|---
committer | Craig Tiller <ctiller@google.com> | 2016-06-27 14:01:25 -0700
commit | 8551a709a1fb62a2e748d230b52b530df55b24eb (patch) |
tree | e5f221af5a7d644c934edd3fccbc487d08f31025 /src |
parent | a6e022f98f157ff938fbfb90b5666eab83579cd2 (diff) |
parent | a9df5d1029b303a305481c4bb765c6bed2e8ebbb (diff) |
Merge github.com:grpc/grpc into %s
Diffstat (limited to 'src')
19 files changed, 820 insertions, 56 deletions
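
This merge brings in two strands of work: the C core threads the server's channel arguments into grpc_tcp_server_create() so listeners can use SO_REUSEPORT (controlled by the GRPC_ARG_ALLOW_REUSEPORT integer arg) and adds a small network-status tracker for endpoints, while the Python layer swaps bare threading.Thread workers for _common.CleanupThread so the interpreter can exit cleanly. As a rough illustration of the user-visible knob, here is a minimal sketch of disabling SO_REUSEPORT on a server via that channel arg — assuming grpc_init() has already been called; the listen address and port are placeholders:

```c
#include <grpc/grpc.h>

/* Sketch: opt a server's listeners out of SO_REUSEPORT by passing the
   integer channel arg that grpc_tcp_server_create() now inspects.
   (A non-zero value, or omitting the arg, keeps SO_REUSEPORT enabled
   where the platform supports it.) */
static grpc_server *server_without_reuseport(void) {
  grpc_arg arg;
  arg.type = GRPC_ARG_INTEGER;
  arg.key = GRPC_ARG_ALLOW_REUSEPORT;
  arg.value.integer = 0;
  grpc_channel_args args = {1, &arg};
  grpc_server *server = grpc_server_create(&args, NULL);
  /* placeholder address/port */
  grpc_server_add_insecure_http2_port(server, "0.0.0.0:50051");
  return server;
}
```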
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index 9bae3a94f9..e5c987925c 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -97,7 +97,8 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { goto error; } - err = grpc_tcp_server_create(NULL, &tcp); + err = + grpc_tcp_server_create(NULL, grpc_server_get_channel_args(server), &tcp); if (err != GRPC_ERROR_NONE) { goto error; } diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index ead8a4d566..c42810e913 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -216,7 +216,8 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, state = gpr_malloc(sizeof(*state)); memset(state, 0, sizeof(*state)); grpc_closure_init(&state->destroy_closure, destroy_done, state); - err = grpc_tcp_server_create(&state->destroy_closure, &tcp); + err = grpc_tcp_server_create(&state->destroy_closure, + grpc_server_get_channel_args(server), &tcp); if (err != GRPC_ERROR_NONE) { goto error; } diff --git a/src/core/lib/iomgr/network_status_tracker.c b/src/core/lib/iomgr/network_status_tracker.c new file mode 100644 index 0000000000..38a1c9b7d4 --- /dev/null +++ b/src/core/lib/iomgr/network_status_tracker.c @@ -0,0 +1,121 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + */ + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include "src/core/lib/iomgr/endpoint.h" + +typedef struct endpoint_ll_node { + grpc_endpoint *ep; + struct endpoint_ll_node *next; +} endpoint_ll_node; + +static endpoint_ll_node *head = NULL; +static gpr_mu g_endpoint_mutex; +static bool g_init_done = false; + +void grpc_initialize_network_status_monitor() { + g_init_done = true; + gpr_mu_init(&g_endpoint_mutex); + // TODO(makarandd): Install callback with OS to monitor network status. +} + +void grpc_destroy_network_status_monitor() { + for (endpoint_ll_node *curr = head; curr != NULL;) { + endpoint_ll_node *next = curr->next; + gpr_free(curr); + curr = next; + } + gpr_mu_destroy(&g_endpoint_mutex); +} + +void grpc_network_status_register_endpoint(grpc_endpoint *ep) { + if (!g_init_done) { + grpc_initialize_network_status_monitor(); + } + gpr_mu_lock(&g_endpoint_mutex); + if (head == NULL) { + head = (endpoint_ll_node *)gpr_malloc(sizeof(endpoint_ll_node)); + head->ep = ep; + head->next = NULL; + } else { + endpoint_ll_node *prev_head = head; + head = (endpoint_ll_node *)gpr_malloc(sizeof(endpoint_ll_node)); + head->ep = ep; + head->next = prev_head; + } + gpr_mu_unlock(&g_endpoint_mutex); +} + +void grpc_network_status_unregister_endpoint(grpc_endpoint *ep) { + gpr_mu_lock(&g_endpoint_mutex); + GPR_ASSERT(head); + bool found = false; + endpoint_ll_node *prev = head; + // if we're unregistering the head, just move head to the next + if (ep == head->ep) { + head = head->next; + gpr_free(prev); + found = true; + } else { + for (endpoint_ll_node *curr = head->next; curr != NULL; curr = curr->next) { + if (ep == curr->ep) { + prev->next = curr->next; + gpr_free(curr); + found = true; + break; + } + prev = curr; + } + } + gpr_mu_unlock(&g_endpoint_mutex); + GPR_ASSERT(found); +} + +// Walk the linked-list from head and execute shutdown. It is possible that +// other threads might be in the process of shutdown as well, but that has +// no side effect since endpoint shutdown is idempotent. +void grpc_network_status_shutdown_all_endpoints() { + gpr_mu_lock(&g_endpoint_mutex); + if (head == NULL) { + gpr_mu_unlock(&g_endpoint_mutex); + return; + } + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + + for (endpoint_ll_node *curr = head; curr != NULL; curr = curr->next) { + curr->ep->vtable->shutdown(&exec_ctx, curr->ep); + } + gpr_mu_unlock(&g_endpoint_mutex); + grpc_exec_ctx_finish(&exec_ctx); +} diff --git a/src/core/lib/iomgr/network_status_tracker.h b/src/core/lib/iomgr/network_status_tracker.h new file mode 100644 index 0000000000..74a1aa8135 --- /dev/null +++ b/src/core/lib/iomgr/network_status_tracker.h @@ -0,0 +1,41 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. 
+ * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H +#define GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H +#include "src/core/lib/iomgr/endpoint.h" + +void grpc_network_status_register_endpoint(grpc_endpoint *ep); +void grpc_network_status_unregister_endpoint(grpc_endpoint *ep); +void grpc_network_status_shutdown_all_endpoints(); +#endif /* GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H */ diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c index 3a1371617e..d2f6261e2a 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.c +++ b/src/core/lib/iomgr/socket_utils_common_posix.c @@ -169,6 +169,28 @@ grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse) { return GRPC_ERROR_NONE; } +/* set a socket to reuse old addresses */ +grpc_error *grpc_set_socket_reuse_port(int fd, int reuse) { +#ifndef SO_REUSEPORT + return GRPC_ERROR_CREATE("SO_REUSEPORT unavailable on compiling system"); +#else + int val = (reuse != 0); + int newval; + socklen_t intlen = sizeof(newval); + if (0 != setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val))) { + return GRPC_OS_ERROR(errno, "setsockopt(SO_REUSEPORT)"); + } + if (0 != getsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &newval, &intlen)) { + return GRPC_OS_ERROR(errno, "getsockopt(SO_REUSEPORT)"); + } + if ((newval != 0) != val) { + return GRPC_ERROR_CREATE("Failed to set SO_REUSEPORT"); + } + + return GRPC_ERROR_NONE; +#endif +} + /* disable nagle */ grpc_error *grpc_set_socket_low_latency(int fd, int low_latency) { int val = (low_latency != 0); diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index 30ff39dfa3..7bcc2219ae 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -55,6 +55,9 @@ grpc_error *grpc_set_socket_reuse_addr(int fd, int reuse); /* disable nagle */ grpc_error *grpc_set_socket_low_latency(int fd, int low_latency); +/* set SO_REUSEPORT */ +grpc_error *grpc_set_socket_reuse_port(int fd, int reuse); + /* Returns true if this system can create AF_INET6 sockets bound to ::1. The value is probed once, and cached for the life of the process. 
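
The new grpc_set_socket_reuse_port() sets SO_REUSEPORT and reads it back to confirm it stuck, returning an error on systems where the option is unavailable. The TCP server code in the next hunk probes availability once per process on a throwaway socket; a minimal sketch of that idiom, assuming the in-tree include paths and that GRPC_LOG_IF_ERROR consumes the error the same way the server code does:

```c
#include <stdbool.h>
#include <sys/socket.h>
#include <unistd.h>

#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"

/* Probe once whether SO_REUSEPORT can actually be set on this system. */
static bool probe_so_reuseport(void) {
  int s = socket(AF_INET, SOCK_STREAM, 0);
  if (s < 0) return false;
  /* GRPC_LOG_IF_ERROR logs and consumes the error, returning true only
     when the result is GRPC_ERROR_NONE. */
  bool available = GRPC_LOG_IF_ERROR("check for SO_REUSEPORT",
                                     grpc_set_socket_reuse_port(s, 1));
  close(s);
  return available;
}
```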
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 1046b60bcc..2ab45e33ce 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -35,6 +35,7 @@ #ifdef GPR_POSIX_SOCKET +#include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/tcp_posix.h" #include <errno.h> @@ -152,6 +153,7 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } #endif static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { + grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; TCP_UNREF(exec_ctx, tcp, "destroy"); } @@ -474,6 +476,8 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, tcp->write_closure.cb = tcp_handle_write; tcp->write_closure.cb_arg = tcp; gpr_slice_buffer_init(&tcp->last_read_buffer); + /* Tell network status tracker about new endpoint */ + grpc_network_status_register_endpoint(&tcp->base); return &tcp->base; } diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h index 1d6e70dbe9..875a6ca14a 100644 --- a/src/core/lib/iomgr/tcp_server.h +++ b/src/core/lib/iomgr/tcp_server.h @@ -34,6 +34,8 @@ #ifndef GRPC_CORE_LIB_IOMGR_TCP_SERVER_H #define GRPC_CORE_LIB_IOMGR_TCP_SERVER_H +#include <grpc/grpc.h> + #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/endpoint.h" @@ -59,6 +61,7 @@ typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg, If shutdown_complete is not NULL, it will be used by grpc_tcp_server_unref() when the ref count reaches zero. */ grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, + const grpc_channel_args *args, grpc_tcp_server **server); /* Start listening to bound ports */ diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 6fb547bb36..a1a463550a 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -112,8 +112,10 @@ struct grpc_tcp_server { /* destroyed port count: how many ports are completely destroyed */ size_t destroyed_ports; - /* is this server shutting down? (boolean) */ - int shutdown; + /* is this server shutting down? */ + bool shutdown; + /* use SO_REUSEPORT */ + bool so_reuseport; /* linked list of server ports */ grpc_tcp_listener *head; @@ -135,14 +137,42 @@ struct grpc_tcp_server { size_t next_pollset_to_assign; }; +static gpr_once check_init = GPR_ONCE_INIT; +static bool has_so_reuseport; + +static void init(void) { + int s = socket(AF_INET, SOCK_STREAM, 0); + if (s >= 0) { + has_so_reuseport = GRPC_LOG_IF_ERROR("check for SO_REUSEPORT", + grpc_set_socket_reuse_port(s, 1)); + close(s); + } +} + grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, + const grpc_channel_args *args, grpc_tcp_server **server) { + gpr_once_init(&check_init, init); + grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); + s->so_reuseport = has_so_reuseport; + for (size_t i = 0; i < (args == NULL ? 
0 : args->num_args); i++) { + if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) { + if (args->args[i].type == GRPC_ARG_INTEGER) { + s->so_reuseport = + has_so_reuseport && (args->args[i].value.integer != 0); + } else { + gpr_free(s); + return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT + " must be an integer"); + } + } + } gpr_ref_init(&s->refs, 1); gpr_mu_init(&s->mu); s->active_ports = 0; s->destroyed_ports = 0; - s->shutdown = 0; + s->shutdown = false; s->shutdown_starting.head = NULL; s->shutdown_starting.tail = NULL; s->shutdown_complete = shutdown_complete; @@ -218,7 +248,7 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { gpr_mu_lock(&s->mu); GPR_ASSERT(!s->shutdown); - s->shutdown = 1; + s->shutdown = true; /* shutdown all fd's */ if (s->active_ports) { @@ -268,13 +298,19 @@ static int get_max_accept_queue_size(void) { /* Prepare a recently-created socket for listening. */ static grpc_error *prepare_socket(int fd, const struct sockaddr *addr, - size_t addr_len, int *port) { + size_t addr_len, bool so_reuseport, + int *port) { struct sockaddr_storage sockname_temp; socklen_t sockname_len; grpc_error *err = GRPC_ERROR_NONE; GPR_ASSERT(fd >= 0); + if (so_reuseport) { + err = grpc_set_socket_reuse_port(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + } + err = grpc_set_socket_nonblocking(fd, 1); if (err != GRPC_ERROR_NONE) goto error; err = grpc_set_socket_cloexec(fd, 1); @@ -407,7 +443,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd, char *addr_str; char *name; - grpc_error *err = prepare_socket(fd, addr, addr_len, &port); + grpc_error *err = prepare_socket(fd, addr, addr_len, s->so_reuseport, &port); if (err == GRPC_ERROR_NONE) { GPR_ASSERT(port > 0); grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); @@ -443,6 +479,52 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd, return err; } +static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) { + grpc_tcp_listener *sp = NULL; + char *addr_str; + char *name; + grpc_error *err; + + for (grpc_tcp_listener *l = listener->next; l && l->is_sibling; l = l->next) { + l->fd_index += count; + } + + for (unsigned i = 0; i < count; i++) { + int fd, port; + grpc_dualstack_mode dsmode; + err = grpc_create_dualstack_socket(&listener->addr.sockaddr, SOCK_STREAM, 0, + &dsmode, &fd); + if (err != GRPC_ERROR_NONE) return err; + err = prepare_socket(fd, &listener->addr.sockaddr, listener->addr_len, true, + &port); + if (err != GRPC_ERROR_NONE) return err; + listener->server->nports++; + grpc_sockaddr_to_string(&addr_str, &listener->addr.sockaddr, 1); + gpr_asprintf(&name, "tcp-server-listener:%s/clone-%d", addr_str, i); + sp = gpr_malloc(sizeof(grpc_tcp_listener)); + sp->next = listener->next; + listener->next = sp; + sp->server = listener->server; + sp->fd = fd; + sp->emfd = grpc_fd_create(fd, name); + memcpy(sp->addr.untyped, listener->addr.untyped, listener->addr_len); + sp->addr_len = listener->addr_len; + sp->port = port; + sp->port_index = listener->port_index; + sp->fd_index = listener->fd_index + count - i; + sp->is_sibling = 1; + sp->sibling = listener->is_sibling ? 
listener->sibling : listener; + GPR_ASSERT(sp->emfd); + while (listener->server->tail->next != NULL) { + listener->server->tail = listener->server->tail->next; + } + gpr_free(addr_str); + gpr_free(name); + } + + return GRPC_ERROR_NONE; +} + grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, size_t addr_len, int *out_port) { grpc_tcp_listener *sp; @@ -599,14 +681,29 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, s->on_accept_cb_arg = on_accept_cb_arg; s->pollsets = pollsets; s->pollset_count = pollset_count; - for (sp = s->head; sp; sp = sp->next) { - for (i = 0; i < pollset_count; i++) { - grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); + sp = s->head; + while (sp != NULL) { + if (s->so_reuseport && pollset_count > 1) { + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "clone_port", clone_port(sp, (unsigned)(pollset_count - 1)))); + for (i = 0; i < pollset_count; i++) { + grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); + sp->read_closure.cb = on_read; + sp->read_closure.cb_arg = sp; + grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + s->active_ports++; + sp = sp->next; + } + } else { + for (i = 0; i < pollset_count; i++) { + grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); + } + sp->read_closure.cb = on_read; + sp->read_closure.cb_arg = sp; + grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + s->active_ports++; + sp = sp->next; } - sp->read_closure.cb = on_read; - sp->read_closure.cb_arg = sp; - grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); - s->active_ports++; } gpr_mu_unlock(&s->mu); } diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index 86982bc183..7b0966704c 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -103,6 +103,7 @@ struct grpc_tcp_server { /* Public function. Allocates the proper data structures to hold a grpc_tcp_server. 
*/ grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, + const grpc_channel_args *args, grpc_tcp_server **server) { grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); gpr_ref_init(&s->refs, 1); diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index b2af8030aa..37ab59021e 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -37,6 +37,7 @@ #include <limits.h> +#include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/sockaddr_windows.h" #include <grpc/support/alloc.h> @@ -378,6 +379,7 @@ static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { } static void win_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { + grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; TCP_UNREF(tcp, "destroy"); } @@ -401,6 +403,9 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { grpc_closure_init(&tcp->on_read, on_read, tcp); grpc_closure_init(&tcp->on_write, on_write, tcp); tcp->peer_string = gpr_strdup(peer_string); + /* Tell network status tracking code about the new endpoint */ + grpc_network_status_register_endpoint(&tcp->base); + return &tcp->base; } diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 16150687d3..1ebccf2ee2 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -243,13 +243,13 @@ static int prepare_socket(int fd, const struct sockaddr *addr, if (!grpc_set_socket_sndbuf(fd, buffer_size_bytes)) { gpr_log(GPR_ERROR, "Failed to set send buffer size to %d bytes", - buf_size_bytes); + buffer_size_bytes); goto error; } if (!grpc_set_socket_rcvbuf(fd, buffer_size_bytes)) { gpr_log(GPR_ERROR, "Failed to set receive buffer size to %d bytes", - buf_size_bytes); + buffer_size_bytes); goto error; } diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc index ce988f9dfa..745b5023d5 100644 --- a/src/node/ext/node_grpc.cc +++ b/src/node/ext/node_grpc.cc @@ -265,8 +265,8 @@ void InitLogConstants(Local<Object> exports) { Nan::Set(log_verbosity, Nan::New("DEBUG").ToLocalChecked(), DEBUG); Local<Value> INFO(Nan::New<Uint32, uint32_t>(GPR_LOG_SEVERITY_INFO)); Nan::Set(log_verbosity, Nan::New("INFO").ToLocalChecked(), INFO); - Local<Value> ERROR(Nan::New<Uint32, uint32_t>(GPR_LOG_SEVERITY_ERROR)); - Nan::Set(log_verbosity, Nan::New("ERROR").ToLocalChecked(), ERROR); + Local<Value> LOG_ERROR(Nan::New<Uint32, uint32_t>(GPR_LOG_SEVERITY_ERROR)); + Nan::Set(log_verbosity, Nan::New("ERROR").ToLocalChecked(), LOG_ERROR); } NAN_METHOD(MetadataKeyIsLegal) { diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 1f34beeb2c..cf6175d031 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -179,6 +179,7 @@ def _event_handler(state, call, response_deserializer): def _consume_request_iterator( request_iterator, state, call, request_serializer): event_handler = _event_handler(state, call, None) + def consume_request_iterator(): for request in request_iterator: serialized_request = _common.serialize(request, request_serializer) @@ -212,8 +213,18 @@ def _consume_request_iterator( ) call.start_batch(cygrpc.Operations(operations), event_handler) state.due.add(cygrpc.OperationType.send_close_from_client) - thread = threading.Thread(target=consume_request_iterator) - thread.start() + + def stop_consumption_thread(timeout): + with state.condition: + if state.code is None: + call.cancel() + 
state.cancelled = True + _abort(state, grpc.StatusCode.CANCELLED, 'Cancelled!') + state.condition.notify_all() + + consumption_thread = _common.CleanupThread( + stop_consumption_thread, target=consume_request_iterator) + consumption_thread.start() class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): @@ -652,16 +663,27 @@ class _ChannelCallState(object): self.managed_calls = None -def _call_spin(state): - while True: - event = state.completion_queue.poll() - completed_call = event.tag(event) - if completed_call is not None: - with state.lock: - state.managed_calls.remove(completed_call) - if not state.managed_calls: - state.managed_calls = None - return +def _run_channel_spin_thread(state): + def channel_spin(): + while True: + event = state.completion_queue.poll() + completed_call = event.tag(event) + if completed_call is not None: + with state.lock: + state.managed_calls.remove(completed_call) + if not state.managed_calls: + state.managed_calls = None + return + + def stop_channel_spin(timeout): + with state.lock: + if state.managed_calls is not None: + for call in state.managed_calls: + call.cancel() + + channel_spin_thread = _common.CleanupThread( + stop_channel_spin, target=channel_spin) + channel_spin_thread.start() def _create_channel_managed_call(state): @@ -690,8 +712,7 @@ def _create_channel_managed_call(state): parent, flags, state.completion_queue, method, host, deadline) if state.managed_calls is None: state.managed_calls = set((call,)) - spin_thread = threading.Thread(target=_call_spin, args=(state,)) - spin_thread.start() + _run_channel_spin_thread(state) else: state.managed_calls.add(call) return call @@ -784,11 +805,18 @@ def _poll_connectivity(state, channel, initial_try_to_connect): _spawn_delivery(state, callbacks) +def _moot(state): + with state.lock: + del state.callbacks_and_connectivities[:] + + def _subscribe(state, callback, try_to_connect): with state.lock: if not state.callbacks_and_connectivities and not state.polling: - polling_thread = threading.Thread( - target=_poll_connectivity, + def cancel_all_subscriptions(timeout): + _moot(state) + polling_thread = _common.CleanupThread( + cancel_all_subscriptions, target=_poll_connectivity, args=(state, state.channel, bool(try_to_connect))) polling_thread.start() state.polling = True @@ -812,11 +840,6 @@ def _unsubscribe(state, callback): break -def _moot(state): - with state.lock: - del state.callbacks_and_connectivities[:] - - def _options(options): if options is None: pairs = ((cygrpc.ChannelArgKey.primary_user_agent_string, _USER_AGENT),) diff --git a/src/python/grpcio/grpc/beta/_server_adaptations.py b/src/python/grpcio/grpc/beta/_server_adaptations.py index 79e6ca87eb..c695434dac 100644 --- a/src/python/grpcio/grpc/beta/_server_adaptations.py +++ b/src/python/grpcio/grpc/beta/_server_adaptations.py @@ -161,14 +161,24 @@ class _Callback(stream.Consumer): self._condition.wait() -def _pipe_requests(request_iterator, request_consumer, servicer_context): - for request in request_iterator: - if not servicer_context.is_active(): - return - request_consumer.consume(request) - if not servicer_context.is_active(): - return - request_consumer.terminate() +def _run_request_pipe_thread(request_iterator, request_consumer, + servicer_context): + thread_joined = threading.Event() + def pipe_requests(): + for request in request_iterator: + if not servicer_context.is_active() or thread_joined.is_set(): + return + request_consumer.consume(request) + if not servicer_context.is_active() or thread_joined.is_set(): + return + 
request_consumer.terminate() + + def stop_request_pipe(timeout): + thread_joined.set() + + request_pipe_thread = _common.CleanupThread( + stop_request_pipe, target=pipe_requests) + request_pipe_thread.start() def _adapt_unary_unary_event(unary_unary_event): @@ -206,10 +216,8 @@ def _adapt_stream_unary_event(stream_unary_event): raise abandonment.Abandoned() request_consumer = stream_unary_event( callback.consume_and_terminate, _FaceServicerContext(servicer_context)) - request_pipe_thread = threading.Thread( - target=_pipe_requests, - args=(request_iterator, request_consumer, servicer_context,)) - request_pipe_thread.start() + _run_request_pipe_thread( + request_iterator, request_consumer, servicer_context) return callback.draw_all_values()[0] return adaptation @@ -221,10 +229,8 @@ def _adapt_stream_stream_event(stream_stream_event): raise abandonment.Abandoned() request_consumer = stream_stream_event( callback, _FaceServicerContext(servicer_context)) - request_pipe_thread = threading.Thread( - target=_pipe_requests, - args=(request_iterator, request_consumer, servicer_context,)) - request_pipe_thread.start() + _run_request_pipe_thread( + request_iterator, request_consumer, servicer_context) while True: response = callback.draw_one_value() if response is None: diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index ea4d17351c..b37e27c27e 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -105,6 +105,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/iomgr_posix.c', 'src/core/lib/iomgr/iomgr_windows.c', 'src/core/lib/iomgr/load_file.c', + 'src/core/lib/iomgr/network_status_tracker.c', 'src/core/lib/iomgr/polling_entity.c', 'src/core/lib/iomgr/pollset_set_windows.c', 'src/core/lib/iomgr/pollset_windows.c', diff --git a/src/python/grpcio/tests/tests.json b/src/python/grpcio/tests/tests.json index 128f85b8b0..9c118ef601 100644 --- a/src/python/grpcio/tests/tests.json +++ b/src/python/grpcio/tests/tests.json @@ -14,6 +14,7 @@ "_connectivity_channel_test.ChannelConnectivityTest", "_connectivity_channel_test.ConnectivityStatesTest", "_empty_message_test.EmptyMessageTest", + "_exit_test.ExitTest", "_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest", "_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest", "_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest", diff --git a/src/python/grpcio/tests/unit/_exit_scenarios.py b/src/python/grpcio/tests/unit/_exit_scenarios.py new file mode 100644 index 0000000000..24a2faef85 --- /dev/null +++ b/src/python/grpcio/tests/unit/_exit_scenarios.py @@ -0,0 +1,249 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. 
+# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Defines a number of module-scope gRPC scenarios to test clean exit.""" + +import argparse +import threading +import time + +import grpc + +from tests.unit.framework.common import test_constants + +WAIT_TIME = 1000 + +REQUEST = b'request' + +UNSTARTED_SERVER = 'unstarted_server' +RUNNING_SERVER = 'running_server' +POLL_CONNECTIVITY_NO_SERVER = 'poll_connectivity_no_server' +POLL_CONNECTIVITY = 'poll_connectivity' +IN_FLIGHT_UNARY_UNARY_CALL = 'in_flight_unary_unary_call' +IN_FLIGHT_UNARY_STREAM_CALL = 'in_flight_unary_stream_call' +IN_FLIGHT_STREAM_UNARY_CALL = 'in_flight_stream_unary_call' +IN_FLIGHT_STREAM_STREAM_CALL = 'in_flight_stream_stream_call' +IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL = 'in_flight_partial_unary_stream_call' +IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL = 'in_flight_partial_stream_unary_call' +IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL = 'in_flight_partial_stream_stream_call' + +UNARY_UNARY = b'/test/UnaryUnary' +UNARY_STREAM = b'/test/UnaryStream' +STREAM_UNARY = b'/test/StreamUnary' +STREAM_STREAM = b'/test/StreamStream' +PARTIAL_UNARY_STREAM = b'/test/PartialUnaryStream' +PARTIAL_STREAM_UNARY = b'/test/PartialStreamUnary' +PARTIAL_STREAM_STREAM = b'/test/PartialStreamStream' + +TEST_TO_METHOD = { + IN_FLIGHT_UNARY_UNARY_CALL: UNARY_UNARY, + IN_FLIGHT_UNARY_STREAM_CALL: UNARY_STREAM, + IN_FLIGHT_STREAM_UNARY_CALL: STREAM_UNARY, + IN_FLIGHT_STREAM_STREAM_CALL: STREAM_STREAM, + IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL: PARTIAL_UNARY_STREAM, + IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL: PARTIAL_STREAM_UNARY, + IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL: PARTIAL_STREAM_STREAM, +} + + +def hang_unary_unary(request, servicer_context): + time.sleep(WAIT_TIME) + + +def hang_unary_stream(request, servicer_context): + time.sleep(WAIT_TIME) + + +def hang_partial_unary_stream(request, servicer_context): + for _ in range(test_constants.STREAM_LENGTH // 2): + yield request + time.sleep(WAIT_TIME) + + +def hang_stream_unary(request_iterator, servicer_context): + time.sleep(WAIT_TIME) + + +def hang_partial_stream_unary(request_iterator, servicer_context): + for _ in range(test_constants.STREAM_LENGTH // 2): + next(request_iterator) + time.sleep(WAIT_TIME) + + +def hang_stream_stream(request_iterator, servicer_context): + time.sleep(WAIT_TIME) + + +def hang_partial_stream_stream(request_iterator, servicer_context): + for _ in range(test_constants.STREAM_LENGTH // 2): + yield next(request_iterator) + time.sleep(WAIT_TIME) + + +class MethodHandler(grpc.RpcMethodHandler): + + def __init__(self, request_streaming, response_streaming, partial_hang): + self.request_streaming = request_streaming + self.response_streaming = response_streaming + self.request_deserializer = None + self.response_serializer = None + self.unary_unary = None + 
self.unary_stream = None + self.stream_unary = None + self.stream_stream = None + if self.request_streaming and self.response_streaming: + if partial_hang: + self.stream_stream = hang_partial_stream_stream + else: + self.stream_stream = hang_stream_stream + elif self.request_streaming: + if partial_hang: + self.stream_unary = hang_partial_stream_unary + else: + self.stream_unary = hang_stream_unary + elif self.response_streaming: + if partial_hang: + self.unary_stream = hang_partial_unary_stream + else: + self.unary_stream = hang_unary_stream + else: + self.unary_unary = hang_unary_unary + + +class GenericHandler(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + if handler_call_details.method == UNARY_UNARY: + return MethodHandler(False, False, False) + elif handler_call_details.method == UNARY_STREAM: + return MethodHandler(False, True, False) + elif handler_call_details.method == STREAM_UNARY: + return MethodHandler(True, False, False) + elif handler_call_details.method == STREAM_STREAM: + return MethodHandler(True, True, False) + elif handler_call_details.method == PARTIAL_UNARY_STREAM: + return MethodHandler(False, True, True) + elif handler_call_details.method == PARTIAL_STREAM_UNARY: + return MethodHandler(True, False, True) + elif handler_call_details.method == PARTIAL_STREAM_STREAM: + return MethodHandler(True, True, True) + else: + return None + + +# Traditional executors will not exit until all their +# current jobs complete. Because we submit jobs that will +# never finish, we don't want to block exit on these jobs. +class DaemonPool(object): + + def submit(self, fn, *args, **kwargs): + thread = threading.Thread(target=fn, args=args, kwargs=kwargs) + thread.daemon = True + thread.start() + + def shutdown(self, wait=True): + pass + + +def infinite_request_iterator(): + while True: + yield REQUEST + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('scenario', type=str) + parser.add_argument( + '--wait_for_interrupt', dest='wait_for_interrupt', action='store_true') + args = parser.parse_args() + + if args.scenario == UNSTARTED_SERVER: + server = grpc.server((), DaemonPool()) + if args.wait_for_interrupt: + time.sleep(WAIT_TIME) + elif args.scenario == RUNNING_SERVER: + server = grpc.server((), DaemonPool()) + port = server.add_insecure_port('[::]:0') + server.start() + if args.wait_for_interrupt: + time.sleep(WAIT_TIME) + elif args.scenario == POLL_CONNECTIVITY_NO_SERVER: + channel = grpc.insecure_channel('localhost:12345') + + def connectivity_callback(connectivity): + pass + + channel.subscribe(connectivity_callback, try_to_connect=True) + if args.wait_for_interrupt: + time.sleep(WAIT_TIME) + elif args.scenario == POLL_CONNECTIVITY: + server = grpc.server((), DaemonPool()) + port = server.add_insecure_port('[::]:0') + server.start() + channel = grpc.insecure_channel('localhost:%d' % port) + + def connectivity_callback(connectivity): + pass + + channel.subscribe(connectivity_callback, try_to_connect=True) + if args.wait_for_interrupt: + time.sleep(WAIT_TIME) + + else: + handler = GenericHandler() + server = grpc.server((), DaemonPool()) + port = server.add_insecure_port('[::]:0') + server.add_generic_rpc_handlers((handler,)) + server.start() + channel = grpc.insecure_channel('localhost:%d' % port) + + method = TEST_TO_METHOD[args.scenario] + + if args.scenario == IN_FLIGHT_UNARY_UNARY_CALL: + multi_callable = channel.unary_unary(method) + future = multi_callable.future(REQUEST) + result, call = 
multi_callable.with_call(REQUEST) + elif (args.scenario == IN_FLIGHT_UNARY_STREAM_CALL or + args.scenario == IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL): + multi_callable = channel.unary_stream(method) + response_iterator = multi_callable(REQUEST) + for response in response_iterator: + pass + elif (args.scenario == IN_FLIGHT_STREAM_UNARY_CALL or + args.scenario == IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL): + multi_callable = channel.stream_unary(method) + future = multi_callable.future(infinite_request_iterator()) + result, call = multi_callable.with_call( + [REQUEST] * test_constants.STREAM_LENGTH) + elif (args.scenario == IN_FLIGHT_STREAM_STREAM_CALL or + args.scenario == IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL): + multi_callable = channel.stream_stream(method) + response_iterator = multi_callable(infinite_request_iterator()) + for response in response_iterator: + pass diff --git a/src/python/grpcio/tests/unit/_exit_test.py b/src/python/grpcio/tests/unit/_exit_test.py new file mode 100644 index 0000000000..b0d6af73e5 --- /dev/null +++ b/src/python/grpcio/tests/unit/_exit_test.py @@ -0,0 +1,185 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests clean exit of server/client on Python Interpreter exit/sigint. + +The tests in this module spawn a subprocess for each test case, the +test is considered successful if it doesn't hang/timeout. 
+""" + +import atexit +import os +import signal +import six +import subprocess +import sys +import threading +import time +import unittest + +from tests.unit import _exit_scenarios + +SCENARIO_FILE = os.path.abspath(os.path.join( + os.path.dirname(os.path.realpath(__file__)), '_exit_scenarios.py')) +INTERPRETER = sys.executable +BASE_COMMAND = [INTERPRETER, SCENARIO_FILE] +BASE_SIGTERM_COMMAND = BASE_COMMAND + ['--wait_for_interrupt'] + +INIT_TIME = 1.0 + + +processes = [] +process_lock = threading.Lock() + + +# Make sure we attempt to clean up any +# processes we may have left running +def cleanup_processes(): + with process_lock: + for process in processes: + try: + process.kill() + except Exception: + pass +atexit.register(cleanup_processes) + + +def interrupt_and_wait(process): + with process_lock: + processes.append(process) + time.sleep(INIT_TIME) + os.kill(process.pid, signal.SIGINT) + process.wait() + + +def wait(process): + with process_lock: + processes.append(process) + process.wait() + + +class ExitTest(unittest.TestCase): + + def test_unstarted_server(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.UNSTARTED_SERVER], + stdout=sys.stdout, stderr=sys.stderr) + wait(process) + + def test_unstarted_server_terminate(self): + process = subprocess.Popen( + BASE_SIGTERM_COMMAND + [_exit_scenarios.UNSTARTED_SERVER], + stdout=sys.stdout) + interrupt_and_wait(process) + + def test_running_server(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.RUNNING_SERVER], + stdout=sys.stdout, stderr=sys.stderr) + wait(process) + + def test_running_server_terminate(self): + process = subprocess.Popen( + BASE_SIGTERM_COMMAND + [_exit_scenarios.RUNNING_SERVER], + stdout=sys.stdout, stderr=sys.stderr) + interrupt_and_wait(process) + + def test_poll_connectivity_no_server(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.POLL_CONNECTIVITY_NO_SERVER], + stdout=sys.stdout, stderr=sys.stderr) + wait(process) + + def test_poll_connectivity_no_server_terminate(self): + process = subprocess.Popen( + BASE_SIGTERM_COMMAND + [_exit_scenarios.POLL_CONNECTIVITY_NO_SERVER], + stdout=sys.stdout, stderr=sys.stderr) + interrupt_and_wait(process) + + def test_poll_connectivity(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.POLL_CONNECTIVITY], + stdout=sys.stdout, stderr=sys.stderr) + wait(process) + + def test_poll_connectivity_terminate(self): + process = subprocess.Popen( + BASE_SIGTERM_COMMAND + [_exit_scenarios.POLL_CONNECTIVITY], + stdout=sys.stdout, stderr=sys.stderr) + interrupt_and_wait(process) + + def test_in_flight_unary_unary_call(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_UNARY_UNARY_CALL], + stdout=sys.stdout, stderr=sys.stderr) + interrupt_and_wait(process) + + @unittest.skipIf(six.PY2, 'https://github.com/grpc/grpc/issues/6999') + def test_in_flight_unary_stream_call(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_UNARY_STREAM_CALL], + stdout=sys.stdout, stderr=sys.stderr) + interrupt_and_wait(process) + + def test_in_flight_stream_unary_call(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_STREAM_UNARY_CALL], + stdout=sys.stdout, stderr=sys.stderr) + interrupt_and_wait(process) + + @unittest.skipIf(six.PY2, 'https://github.com/grpc/grpc/issues/6999') + def test_in_flight_stream_stream_call(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_STREAM_STREAM_CALL], + stdout=sys.stdout, 
stderr=sys.stderr) + interrupt_and_wait(process) + + @unittest.skipIf(six.PY2, 'https://github.com/grpc/grpc/issues/6999') + def test_in_flight_partial_unary_stream_call(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL], + stdout=sys.stdout, stderr=sys.stderr) + interrupt_and_wait(process) + + def test_in_flight_partial_stream_unary_call(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL], + stdout=sys.stdout, stderr=sys.stderr) + interrupt_and_wait(process) + + @unittest.skipIf(six.PY2, 'https://github.com/grpc/grpc/issues/6999') + def test_in_flight_partial_stream_stream_call(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL], + stdout=sys.stdout, stderr=sys.stderr) + interrupt_and_wait(process) + + +if __name__ == '__main__': + unittest.main(verbosity=2) |
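
Returning to the C core changes: when SO_REUSEPORT is usable and there is more than one pollset, clone_port() in tcp_server_posix.c duplicates the listening socket so each pollset gets its own fd bound to the same port, letting the kernel spread incoming connections across them. A standalone POSIX sketch of that mechanism, outside gRPC — the listener count and port are placeholders, and SO_REUSEPORT needs a sufficiently recent kernel (Linux 3.9+):

```c
/* Standalone sketch (not gRPC code): bind several listening sockets to the
   same port with SO_REUSEPORT so the kernel load-balances incoming
   connections across them -- the mechanism clone_port() builds on,
   one fd per pollset. */
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#define NUM_LISTENERS 4 /* placeholder: e.g. one per event-loop thread */

int main(void) {
  int fds[NUM_LISTENERS];
  struct sockaddr_in addr;
  memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  addr.sin_port = htons(50051); /* placeholder port */

  for (int i = 0; i < NUM_LISTENERS; i++) {
    fds[i] = socket(AF_INET, SOCK_STREAM, 0);
    int one = 1;
    /* Every socket sharing the port must set SO_REUSEPORT before bind(). */
    if (setsockopt(fds[i], SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) != 0 ||
        bind(fds[i], (struct sockaddr *)&addr, sizeof(addr)) != 0 ||
        listen(fds[i], SOMAXCONN) != 0) {
      perror("listener setup");
      return 1;
    }
  }
  /* Each fd would now be handed to its own poller/accept loop. */
  for (int i = 0; i < NUM_LISTENERS; i++) close(fds[i]);
  return 0;
}
```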