aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-11-30 13:00:03 -0800
committerGravatar GitHub <noreply@github.com>2016-11-30 13:00:03 -0800
commit6facd26db32f22a7928c1dd90671abc1dab2deac (patch)
treec2d37d76d55e6c2131fba919de82e5be316915f6 /src
parent2a5af02a184d11bf97f340991b824ea5203bd055 (diff)
parente06836130015bccc7d1ae8afd1fad403318503b0 (diff)
Merge pull request #8903 from ctiller/tcp_shutdown
Fix TCP shutdown path on Windows
Diffstat (limited to 'src')
-rw-r--r--src/core/lib/iomgr/socket_windows.c8
-rw-r--r--src/core/lib/iomgr/socket_windows.h1
-rw-r--r--src/core/lib/iomgr/tcp_client_windows.c26
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.c86
4 files changed, 68 insertions, 53 deletions
diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c
index 35f23300dc..54911e0e31 100644
--- a/src/core/lib/iomgr/socket_windows.c
+++ b/src/core/lib/iomgr/socket_windows.c
@@ -76,6 +76,14 @@ void grpc_winsocket_shutdown(grpc_winsocket *winsocket) {
LPFN_DISCONNECTEX DisconnectEx;
DWORD ioctl_num_bytes;
+ gpr_mu_lock(&winsocket->state_mu);
+ if (winsocket->shutdown_called) {
+ gpr_mu_unlock(&winsocket->state_mu);
+ return;
+ }
+ winsocket->shutdown_called = true;
+ gpr_mu_unlock(&winsocket->state_mu);
+
status = WSAIoctl(winsocket->socket, SIO_GET_EXTENSION_FUNCTION_POINTER,
&guid, sizeof(guid), &DisconnectEx, sizeof(DisconnectEx),
&ioctl_num_bytes, NULL, NULL);
diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h
index 490d0e0a06..a3875ce16c 100644
--- a/src/core/lib/iomgr/socket_windows.h
+++ b/src/core/lib/iomgr/socket_windows.h
@@ -87,6 +87,7 @@ typedef struct grpc_winsocket {
grpc_winsocket_callback_info read_info;
gpr_mu state_mu;
+ bool shutdown_called;
/* You can't add the same socket twice to the same IO Completion Port.
This prevents that. */
diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c
index 4d1e809872..1127588ebc 100644
--- a/src/core/lib/iomgr/tcp_client_windows.c
+++ b/src/core/lib/iomgr/tcp_client_windows.c
@@ -107,18 +107,22 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
gpr_mu_lock(&ac->mu);
- if (error == GRPC_ERROR_NONE && socket != NULL) {
- DWORD transfered_bytes = 0;
- DWORD flags;
- BOOL wsa_success =
- WSAGetOverlappedResult(socket->socket, &socket->write_info.overlapped,
- &transfered_bytes, FALSE, &flags);
- GPR_ASSERT(transfered_bytes == 0);
- if (!wsa_success) {
- error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
+ if (error == GRPC_ERROR_NONE) {
+ if (socket != NULL) {
+ DWORD transfered_bytes = 0;
+ DWORD flags;
+ BOOL wsa_success =
+ WSAGetOverlappedResult(socket->socket, &socket->write_info.overlapped,
+ &transfered_bytes, FALSE, &flags);
+ GPR_ASSERT(transfered_bytes == 0);
+ if (!wsa_success) {
+ error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
+ } else {
+ *ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name);
+ socket = NULL;
+ }
} else {
- *ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name);
- socket = NULL;
+ error = GRPC_ERROR_CREATE("socket is null");
}
}
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index ae54c70d2d..b8a391c059 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -73,6 +73,7 @@ struct grpc_tcp_listener {
/* The cached AcceptEx for that port. */
LPFN_ACCEPTEX AcceptEx;
int shutting_down;
+ int outstanding_calls;
/* closure for socket notification of accept being ready */
grpc_closure on_accept;
/* linked list */
@@ -140,10 +141,9 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_NONE;
}
-static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
- if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
- }
+static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_tcp_server *s = arg;
/* Now that the accepts have been aborted, we can destroy the sockets.
The IOCP won't get notified on these, so we can flag them as already
@@ -159,6 +159,16 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_free(s);
}
+static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_server *s) {
+ if (s->shutdown_complete != NULL) {
+ grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ }
+
+ grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(destroy_server, s),
+ GRPC_ERROR_NONE, NULL);
+}
+
grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
gpr_ref_non_zero(&s->refs);
return s;
@@ -180,17 +190,14 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
/* First, shutdown all fd's. This will queue abortion calls for all
of the pending accepts due to the normal operation mechanism. */
if (s->active_ports == 0) {
- immediately_done = 1;
- }
- for (sp = s->head; sp; sp = sp->next) {
- sp->shutting_down = 1;
- grpc_winsocket_shutdown(sp->socket);
+ finish_shutdown_locked(exec_ctx, s);
+ } else {
+ for (sp = s->head; sp; sp = sp->next) {
+ sp->shutting_down = 1;
+ grpc_winsocket_shutdown(sp->socket);
+ }
}
gpr_mu_unlock(&s->mu);
-
- if (immediately_done) {
- finish_shutdown(exec_ctx, s);
- }
}
void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
@@ -251,31 +258,30 @@ failure:
return error;
}
-static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx,
- grpc_tcp_listener *sp) {
+static void decrement_active_ports_and_notify_locked(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_listener *sp) {
int notify = 0;
sp->shutting_down = 0;
- gpr_mu_lock(&sp->server->mu);
GPR_ASSERT(sp->server->active_ports > 0);
if (0 == --sp->server->active_ports) {
- notify = 1;
- }
- gpr_mu_unlock(&sp->server->mu);
- if (notify) {
- finish_shutdown(exec_ctx, sp->server);
+ finish_shutdown_locked(exec_ctx, sp->server);
}
}
/* In order to do an async accept, we need to create a socket first which
will be the one assigned to the new incoming connection. */
-static grpc_error *start_accept(grpc_exec_ctx *exec_ctx,
- grpc_tcp_listener *port) {
+static grpc_error *start_accept_locked(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_listener *port) {
SOCKET sock = INVALID_SOCKET;
BOOL success;
DWORD addrlen = sizeof(struct sockaddr_in6) + 16;
DWORD bytes_received = 0;
grpc_error *error = GRPC_ERROR_NONE;
+ if (port->shutting_down) {
+ return GRPC_ERROR_NONE;
+ }
+
sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
WSA_FLAG_OVERLAPPED);
if (sock == INVALID_SOCKET) {
@@ -305,20 +311,11 @@ static grpc_error *start_accept(grpc_exec_ctx *exec_ctx,
immediately process an accept that happened in the meantime. */
port->new_socket = sock;
grpc_socket_notify_on_read(exec_ctx, port->socket, &port->on_accept);
+ port->outstanding_calls++;
return error;
failure:
GPR_ASSERT(error != GRPC_ERROR_NONE);
- if (port->shutting_down) {
- /* We are abandoning the listener port, take that into account to prevent
- occasional hangs on shutdown. The hang happens when sp->shutting_down
- change is not seen by on_accept and we proceed to trying new accept,
- but we fail there because the listening port has been closed in the
- meantime. */
- decrement_active_ports_and_notify(exec_ctx, port);
- GRPC_ERROR_UNREF(error);
- return GRPC_ERROR_NONE;
- }
if (sock != INVALID_SOCKET) closesocket(sock);
return error;
}
@@ -338,6 +335,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
BOOL wsa_success;
int err;
+ gpr_mu_lock(&sp->server->mu);
+
peer_name.len = sizeof(struct sockaddr_storage);
/* The general mechanism for shutting down is to queue abortion calls. While
@@ -347,6 +346,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
const char *msg = grpc_error_string(error);
gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", msg);
grpc_error_free_string(msg);
+ gpr_mu_unlock(&sp->server->mu);
return;
}
@@ -356,17 +356,12 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
&transfered_bytes, FALSE, &flags);
if (!wsa_success) {
- if (sp->shutting_down) {
- /* During the shutdown case, we ARE expecting an error. So that's well,
- and we can wake up the shutdown thread. */
- decrement_active_ports_and_notify(exec_ctx, sp);
- return;
- } else {
+ if (!sp->shutting_down) {
char *utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
gpr_free(utf8_message);
- closesocket(sock);
}
+ closesocket(sock);
} else {
if (!sp->shutting_down) {
peer_name_string = NULL;
@@ -408,7 +403,12 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
the former socked we created has now either been destroy or assigned
to the new connection. We need to create a new one for the next
connection. */
- GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp)));
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(exec_ctx, sp)));
+ if (0 == --sp->outstanding_calls) {
+ decrement_active_ports_and_notify_locked(exec_ctx, sp);
+ }
+ gpr_mu_unlock(&sp->server->mu);
}
static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
@@ -456,6 +456,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
sp->server = s;
sp->socket = grpc_winsocket_create(sock, "listener");
sp->shutting_down = 0;
+ sp->outstanding_calls = 0;
sp->AcceptEx = AcceptEx;
sp->new_socket = INVALID_SOCKET;
sp->port = port;
@@ -553,7 +554,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
s->on_accept_cb = on_accept_cb;
s->on_accept_cb_arg = on_accept_cb_arg;
for (sp = s->head; sp; sp = sp->next) {
- GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp)));
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(exec_ctx, sp)));
s->active_ports++;
}
gpr_mu_unlock(&s->mu);