diff options
-rw-r--r-- | src/core/iomgr/iocp_windows.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/socket_windows.c | 3 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client_windows.c | 50 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_windows.c | 57 |
4 files changed, 89 insertions, 23 deletions
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index 7968729353..e7fc744ceb 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -172,7 +172,9 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) { } void grpc_iocp_socket_orphan(grpc_winsocket *socket) { + GPR_ASSERT(!socket->orphan); gpr_atm_full_fetch_add(&g_orphans, 1); + socket->orphan = 1; } static void socket_notify_on_iocp(grpc_winsocket *socket, diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c index a622da93fb..fe0196d99c 100644 --- a/src/core/iomgr/socket_windows.c +++ b/src/core/iomgr/socket_windows.c @@ -69,13 +69,12 @@ void grpc_winsocket_orphan(grpc_winsocket *winsocket) { SOCKET socket = winsocket->socket; if (!winsocket->closed_early) { grpc_iocp_socket_orphan(winsocket); - winsocket->orphan = 1; } - grpc_iomgr_unref(); if (winsocket->closed_early) { grpc_winsocket_destroy(winsocket); } closesocket(socket); + grpc_iomgr_unref(); } void grpc_winsocket_destroy(grpc_winsocket *winsocket) { diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index eee6320e78..653c0c65c5 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -59,6 +59,7 @@ typedef struct { gpr_timespec deadline; grpc_alarm alarm; int refs; + int aborted; } async_connect; static void async_connect_cleanup(async_connect *ac) { @@ -70,26 +71,31 @@ static void async_connect_cleanup(async_connect *ac) { } } -static void on_alarm(void *acp, int success) { +static void on_alarm(void *acp, int occured) { async_connect *ac = acp; gpr_mu_lock(&ac->mu); - if (ac->socket != NULL && success) { + /* If the alarm didn't occor, it got cancelled. */ + if (ac->socket != NULL && occured) { grpc_winsocket_shutdown(ac->socket); } async_connect_cleanup(ac); } -static void on_connect(void *acp, int success) { +static void on_connect(void *acp, int from_iocp) { async_connect *ac = acp; SOCKET sock = ac->socket->socket; grpc_endpoint *ep = NULL; grpc_winsocket_callback_info *info = &ac->socket->write_info; void(*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; void *cb_arg = ac->cb_arg; + int aborted; grpc_alarm_cancel(&ac->alarm); - if (success) { + gpr_mu_lock(&ac->mu); + aborted = ac->aborted; + + if (from_iocp) { DWORD transfered_bytes = 0; DWORD flags; BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, @@ -107,23 +113,40 @@ static void on_connect(void *acp, int success) { } } else { gpr_log(GPR_ERROR, "on_connect is shutting down"); - goto finish; + /* If the connection timeouts, we will still get a notification from + the IOCP whatever happens. So we're just going to flag that connection + as being in the process of being aborted, and wait for the IOCP. We + can't just orphan the socket now, because the IOCP might already have + gotten a successful connection, which is our worst-case scenario. + We need to call our callback now to respect the deadline. */ + ac->aborted = 1; + gpr_mu_unlock(&ac->mu); + cb(cb_arg, NULL); + return; } abort(); finish: - gpr_mu_lock(&ac->mu); - if (!ep) { - if (success) { - ac->socket->closed_early = 1; - } + /* If we don't have an endpoint, it means the connection failed, + so it doesn't matter if it aborted or failed. We need to orphan + that socket. */ + if (!ep || aborted) { + /* If the connection failed, it means we won't get an IOCP notification, + so let's flag it as already closed. But if the connection was aborted, + while we still got an endpoint, we have to wait for the IOCP to collect + that socket. So let's properly flag that. */ + ac->socket->closed_early = !ep; grpc_winsocket_orphan(ac->socket); } async_connect_cleanup(ac); - cb(cb_arg, ep); + /* If the connection was aborted, the callback was already called when + the deadline was met. */ + if (!aborted) cb(cb_arg, ep); } +/* Tries to issue one async connection, then schedules both an IOCP + notification request for the connection, and one timeout alert. */ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp), void *arg, const struct sockaddr *addr, int addr_len, gpr_timespec deadline) { @@ -159,6 +182,8 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp), goto failure; } + /* Grab the function pointer for ConnectEx for that specific socket. + It may change depending on the interface. */ status = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), &ConnectEx, sizeof(ConnectEx), &ioctl_num_bytes, NULL, NULL); @@ -181,6 +206,8 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp), info = &socket->write_info; success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped); + /* It wouldn't be unusual to get a success immediately. But we'll still get + an IOCP notification, so let's ignore it. */ if (!success) { int error = WSAGetLastError(); if (error != ERROR_IO_PENDING) { @@ -195,6 +222,7 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp), ac->socket = socket; gpr_mu_init(&ac->mu); ac->refs = 2; + ac->aborted = 0; grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now()); grpc_socket_notify_on_write(socket, on_connect, ac); diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index a09c1ae335..c4d3293e83 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -55,10 +55,15 @@ /* one listening port */ typedef struct server_port { - gpr_uint8 addresses[sizeof(struct sockaddr_in6) * 2 + 32]; + /* This seemingly magic number comes from AcceptEx's documentation. each + address buffer needs to have at least 16 more bytes at their end. */ + gpr_uint8 addresses[(sizeof(struct sockaddr_in6) + 16) * 2]; + /* This will hold the socket for the next accept. */ SOCKET new_socket; + /* The listener winsocked. */ grpc_winsocket *socket; grpc_tcp_server *server; + /* The cached AcceptEx for that port. */ LPFN_ACCEPTEX AcceptEx; int shutting_down; } server_port; @@ -80,6 +85,8 @@ struct grpc_tcp_server { size_t port_capacity; }; +/* Public function. Allocates the proper data structures to hold a + grpc_tcp_server. */ grpc_tcp_server *grpc_tcp_server_create(void) { grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); gpr_mu_init(&s->mu); @@ -93,22 +100,26 @@ grpc_tcp_server *grpc_tcp_server_create(void) { return s; } +/* Public function. Stops and destroys a grpc_tcp_server. */ void grpc_tcp_server_destroy(grpc_tcp_server *s, void (*shutdown_done)(void *shutdown_done_arg), void *shutdown_done_arg) { size_t i; gpr_mu_lock(&s->mu); - /* shutdown all fd's */ + /* First, shutdown all fd's. This will queue abortion calls for all + of the pending accepts. */ for (i = 0; i < s->nports; i++) { grpc_winsocket_shutdown(s->ports[i].socket); } - /* wait while that happens */ + /* This happens asynchronously. Wait while that happens. */ while (s->active_ports) { gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future); } gpr_mu_unlock(&s->mu); - /* delete ALL the things */ + /* Now that the accepts have been aborted, we can destroy the sockets. + The IOCP won't get notified on these, so we can flag them as already + closed by the system. */ for (i = 0; i < s->nports; i++) { server_port *sp = &s->ports[i]; sp->socket->closed_early = 1; @@ -122,7 +133,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s, } } -/* Prepare a recently-created socket for listening. */ +/* Prepare (bind) a recently-created socket for listening. */ static int prepare_socket(SOCKET sock, const struct sockaddr *addr, int addr_len) { struct sockaddr_storage sockname_temp; @@ -170,8 +181,11 @@ error: return -1; } -static void on_accept(void *arg, int success); +/* start_accept will reference that for the IOCP notification request. */ +static void on_accept(void *arg, int from_iocp); +/* In order to do an async accept, we need to create a socket first which + will be the one assigned to the new incoming connection. */ static void start_accept(server_port *port) { SOCKET sock = INVALID_SOCKET; char *message; @@ -193,10 +207,13 @@ static void start_accept(server_port *port) { goto failure; } + /* Start the "accept" asynchronously. */ success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0, addrlen, addrlen, &bytes_received, &port->socket->read_info.overlapped); + /* It is possible to get an accept immediately without delay. However, we + will still get an IOCP notification for it. So let's just ignore it. */ if (!success) { int error = WSAGetLastError(); if (error != ERROR_IO_PENDING) { @@ -205,6 +222,8 @@ static void start_accept(server_port *port) { } } + /* We're ready to do the accept. Calling grpc_socket_notify_on_read may + immediately process an accept that happened in the meantime. */ port->new_socket = sock; grpc_socket_notify_on_read(port->socket, on_accept, port); return; @@ -216,14 +235,18 @@ failure: if (sock != INVALID_SOCKET) closesocket(sock); } -/* event manager callback when reads are ready */ -static void on_accept(void *arg, int success) { +/* Event manager callback when reads are ready. */ +static void on_accept(void *arg, int from_iocp) { server_port *sp = arg; SOCKET sock = sp->new_socket; grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_endpoint *ep = NULL; + /* The shutdown sequence is done in two parts. This is the second + part here, acknowledging the IOCP notification, and doing nothing + else, especially not queuing a new accept. */ if (sp->shutting_down) { + GPR_ASSERT(from_iocp); sp->shutting_down = 0; gpr_mu_lock(&sp->server->mu); if (0 == --sp->server->active_ports) { @@ -233,7 +256,9 @@ static void on_accept(void *arg, int success) { return; } - if (success) { + if (from_iocp) { + /* The IOCP notified us of a completed operation. Let's grab the results, + and act accordingly. */ DWORD transfered_bytes = 0; DWORD flags; BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, @@ -247,13 +272,23 @@ static void on_accept(void *arg, int success) { ep = grpc_tcp_create(grpc_winsocket_create(sock)); } } else { + /* If we're not notified from the IOCP, it means we are asked to shutdown. + This will initiate that shutdown. Calling closesocket will trigger an + IOCP notification, that will call this function a second time, from + the IOCP thread. */ sp->shutting_down = 1; sp->new_socket = INVALID_SOCKET; closesocket(sock); } + /* The only time we should call our callback, is where we successfully + managed to accept a connection, and created an endpoint. */ if (ep) sp->server->cb(sp->server->cb_arg, ep); - if (success) { + if (from_iocp) { + /* As we were notified from the IOCP of one and exactly one accept, + the former socked we created has now either been destroy or assigned + to the new connection. We need to create a new one for the next + connection. */ start_accept(sp); } } @@ -269,6 +304,8 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, if (sock == INVALID_SOCKET) return -1; + /* We need to grab the AcceptEx pointer for that port, as it may be + interface-dependent. We'll cache it to avoid doing that again. */ status = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL); |