aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/tcp_server_windows.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/tcp_server_windows.c')
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.c94
1 files changed, 50 insertions, 44 deletions
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index ae54c70d2d..b0c8586bac 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -73,6 +73,7 @@ struct grpc_tcp_listener {
/* The cached AcceptEx for that port. */
LPFN_ACCEPTEX AcceptEx;
int shutting_down;
+ int outstanding_calls;
/* closure for socket notification of accept being ready */
grpc_closure on_accept;
/* linked list */
@@ -140,10 +141,9 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_NONE;
}
-static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
- if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
- }
+static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_tcp_server *s = arg;
/* Now that the accepts have been aborted, we can destroy the sockets.
The IOCP won't get notified on these, so we can flag them as already
@@ -159,6 +159,16 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_free(s);
}
+static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_server *s) {
+ if (s->shutdown_complete != NULL) {
+ grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ }
+
+ grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(destroy_server, s),
+ GRPC_ERROR_NONE, NULL);
+}
+
grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
gpr_ref_non_zero(&s->refs);
return s;
@@ -180,17 +190,14 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
/* First, shutdown all fd's. This will queue abortion calls for all
of the pending accepts due to the normal operation mechanism. */
if (s->active_ports == 0) {
- immediately_done = 1;
- }
- for (sp = s->head; sp; sp = sp->next) {
- sp->shutting_down = 1;
- grpc_winsocket_shutdown(sp->socket);
+ finish_shutdown_locked(exec_ctx, s);
+ } else {
+ for (sp = s->head; sp; sp = sp->next) {
+ sp->shutting_down = 1;
+ grpc_winsocket_shutdown(sp->socket);
+ }
}
gpr_mu_unlock(&s->mu);
-
- if (immediately_done) {
- finish_shutdown(exec_ctx, s);
- }
}
void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
@@ -251,31 +258,30 @@ failure:
return error;
}
-static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx,
- grpc_tcp_listener *sp) {
+static void decrement_active_ports_and_notify_locked(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_listener *sp) {
int notify = 0;
sp->shutting_down = 0;
- gpr_mu_lock(&sp->server->mu);
GPR_ASSERT(sp->server->active_ports > 0);
if (0 == --sp->server->active_ports) {
- notify = 1;
- }
- gpr_mu_unlock(&sp->server->mu);
- if (notify) {
- finish_shutdown(exec_ctx, sp->server);
+ finish_shutdown_locked(exec_ctx, sp->server);
}
}
/* In order to do an async accept, we need to create a socket first which
will be the one assigned to the new incoming connection. */
-static grpc_error *start_accept(grpc_exec_ctx *exec_ctx,
- grpc_tcp_listener *port) {
+static grpc_error *start_accept_locked(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_listener *port) {
SOCKET sock = INVALID_SOCKET;
BOOL success;
DWORD addrlen = sizeof(struct sockaddr_in6) + 16;
DWORD bytes_received = 0;
grpc_error *error = GRPC_ERROR_NONE;
+ if (port->shutting_down) {
+ return GRPC_ERROR_NONE;
+ }
+
sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
WSA_FLAG_OVERLAPPED);
if (sock == INVALID_SOCKET) {
@@ -305,20 +311,11 @@ static grpc_error *start_accept(grpc_exec_ctx *exec_ctx,
immediately process an accept that happened in the meantime. */
port->new_socket = sock;
grpc_socket_notify_on_read(exec_ctx, port->socket, &port->on_accept);
+ port->outstanding_calls++;
return error;
failure:
GPR_ASSERT(error != GRPC_ERROR_NONE);
- if (port->shutting_down) {
- /* We are abandoning the listener port, take that into account to prevent
- occasional hangs on shutdown. The hang happens when sp->shutting_down
- change is not seen by on_accept and we proceed to trying new accept,
- but we fail there because the listening port has been closed in the
- meantime. */
- decrement_active_ports_and_notify(exec_ctx, port);
- GRPC_ERROR_UNREF(error);
- return GRPC_ERROR_NONE;
- }
if (sock != INVALID_SOCKET) closesocket(sock);
return error;
}
@@ -326,7 +323,6 @@ failure:
/* Event manager callback when reads are ready. */
static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_tcp_listener *sp = arg;
- grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL;
@@ -338,6 +334,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
BOOL wsa_success;
int err;
+ gpr_mu_lock(&sp->server->mu);
+
peer_name.len = sizeof(struct sockaddr_storage);
/* The general mechanism for shutting down is to queue abortion calls. While
@@ -347,6 +345,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
const char *msg = grpc_error_string(error);
gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", msg);
grpc_error_free_string(msg);
+ gpr_mu_unlock(&sp->server->mu);
return;
}
@@ -356,17 +355,12 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
&transfered_bytes, FALSE, &flags);
if (!wsa_success) {
- if (sp->shutting_down) {
- /* During the shutdown case, we ARE expecting an error. So that's well,
- and we can wake up the shutdown thread. */
- decrement_active_ports_and_notify(exec_ctx, sp);
- return;
- } else {
+ if (!sp->shutting_down) {
char *utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
gpr_free(utf8_message);
- closesocket(sock);
}
+ closesocket(sock);
} else {
if (!sp->shutting_down) {
peer_name_string = NULL;
@@ -401,14 +395,24 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
/* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */
if (ep) {
+ // Create acceptor.
+ grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
+ acceptor->from_server = sp->server;
+ acceptor->port_index = sp->port_index;
+ acceptor->fd_index = 0;
sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
- &acceptor);
+ acceptor);
}
/* As we were notified from the IOCP of one and exactly one accept,
the former socked we created has now either been destroy or assigned
to the new connection. We need to create a new one for the next
connection. */
- GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp)));
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(exec_ctx, sp)));
+ if (0 == --sp->outstanding_calls) {
+ decrement_active_ports_and_notify_locked(exec_ctx, sp);
+ }
+ gpr_mu_unlock(&sp->server->mu);
}
static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
@@ -456,6 +460,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
sp->server = s;
sp->socket = grpc_winsocket_create(sock, "listener");
sp->shutting_down = 0;
+ sp->outstanding_calls = 0;
sp->AcceptEx = AcceptEx;
sp->new_socket = INVALID_SOCKET;
sp->port = port;
@@ -553,7 +558,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
s->on_accept_cb = on_accept_cb;
s->on_accept_cb_arg = on_accept_cb_arg;
for (sp = s->head; sp; sp = sp->next) {
- GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp)));
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(exec_ctx, sp)));
s->active_ports++;
}
gpr_mu_unlock(&s->mu);