aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr/tcp_server_windows.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr/tcp_server_windows.c')
-rw-r--r--src/core/iomgr/tcp_server_windows.c57
1 files changed, 47 insertions, 10 deletions
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index a09c1ae335..c4d3293e83 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -55,10 +55,15 @@
/* one listening port */
typedef struct server_port {
- gpr_uint8 addresses[sizeof(struct sockaddr_in6) * 2 + 32];
+ /* This seemingly magic number comes from AcceptEx's documentation. each
+ address buffer needs to have at least 16 more bytes at their end. */
+ gpr_uint8 addresses[(sizeof(struct sockaddr_in6) + 16) * 2];
+ /* This will hold the socket for the next accept. */
SOCKET new_socket;
+ /* The listener winsocked. */
grpc_winsocket *socket;
grpc_tcp_server *server;
+ /* The cached AcceptEx for that port. */
LPFN_ACCEPTEX AcceptEx;
int shutting_down;
} server_port;
@@ -80,6 +85,8 @@ struct grpc_tcp_server {
size_t port_capacity;
};
+/* Public function. Allocates the proper data structures to hold a
+ grpc_tcp_server. */
grpc_tcp_server *grpc_tcp_server_create(void) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
gpr_mu_init(&s->mu);
@@ -93,22 +100,26 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
return s;
}
+/* Public function. Stops and destroys a grpc_tcp_server. */
void grpc_tcp_server_destroy(grpc_tcp_server *s,
void (*shutdown_done)(void *shutdown_done_arg),
void *shutdown_done_arg) {
size_t i;
gpr_mu_lock(&s->mu);
- /* shutdown all fd's */
+ /* First, shutdown all fd's. This will queue abortion calls for all
+ of the pending accepts. */
for (i = 0; i < s->nports; i++) {
grpc_winsocket_shutdown(s->ports[i].socket);
}
- /* wait while that happens */
+ /* This happens asynchronously. Wait while that happens. */
while (s->active_ports) {
gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future);
}
gpr_mu_unlock(&s->mu);
- /* delete ALL the things */
+ /* Now that the accepts have been aborted, we can destroy the sockets.
+ The IOCP won't get notified on these, so we can flag them as already
+ closed by the system. */
for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i];
sp->socket->closed_early = 1;
@@ -122,7 +133,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
}
}
-/* Prepare a recently-created socket for listening. */
+/* Prepare (bind) a recently-created socket for listening. */
static int prepare_socket(SOCKET sock, const struct sockaddr *addr,
int addr_len) {
struct sockaddr_storage sockname_temp;
@@ -170,8 +181,11 @@ error:
return -1;
}
-static void on_accept(void *arg, int success);
+/* start_accept will reference that for the IOCP notification request. */
+static void on_accept(void *arg, int from_iocp);
+/* In order to do an async accept, we need to create a socket first which
+ will be the one assigned to the new incoming connection. */
static void start_accept(server_port *port) {
SOCKET sock = INVALID_SOCKET;
char *message;
@@ -193,10 +207,13 @@ static void start_accept(server_port *port) {
goto failure;
}
+ /* Start the "accept" asynchronously. */
success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
addrlen, addrlen, &bytes_received,
&port->socket->read_info.overlapped);
+ /* It is possible to get an accept immediately without delay. However, we
+ will still get an IOCP notification for it. So let's just ignore it. */
if (!success) {
int error = WSAGetLastError();
if (error != ERROR_IO_PENDING) {
@@ -205,6 +222,8 @@ static void start_accept(server_port *port) {
}
}
+ /* We're ready to do the accept. Calling grpc_socket_notify_on_read may
+ immediately process an accept that happened in the meantime. */
port->new_socket = sock;
grpc_socket_notify_on_read(port->socket, on_accept, port);
return;
@@ -216,14 +235,18 @@ failure:
if (sock != INVALID_SOCKET) closesocket(sock);
}
-/* event manager callback when reads are ready */
-static void on_accept(void *arg, int success) {
+/* Event manager callback when reads are ready. */
+static void on_accept(void *arg, int from_iocp) {
server_port *sp = arg;
SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL;
+ /* The shutdown sequence is done in two parts. This is the second
+ part here, acknowledging the IOCP notification, and doing nothing
+ else, especially not queuing a new accept. */
if (sp->shutting_down) {
+ GPR_ASSERT(from_iocp);
sp->shutting_down = 0;
gpr_mu_lock(&sp->server->mu);
if (0 == --sp->server->active_ports) {
@@ -233,7 +256,9 @@ static void on_accept(void *arg, int success) {
return;
}
- if (success) {
+ if (from_iocp) {
+ /* The IOCP notified us of a completed operation. Let's grab the results,
+ and act accordingly. */
DWORD transfered_bytes = 0;
DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
@@ -247,13 +272,23 @@ static void on_accept(void *arg, int success) {
ep = grpc_tcp_create(grpc_winsocket_create(sock));
}
} else {
+ /* If we're not notified from the IOCP, it means we are asked to shutdown.
+ This will initiate that shutdown. Calling closesocket will trigger an
+ IOCP notification, that will call this function a second time, from
+ the IOCP thread. */
sp->shutting_down = 1;
sp->new_socket = INVALID_SOCKET;
closesocket(sock);
}
+ /* The only time we should call our callback, is where we successfully
+ managed to accept a connection, and created an endpoint. */
if (ep) sp->server->cb(sp->server->cb_arg, ep);
- if (success) {
+ if (from_iocp) {
+ /* As we were notified from the IOCP of one and exactly one accept,
+ the former socked we created has now either been destroy or assigned
+ to the new connection. We need to create a new one for the next
+ connection. */
start_accept(sp);
}
}
@@ -269,6 +304,8 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
if (sock == INVALID_SOCKET) return -1;
+ /* We need to grab the AcceptEx pointer for that port, as it may be
+ interface-dependent. We'll cache it to avoid doing that again. */
status =
WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
&AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);