aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/tcp_server_windows.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/tcp_server_windows.c')
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.c216
1 files changed, 115 insertions, 101 deletions
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index 125f521d87..7b0966704c 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -41,7 +41,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/log_win32.h>
+#include <grpc/support/log_windows.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
@@ -102,7 +102,9 @@ struct grpc_tcp_server {
/* Public function. Allocates the proper data structures to hold a
grpc_tcp_server. */
-grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
+grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+ const grpc_channel_args *args,
+ grpc_tcp_server **server) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu);
@@ -114,12 +116,13 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
s->shutdown_starting.head = NULL;
s->shutdown_starting.tail = NULL;
s->shutdown_complete = shutdown_complete;
- return s;
+ *server = s;
+ return GRPC_ERROR_NONE;
}
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
}
/* Now that the accepts have been aborted, we can destroy the sockets.
@@ -143,7 +146,8 @@ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
grpc_closure *shutdown_starting) {
gpr_mu_lock(&s->mu);
- grpc_closure_list_add(&s->shutdown_starting, shutdown_starting, 1);
+ grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
+ GRPC_ERROR_NONE);
gpr_mu_unlock(&s->mu);
}
@@ -187,51 +191,49 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
}
/* Prepare (bind) a recently-created socket for listening. */
-static int prepare_socket(SOCKET sock, const struct sockaddr *addr,
- size_t addr_len) {
+static grpc_error *prepare_socket(SOCKET sock, const struct sockaddr *addr,
+ size_t addr_len, int *port) {
struct sockaddr_storage sockname_temp;
socklen_t sockname_len;
+ grpc_error *error = GRPC_ERROR_NONE;
- if (sock == INVALID_SOCKET) goto error;
-
- if (!grpc_tcp_prepare_socket(sock)) {
- char *utf8_message = gpr_format_message(WSAGetLastError());
- gpr_log(GPR_ERROR, "Unable to prepare socket: %s", utf8_message);
- gpr_free(utf8_message);
- goto error;
+ error = grpc_tcp_prepare_socket(sock);
+ if (error != GRPC_ERROR_NONE) {
+ goto failure;
}
if (bind(sock, addr, (int)addr_len) == SOCKET_ERROR) {
- char *addr_str;
- char *utf8_message = gpr_format_message(WSAGetLastError());
- grpc_sockaddr_to_string(&addr_str, addr, 0);
- gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, utf8_message);
- gpr_free(utf8_message);
- gpr_free(addr_str);
- goto error;
+ error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
+ goto failure;
}
if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
- char *utf8_message = gpr_format_message(WSAGetLastError());
- gpr_log(GPR_ERROR, "listen: %s", utf8_message);
- gpr_free(utf8_message);
- goto error;
+ error = GRPC_WSA_ERROR(WSAGetLastError(), "listen");
+ goto failure;
}
sockname_len = sizeof(sockname_temp);
if (getsockname(sock, (struct sockaddr *)&sockname_temp, &sockname_len) ==
SOCKET_ERROR) {
- char *utf8_message = gpr_format_message(WSAGetLastError());
- gpr_log(GPR_ERROR, "getsockname: %s", utf8_message);
- gpr_free(utf8_message);
- goto error;
+ error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
+ goto failure;
}
- return grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+ *port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+ return GRPC_ERROR_NONE;
-error:
+failure:
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
+ char *tgtaddr = grpc_sockaddr_to_uri(addr);
+ grpc_error *final_error = grpc_error_set_int(
+ grpc_error_set_str(GRPC_ERROR_CREATE_REFERENCING(
+ "Failed to prepare server socket", &error, 1),
+ GRPC_ERROR_STR_TARGET_ADDRESS, tgtaddr),
+ GRPC_ERROR_INT_FD, (intptr_t)sock);
+ gpr_free(tgtaddr);
+ GRPC_ERROR_UNREF(error);
if (sock != INVALID_SOCKET) closesocket(sock);
- return -1;
+ return error;
}
static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx,
@@ -251,26 +253,23 @@ static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx,
/* In order to do an async accept, we need to create a socket first which
will be the one assigned to the new incoming connection. */
-static void start_accept(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *port) {
+static grpc_error *start_accept(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_listener *port) {
SOCKET sock = INVALID_SOCKET;
- char *message;
- char *utf8_message;
BOOL success;
DWORD addrlen = sizeof(struct sockaddr_in6) + 16;
DWORD bytes_received = 0;
+ grpc_error *error = GRPC_ERROR_NONE;
sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
WSA_FLAG_OVERLAPPED);
-
if (sock == INVALID_SOCKET) {
- message = "Unable to create socket: %s";
+ error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
goto failure;
}
- if (!grpc_tcp_prepare_socket(sock)) {
- message = "Unable to prepare socket: %s";
- goto failure;
- }
+ error = grpc_tcp_prepare_socket(sock);
+ if (error != GRPC_ERROR_NONE) goto failure;
/* Start the "accept" asynchronously. */
success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
@@ -280,9 +279,9 @@ static void start_accept(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *port) {
/* It is possible to get an accept immediately without delay. However, we
will still get an IOCP notification for it. So let's just ignore it. */
if (!success) {
- int error = WSAGetLastError();
- if (error != ERROR_IO_PENDING) {
- message = "AcceptEx failed: %s";
+ int last_error = WSAGetLastError();
+ if (last_error != ERROR_IO_PENDING) {
+ error = GRPC_WSA_ERROR(last_error, "AcceptEx");
goto failure;
}
}
@@ -291,9 +290,10 @@ static void start_accept(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *port) {
immediately process an accept that happened in the meantime. */
port->new_socket = sock;
grpc_socket_notify_on_read(exec_ctx, port->socket, &port->on_accept);
- return;
+ return error;
failure:
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
if (port->shutting_down) {
/* We are abandoning the listener port, take that into account to prevent
occasional hangs on shutdown. The hang happens when sp->shutting_down
@@ -301,16 +301,15 @@ failure:
but we fail there because the listening port has been closed in the
meantime. */
decrement_active_ports_and_notify(exec_ctx, port);
- return;
+ GRPC_ERROR_UNREF(error);
+ return GRPC_ERROR_NONE;
}
- utf8_message = gpr_format_message(WSAGetLastError());
- gpr_log(GPR_ERROR, message, utf8_message);
- gpr_free(utf8_message);
if (sock != INVALID_SOCKET) closesocket(sock);
+ return error;
}
/* Event manager callback when reads are ready. */
-static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, bool from_iocp) {
+static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_tcp_listener *sp = arg;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
SOCKET sock = sp->new_socket;
@@ -328,7 +327,10 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, bool from_iocp) {
/* The general mechanism for shutting down is to queue abortion calls. While
this is necessary in the read/write case, it's useless for the accept
case. We only need to adjust the pending callback count */
- if (!from_iocp) {
+ if (error != GRPC_ERROR_NONE) {
+ const char *msg = grpc_error_string(error);
+ gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", msg);
+ grpc_error_free_string(msg);
return;
}
@@ -379,28 +381,28 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, bool from_iocp) {
/* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */
- if (ep)
- sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep,
+ if (ep) {
+ sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
&acceptor);
+ }
/* As we were notified from the IOCP of one and exactly one accept,
the former socked we created has now either been destroy or assigned
to the new connection. We need to create a new one for the next
connection. */
- start_accept(exec_ctx, sp);
+ GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp)));
}
-static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
- const struct sockaddr *addr,
- size_t addr_len,
- unsigned port_index) {
+static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
+ const struct sockaddr *addr,
+ size_t addr_len, unsigned port_index,
+ grpc_tcp_listener **listener) {
grpc_tcp_listener *sp = NULL;
- int port;
+ int port = -1;
int status;
GUID guid = WSAID_ACCEPTEX;
DWORD ioctl_num_bytes;
LPFN_ACCEPTEX AcceptEx;
-
- if (sock == INVALID_SOCKET) return NULL;
+ grpc_error *error = GRPC_ERROR_NONE;
/* We need to grab the AcceptEx pointer for that port, as it may be
interface-dependent. We'll cache it to avoid doing that again. */
@@ -416,44 +418,49 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
return NULL;
}
- port = prepare_socket(sock, addr, addr_len);
- if (port >= 0) {
- gpr_mu_lock(&s->mu);
- GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
- sp = gpr_malloc(sizeof(grpc_tcp_listener));
- sp->next = NULL;
- if (s->head == NULL) {
- s->head = sp;
- } else {
- s->tail->next = sp;
- }
- s->tail = sp;
- sp->server = s;
- sp->socket = grpc_winsocket_create(sock, "listener");
- sp->shutting_down = 0;
- sp->AcceptEx = AcceptEx;
- sp->new_socket = INVALID_SOCKET;
- sp->port = port;
- sp->port_index = port_index;
- grpc_closure_init(&sp->on_accept, on_accept, sp);
- GPR_ASSERT(sp->socket);
- gpr_mu_unlock(&s->mu);
+ error = prepare_socket(sock, addr, addr_len, &port);
+ if (error != GRPC_ERROR_NONE) {
+ return error;
+ }
+
+ GPR_ASSERT(port >= 0);
+ gpr_mu_lock(&s->mu);
+ GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
+ sp = gpr_malloc(sizeof(grpc_tcp_listener));
+ sp->next = NULL;
+ if (s->head == NULL) {
+ s->head = sp;
+ } else {
+ s->tail->next = sp;
}
+ s->tail = sp;
+ sp->server = s;
+ sp->socket = grpc_winsocket_create(sock, "listener");
+ sp->shutting_down = 0;
+ sp->AcceptEx = AcceptEx;
+ sp->new_socket = INVALID_SOCKET;
+ sp->port = port;
+ sp->port_index = port_index;
+ grpc_closure_init(&sp->on_accept, on_accept, sp);
+ GPR_ASSERT(sp->socket);
+ gpr_mu_unlock(&s->mu);
+ *listener = sp;
- return sp;
+ return GRPC_ERROR_NONE;
}
-int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
- size_t addr_len) {
- grpc_tcp_listener *sp;
+grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
+ size_t addr_len, int *port) {
+ grpc_tcp_listener *sp = NULL;
SOCKET sock;
struct sockaddr_in6 addr6_v4mapped;
struct sockaddr_in6 wildcard;
struct sockaddr *allocated_addr = NULL;
struct sockaddr_storage sockname_temp;
socklen_t sockname_len;
- int port;
unsigned port_index = 0;
+ grpc_error *error = GRPC_ERROR_NONE;
+
if (s->tail != NULL) {
port_index = s->tail->port_index + 1;
}
@@ -465,11 +472,11 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
sockname_len = sizeof(sockname_temp);
if (0 == getsockname(sp->socket->socket,
(struct sockaddr *)&sockname_temp, &sockname_len)) {
- port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
- if (port > 0) {
+ *port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
+ if (*port > 0) {
allocated_addr = gpr_malloc(addr_len);
memcpy(allocated_addr, addr, addr_len);
- grpc_sockaddr_set_port(allocated_addr, port);
+ grpc_sockaddr_set_port(allocated_addr, *port);
addr = allocated_addr;
break;
}
@@ -483,8 +490,8 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
}
/* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
- if (grpc_sockaddr_is_wildcard(addr, &port)) {
- grpc_sockaddr_make_wildcard6(port, &wildcard);
+ if (grpc_sockaddr_is_wildcard(addr, port)) {
+ grpc_sockaddr_make_wildcard6(*port, &wildcard);
addr = (struct sockaddr *)&wildcard;
addr_len = sizeof(wildcard);
@@ -493,19 +500,26 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
WSA_FLAG_OVERLAPPED);
if (sock == INVALID_SOCKET) {
- char *utf8_message = gpr_format_message(WSAGetLastError());
- gpr_log(GPR_ERROR, "unable to create socket: %s", utf8_message);
- gpr_free(utf8_message);
+ error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
+ goto done;
}
- sp = add_socket_to_server(s, sock, addr, addr_len, port_index);
+ error = add_socket_to_server(s, sock, addr, addr_len, port_index, &sp);
+
+done:
gpr_free(allocated_addr);
- if (sp) {
- return sp->port;
+ if (error != GRPC_ERROR_NONE) {
+ grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING(
+ "Failed to add port to server", &error, 1);
+ GRPC_ERROR_UNREF(error);
+ error = error_out;
+ *port = -1;
} else {
- return -1;
+ GPR_ASSERT(sp != NULL);
+ *port = sp->port;
}
+ return error;
}
void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
@@ -520,7 +534,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
s->on_accept_cb = on_accept_cb;
s->on_accept_cb_arg = on_accept_cb_arg;
for (sp = s->head; sp; sp = sp->next) {
- start_accept(exec_ctx, sp);
+ GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp)));
s->active_ports++;
}
gpr_mu_unlock(&s->mu);