aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr')
-rw-r--r--src/core/iomgr/iocp_windows.c2
-rw-r--r--src/core/iomgr/socket_windows.c24
-rw-r--r--src/core/iomgr/socket_windows.h1
-rw-r--r--src/core/iomgr/tcp_client_windows.c48
-rw-r--r--src/core/iomgr/tcp_server_windows.c79
-rw-r--r--src/core/iomgr/tcp_windows.c2
6 files changed, 122 insertions, 34 deletions
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
index 7968729353..e7fc744ceb 100644
--- a/src/core/iomgr/iocp_windows.c
+++ b/src/core/iomgr/iocp_windows.c
@@ -172,7 +172,9 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) {
}
void grpc_iocp_socket_orphan(grpc_winsocket *socket) {
+ GPR_ASSERT(!socket->orphan);
gpr_atm_full_fetch_add(&g_orphans, 1);
+ socket->orphan = 1;
}
static void socket_notify_on_iocp(grpc_winsocket *socket,
diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c
index 91268c04e6..fe0196d99c 100644
--- a/src/core/iomgr/socket_windows.c
+++ b/src/core/iomgr/socket_windows.c
@@ -32,11 +32,12 @@
*/
#include <grpc/support/port_platform.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
#ifdef GPR_WINSOCK_SOCKET
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
#include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/iomgr_internal.h"
@@ -64,16 +65,21 @@ void grpc_winsocket_shutdown(grpc_winsocket *socket) {
shutdown_op(&socket->write_info);
}
-void grpc_winsocket_orphan(grpc_winsocket *socket) {
- grpc_iocp_socket_orphan(socket);
- socket->orphan = 1;
+void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
+ SOCKET socket = winsocket->socket;
+ if (!winsocket->closed_early) {
+ grpc_iocp_socket_orphan(winsocket);
+ }
+ if (winsocket->closed_early) {
+ grpc_winsocket_destroy(winsocket);
+ }
+ closesocket(socket);
grpc_iomgr_unref();
- closesocket(socket->socket);
}
-void grpc_winsocket_destroy(grpc_winsocket *socket) {
- gpr_mu_destroy(&socket->state_mu);
- gpr_free(socket);
+void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
+ gpr_mu_destroy(&winsocket->state_mu);
+ gpr_free(winsocket);
}
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h
index cbae91692c..ee090668ea 100644
--- a/src/core/iomgr/socket_windows.h
+++ b/src/core/iomgr/socket_windows.h
@@ -64,6 +64,7 @@ typedef struct grpc_winsocket {
int added_to_iocp;
int orphan;
+ int closed_early;
} grpc_winsocket;
/* Create a wrapped windows handle.
diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c
index 00c8da601b..653c0c65c5 100644
--- a/src/core/iomgr/tcp_client_windows.c
+++ b/src/core/iomgr/tcp_client_windows.c
@@ -59,6 +59,7 @@ typedef struct {
gpr_timespec deadline;
grpc_alarm alarm;
int refs;
+ int aborted;
} async_connect;
static void async_connect_cleanup(async_connect *ac) {
@@ -70,26 +71,31 @@ static void async_connect_cleanup(async_connect *ac) {
}
}
-static void on_alarm(void *acp, int success) {
+static void on_alarm(void *acp, int occured) {
async_connect *ac = acp;
gpr_mu_lock(&ac->mu);
- if (ac->socket != NULL && success) {
+ /* If the alarm didn't occor, it got cancelled. */
+ if (ac->socket != NULL && occured) {
grpc_winsocket_shutdown(ac->socket);
}
async_connect_cleanup(ac);
}
-static void on_connect(void *acp, int success) {
+static void on_connect(void *acp, int from_iocp) {
async_connect *ac = acp;
SOCKET sock = ac->socket->socket;
grpc_endpoint *ep = NULL;
grpc_winsocket_callback_info *info = &ac->socket->write_info;
void(*cb)(void *arg, grpc_endpoint *tcp) = ac->cb;
void *cb_arg = ac->cb_arg;
+ int aborted;
grpc_alarm_cancel(&ac->alarm);
- if (success) {
+ gpr_mu_lock(&ac->mu);
+ aborted = ac->aborted;
+
+ if (from_iocp) {
DWORD transfered_bytes = 0;
DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
@@ -107,20 +113,40 @@ static void on_connect(void *acp, int success) {
}
} else {
gpr_log(GPR_ERROR, "on_connect is shutting down");
- goto finish;
+ /* If the connection timeouts, we will still get a notification from
+ the IOCP whatever happens. So we're just going to flag that connection
+ as being in the process of being aborted, and wait for the IOCP. We
+ can't just orphan the socket now, because the IOCP might already have
+ gotten a successful connection, which is our worst-case scenario.
+ We need to call our callback now to respect the deadline. */
+ ac->aborted = 1;
+ gpr_mu_unlock(&ac->mu);
+ cb(cb_arg, NULL);
+ return;
}
abort();
finish:
- gpr_mu_lock(&ac->mu);
- if (!ep) {
+ /* If we don't have an endpoint, it means the connection failed,
+ so it doesn't matter if it aborted or failed. We need to orphan
+ that socket. */
+ if (!ep || aborted) {
+ /* If the connection failed, it means we won't get an IOCP notification,
+ so let's flag it as already closed. But if the connection was aborted,
+ while we still got an endpoint, we have to wait for the IOCP to collect
+ that socket. So let's properly flag that. */
+ ac->socket->closed_early = !ep;
grpc_winsocket_orphan(ac->socket);
}
async_connect_cleanup(ac);
- cb(cb_arg, ep);
+ /* If the connection was aborted, the callback was already called when
+ the deadline was met. */
+ if (!aborted) cb(cb_arg, ep);
}
+/* Tries to issue one async connection, then schedules both an IOCP
+ notification request for the connection, and one timeout alert. */
void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
void *arg, const struct sockaddr *addr,
int addr_len, gpr_timespec deadline) {
@@ -156,6 +182,8 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
goto failure;
}
+ /* Grab the function pointer for ConnectEx for that specific socket.
+ It may change depending on the interface. */
status = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER,
&guid, sizeof(guid), &ConnectEx, sizeof(ConnectEx),
&ioctl_num_bytes, NULL, NULL);
@@ -178,6 +206,8 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
info = &socket->write_info;
success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped);
+ /* It wouldn't be unusual to get a success immediately. But we'll still get
+ an IOCP notification, so let's ignore it. */
if (!success) {
int error = WSAGetLastError();
if (error != ERROR_IO_PENDING) {
@@ -192,6 +222,7 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
ac->socket = socket;
gpr_mu_init(&ac->mu);
ac->refs = 2;
+ ac->aborted = 0;
grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
grpc_socket_notify_on_write(socket, on_connect, ac);
@@ -202,6 +233,7 @@ failure:
gpr_log(GPR_ERROR, message, utf8_message);
gpr_free(utf8_message);
if (socket) {
+ socket->closed_early = 1;
grpc_winsocket_orphan(socket);
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index fe92846a71..c4d3293e83 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -55,11 +55,17 @@
/* one listening port */
typedef struct server_port {
- gpr_uint8 addresses[sizeof(struct sockaddr_in6) * 2 + 32];
+ /* This seemingly magic number comes from AcceptEx's documentation. each
+ address buffer needs to have at least 16 more bytes at their end. */
+ gpr_uint8 addresses[(sizeof(struct sockaddr_in6) + 16) * 2];
+ /* This will hold the socket for the next accept. */
SOCKET new_socket;
+ /* The listener winsocked. */
grpc_winsocket *socket;
grpc_tcp_server *server;
+ /* The cached AcceptEx for that port. */
LPFN_ACCEPTEX AcceptEx;
+ int shutting_down;
} server_port;
/* the overall server */
@@ -79,6 +85,8 @@ struct grpc_tcp_server {
size_t port_capacity;
};
+/* Public function. Allocates the proper data structures to hold a
+ grpc_tcp_server. */
grpc_tcp_server *grpc_tcp_server_create(void) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
gpr_mu_init(&s->mu);
@@ -92,24 +100,29 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
return s;
}
+/* Public function. Stops and destroys a grpc_tcp_server. */
void grpc_tcp_server_destroy(grpc_tcp_server *s,
void (*shutdown_done)(void *shutdown_done_arg),
void *shutdown_done_arg) {
size_t i;
gpr_mu_lock(&s->mu);
- /* shutdown all fd's */
+ /* First, shutdown all fd's. This will queue abortion calls for all
+ of the pending accepts. */
for (i = 0; i < s->nports; i++) {
grpc_winsocket_shutdown(s->ports[i].socket);
}
- /* wait while that happens */
+ /* This happens asynchronously. Wait while that happens. */
while (s->active_ports) {
gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future);
}
gpr_mu_unlock(&s->mu);
- /* delete ALL the things */
+ /* Now that the accepts have been aborted, we can destroy the sockets.
+ The IOCP won't get notified on these, so we can flag them as already
+ closed by the system. */
for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i];
+ sp->socket->closed_early = 1;
grpc_winsocket_orphan(sp->socket);
}
gpr_free(s->ports);
@@ -120,7 +133,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
}
}
-/* Prepare a recently-created socket for listening. */
+/* Prepare (bind) a recently-created socket for listening. */
static int prepare_socket(SOCKET sock, const struct sockaddr *addr,
int addr_len) {
struct sockaddr_storage sockname_temp;
@@ -168,8 +181,11 @@ error:
return -1;
}
-static void on_accept(void *arg, int success);
+/* start_accept will reference that for the IOCP notification request. */
+static void on_accept(void *arg, int from_iocp);
+/* In order to do an async accept, we need to create a socket first which
+ will be the one assigned to the new incoming connection. */
static void start_accept(server_port *port) {
SOCKET sock = INVALID_SOCKET;
char *message;
@@ -191,12 +207,13 @@ static void start_accept(server_port *port) {
goto failure;
}
- /* TODO(jtattermusch): probably a race here, we regularly get use-after-free on server shutdown */
- GPR_ASSERT(port->socket != (grpc_winsocket*)0xfeeefeee);
+ /* Start the "accept" asynchronously. */
success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
addrlen, addrlen, &bytes_received,
&port->socket->read_info.overlapped);
+ /* It is possible to get an accept immediately without delay. However, we
+ will still get an IOCP notification for it. So let's just ignore it. */
if (!success) {
int error = WSAGetLastError();
if (error != ERROR_IO_PENDING) {
@@ -205,6 +222,8 @@ static void start_accept(server_port *port) {
}
}
+ /* We're ready to do the accept. Calling grpc_socket_notify_on_read may
+ immediately process an accept that happened in the meantime. */
port->new_socket = sock;
grpc_socket_notify_on_read(port->socket, on_accept, port);
return;
@@ -216,14 +235,30 @@ failure:
if (sock != INVALID_SOCKET) closesocket(sock);
}
-/* event manager callback when reads are ready */
-static void on_accept(void *arg, int success) {
+/* Event manager callback when reads are ready. */
+static void on_accept(void *arg, int from_iocp) {
server_port *sp = arg;
SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL;
- if (success) {
+ /* The shutdown sequence is done in two parts. This is the second
+ part here, acknowledging the IOCP notification, and doing nothing
+ else, especially not queuing a new accept. */
+ if (sp->shutting_down) {
+ GPR_ASSERT(from_iocp);
+ sp->shutting_down = 0;
+ gpr_mu_lock(&sp->server->mu);
+ if (0 == --sp->server->active_ports) {
+ gpr_cv_broadcast(&sp->server->cv);
+ }
+ gpr_mu_unlock(&sp->server->mu);
+ return;
+ }
+
+ if (from_iocp) {
+ /* The IOCP notified us of a completed operation. Let's grab the results,
+ and act accordingly. */
DWORD transfered_bytes = 0;
DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
@@ -237,16 +272,23 @@ static void on_accept(void *arg, int success) {
ep = grpc_tcp_create(grpc_winsocket_create(sock));
}
} else {
+ /* If we're not notified from the IOCP, it means we are asked to shutdown.
+ This will initiate that shutdown. Calling closesocket will trigger an
+ IOCP notification, that will call this function a second time, from
+ the IOCP thread. */
+ sp->shutting_down = 1;
+ sp->new_socket = INVALID_SOCKET;
closesocket(sock);
- gpr_mu_lock(&sp->server->mu);
- if (0 == --sp->server->active_ports) {
- gpr_cv_broadcast(&sp->server->cv);
- }
- gpr_mu_unlock(&sp->server->mu);
}
+ /* The only time we should call our callback, is where we successfully
+ managed to accept a connection, and created an endpoint. */
if (ep) sp->server->cb(sp->server->cb_arg, ep);
- if (success) {
+ if (from_iocp) {
+ /* As we were notified from the IOCP of one and exactly one accept,
+ the former socked we created has now either been destroy or assigned
+ to the new connection. We need to create a new one for the next
+ connection. */
start_accept(sp);
}
}
@@ -262,6 +304,8 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
if (sock == INVALID_SOCKET) return -1;
+ /* We need to grab the AcceptEx pointer for that port, as it may be
+ interface-dependent. We'll cache it to avoid doing that again. */
status =
WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
&AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);
@@ -286,6 +330,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
sp = &s->ports[s->nports++];
sp->server = s;
sp->socket = grpc_winsocket_create(sock);
+ sp->shutting_down = 0;
sp->AcceptEx = AcceptEx;
GPR_ASSERT(sp->socket);
gpr_mu_unlock(&s->mu);
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index 940cd5bcde..d5b06e7b0b 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -130,6 +130,7 @@ static void on_read(void *tcpp, int success) {
gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
gpr_free(utf8_message);
status = GRPC_ENDPOINT_CB_ERROR;
+ socket->closed_early = 1;
} else {
if (info->bytes_transfered != 0) {
sub = gpr_slice_sub(tcp->read_slice, 0, info->bytes_transfered);
@@ -225,6 +226,7 @@ static void on_write(void *tcpp, int success) {
gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
gpr_free(utf8_message);
status = GRPC_ENDPOINT_CB_ERROR;
+ tcp->socket->closed_early = 1;
} else {
GPR_ASSERT(info->bytes_transfered == tcp->write_slices.length);
}