diff options
author | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2015-02-05 19:40:38 +0100 |
---|---|---|
committer | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2015-02-05 19:40:38 +0100 |
commit | 0f3ec822380081670edd0e9e1d7e40c1122bda21 (patch) | |
tree | cffd831fd40d78204a8f6f78d87f7b528b189689 /src | |
parent | ba410fabdf476ba61fef22a1f3205bc6d8f96a4b (diff) |
Adding Windows tcp server code.
Diffstat (limited to 'src')
-rw-r--r-- | src/core/iomgr/sockaddr_utils.c | 19 | ||||
-rw-r--r-- | src/core/iomgr/sockaddr_utils.h | 6 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client_windows.c | 8 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_windows.c | 372 | ||||
-rw-r--r-- | src/core/iomgr/tcp_windows.c | 5 |
5 files changed, 396 insertions, 14 deletions
diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c index 07bf7b3a35..8dcfca74c6 100644 --- a/src/core/iomgr/sockaddr_utils.c +++ b/src/core/iomgr/sockaddr_utils.c @@ -111,13 +111,20 @@ int grpc_sockaddr_is_wildcard(const struct sockaddr *addr, int *port_out) { void grpc_sockaddr_make_wildcards(int port, struct sockaddr_in *wild4_out, struct sockaddr_in6 *wild6_out) { - memset(wild4_out, 0, sizeof(*wild4_out)); - wild4_out->sin_family = AF_INET; - wild4_out->sin_port = htons(port); + grpc_sockaddr_make_wildcard4(port, wild4_out); + grpc_sockaddr_make_wildcard6(port, wild6_out); +} + +void grpc_sockaddr_make_wildcard4(int port, struct sockaddr_in *wild_out) { + memset(wild_out, 0, sizeof(*wild_out)); + wild_out->sin_family = AF_INET; + wild_out->sin_port = htons(port); +} - memset(wild6_out, 0, sizeof(*wild6_out)); - wild6_out->sin6_family = AF_INET6; - wild6_out->sin6_port = htons(port); +void grpc_sockaddr_make_wildcard6(int port, struct sockaddr_in6 *wild_out) { + memset(wild_out, 0, sizeof(*wild_out)); + wild_out->sin6_family = AF_INET6; + wild_out->sin6_port = htons(port); } int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, diff --git a/src/core/iomgr/sockaddr_utils.h b/src/core/iomgr/sockaddr_utils.h index 3f5b770e86..b49cc50491 100644 --- a/src/core/iomgr/sockaddr_utils.h +++ b/src/core/iomgr/sockaddr_utils.h @@ -57,6 +57,12 @@ int grpc_sockaddr_is_wildcard(const struct sockaddr *addr, int *port_out); void grpc_sockaddr_make_wildcards(int port, struct sockaddr_in *wild4_out, struct sockaddr_in6 *wild6_out); +/* Writes 0.0.0.0:port. */ +void grpc_sockaddr_make_wildcard4(int port, struct sockaddr_in *wild_out); + +/* Writes [::]:port. */ +void grpc_sockaddr_make_wildcard6(int port, struct sockaddr_in6 *wild_out); + /* Return the IP port number of a sockaddr */ int grpc_sockaddr_get_port(const struct sockaddr *addr); diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index 465fc75295..37e6b12552 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -106,9 +106,7 @@ static void on_connect(void *acp, int success) { goto finish; } } else { - __debugbreak(); - abort(); - gpr_log(GPR_ERROR, "on_writable failed during connect"); + gpr_log(GPR_ERROR, "on_connect is shutting down"); goto finish; } @@ -167,9 +165,7 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp), goto failure; } - memset(&local_address, 0, sizeof(local_address)); - memcpy(&local_address.sin6_addr, &in6addr_any, sizeof(in6addr_any)); - local_address.sin6_family = AF_INET6; + grpc_sockaddr_make_wildcard6(0, &local_address); status = bind(sock, (struct sockaddr *) &local_address, sizeof(local_address)); diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c new file mode 100644 index 0000000000..21901958d1 --- /dev/null +++ b/src/core/iomgr/tcp_server_windows.c @@ -0,0 +1,372 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_WINSOCK_SOCKET + +#define _GNU_SOURCE +#include "src/core/iomgr/sockaddr_utils.h" + +#include "src/core/iomgr/pollset_windows.h" +#include "src/core/iomgr/socket_windows.h" +#include "src/core/iomgr/tcp_server.h" +#include "src/core/iomgr/tcp_windows.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/log_win32.h> +#include <grpc/support/sync.h> +#include <grpc/support/time.h> + +#define INIT_PORT_CAP 2 +#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100 + +static gpr_once s_init_max_accept_queue_size; +static int s_max_accept_queue_size; + +/* one listening port */ +typedef struct server_port { + gpr_uint8 addresses[sizeof(struct sockaddr_in6) * 2 + 32]; + SOCKET new_socket; + grpc_winsocket *socket; + grpc_tcp_server *server; + LPFN_ACCEPTEX AcceptEx; +} server_port; + +/* the overall server */ +struct grpc_tcp_server { + grpc_tcp_server_cb cb; + void *cb_arg; + + gpr_mu mu; + gpr_cv cv; + + /* active port count: how many ports are actually still listening */ + int active_ports; + + /* all listening ports */ + server_port *ports; + size_t nports; + size_t port_capacity; +}; + +grpc_tcp_server *grpc_tcp_server_create(void) { + grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); + gpr_mu_init(&s->mu); + gpr_cv_init(&s->cv); + s->active_ports = 0; + s->cb = NULL; + s->cb_arg = NULL; + s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); + s->nports = 0; + s->port_capacity = INIT_PORT_CAP; + return s; +} + +void grpc_tcp_server_destroy(grpc_tcp_server *s) { + size_t i; + gpr_mu_lock(&s->mu); + /* shutdown all fd's */ + for (i = 0; i < s->nports; i++) { + grpc_winsocket_shutdown(s->ports[i].socket); + } + /* wait while that happens */ + while (s->active_ports) { + gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future); + } + gpr_mu_unlock(&s->mu); + + /* delete ALL the things */ + for (i = 0; i < s->nports; i++) { + server_port *sp = &s->ports[i]; + grpc_winsocket_orphan(sp->socket); + } + gpr_free(s->ports); + gpr_free(s); +} + +/* Prepare a recently-created socket for listening. */ +static int prepare_socket(SOCKET sock, + const struct sockaddr *addr, int addr_len) { + struct sockaddr_storage sockname_temp; + socklen_t sockname_len; + + if (sock == INVALID_SOCKET) goto error; + + if (!grpc_tcp_prepare_socket(sock)) { + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "Unable to prepare socket: %s", utf8_message); + gpr_free(utf8_message); + goto error; + } + + if (bind(sock, addr, addr_len) == SOCKET_ERROR) { + char *addr_str; + char *utf8_message = gpr_format_message(WSAGetLastError()); + grpc_sockaddr_to_string(&addr_str, addr, 0); + gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, utf8_message); + gpr_free(utf8_message); + gpr_free(addr_str); + goto error; + } + + if (listen(sock, SOMAXCONN) == SOCKET_ERROR) { + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "listen: %s", utf8_message); + gpr_free(utf8_message); + goto error; + } + + sockname_len = sizeof(sockname_temp); + if (getsockname(sock, (struct sockaddr *) &sockname_temp, &sockname_len) + == SOCKET_ERROR) { + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "getsockname: %s", utf8_message); + gpr_free(utf8_message); + goto error; + } + + return grpc_sockaddr_get_port((struct sockaddr *) &sockname_temp); + +error: + if (sock != INVALID_SOCKET) closesocket(sock); + return -1; +} + +static void on_accept(void *arg, int success); + +static void start_accept(server_port *port) { + SOCKET sock = INVALID_SOCKET; + char *message; + char *utf8_message; + BOOL success; + DWORD addrlen = sizeof(struct sockaddr_in6) + 16; + DWORD bytes_received = 0; + + sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0, + WSA_FLAG_OVERLAPPED); + + if (sock == INVALID_SOCKET) { + message = "Unable to create socket: %s"; + goto failure; + } + + if (!grpc_tcp_prepare_socket(sock)) { + message = "Unable to prepare socket: %s"; + goto failure; + } + + success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0, + addrlen, addrlen, &bytes_received, + &port->socket->read_info.overlapped); + + if (success) { + gpr_log(GPR_DEBUG, "accepted immediately - but we still go to sleep"); + } else { + int error = WSAGetLastError(); + if (error != ERROR_IO_PENDING) { + message = "AcceptEx failed: %s"; + goto failure; + } + } + + port->new_socket = sock; + grpc_handle_notify_on_read(port->socket, on_accept, port); + return; + +failure: + utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, message, utf8_message); + gpr_free(utf8_message); + if (sock != INVALID_SOCKET) closesocket(sock); +} + +/* event manager callback when reads are ready */ +static void on_accept(void *arg, int success) { + server_port *sp = arg; + SOCKET sock = sp->new_socket; + grpc_winsocket_callback_info *info = &sp->socket->read_info; + grpc_endpoint *ep = NULL; + + if (success) { + DWORD transfered_bytes = 0; + DWORD flags; + BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, + &transfered_bytes, FALSE, + &flags); + if (!wsa_success) { + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message); + gpr_free(utf8_message); + closesocket(sock); + } else { + gpr_log(GPR_DEBUG, "on_accept: accepted connection"); + ep = grpc_tcp_create(grpc_winsocket_create(sock)); + } + } else { + gpr_log(GPR_DEBUG, "on_accept: shutting down"); + closesocket(sock); + gpr_mu_lock(&sp->server->mu); + if (0 == --sp->server->active_ports) { + gpr_cv_broadcast(&sp->server->cv); + } + gpr_mu_unlock(&sp->server->mu); + } + + if (ep) sp->server->cb(sp->server->cb_arg, ep); + start_accept(sp); +} + +static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, + const struct sockaddr *addr, int addr_len) { + server_port *sp; + int port; + int status; + GUID guid = WSAID_ACCEPTEX; + DWORD ioctl_num_bytes; + LPFN_ACCEPTEX AcceptEx; + + if (sock == INVALID_SOCKET) return -1; + + status = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, + &guid, sizeof(guid), &AcceptEx, sizeof(AcceptEx), + &ioctl_num_bytes, NULL, NULL); + + if (status != 0) { + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message); + gpr_free(utf8_message); + closesocket(sock); + return -1; + } + + port = prepare_socket(sock, addr, addr_len); + if (port >= 0) { + gpr_mu_lock(&s->mu); + GPR_ASSERT(!s->cb && "must add ports before starting server"); + /* append it to the list under a lock */ + if (s->nports == s->port_capacity) { + s->port_capacity *= 2; + s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity); + } + sp = &s->ports[s->nports++]; + sp->server = s; + sp->socket = grpc_winsocket_create(sock); + sp->AcceptEx = AcceptEx; + GPR_ASSERT(sp->socket); + gpr_mu_unlock(&s->mu); + } + + return port; +} + +int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, + int addr_len) { + int allocated_port = -1; + unsigned i; + SOCKET sock; + struct sockaddr_in6 addr6_v4mapped; + struct sockaddr_in6 wildcard; + struct sockaddr *allocated_addr = NULL; + struct sockaddr_storage sockname_temp; + socklen_t sockname_len; + int port; + + /* Check if this is a wildcard port, and if so, try to keep the port the same + as some previously created listener. */ + if (grpc_sockaddr_get_port(addr) == 0) { + for (i = 0; i < s->nports; i++) { + sockname_len = sizeof(sockname_temp); + if (0 == getsockname(s->ports[i].socket->socket, + (struct sockaddr *) &sockname_temp, + &sockname_len)) { + port = grpc_sockaddr_get_port((struct sockaddr *) &sockname_temp); + if (port > 0) { + allocated_addr = malloc(addr_len); + memcpy(allocated_addr, addr, addr_len); + grpc_sockaddr_set_port(allocated_addr, port); + addr = allocated_addr; + break; + } + } + } + } + + if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { + addr = (const struct sockaddr *)&addr6_v4mapped; + addr_len = sizeof(addr6_v4mapped); + } + + /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ + if (grpc_sockaddr_is_wildcard(addr, &port)) { + grpc_sockaddr_make_wildcard6(port, &wildcard); + + addr = (struct sockaddr *) &wildcard; + addr_len = sizeof(wildcard); + } + + sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0, + WSA_FLAG_OVERLAPPED); + if (sock == INVALID_SOCKET) { + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "unable to create socket: %s", utf8_message); + gpr_free(utf8_message); + } + + allocated_port = add_socket_to_server(s, sock, addr, addr_len); + gpr_free(allocated_addr); + + return allocated_port; +} + +SOCKET grpc_tcp_server_get_socket(grpc_tcp_server *s, unsigned index) { + return (index < s->nports) ? s->ports[index].socket->socket : INVALID_SOCKET; +} + +void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset *pollset, + grpc_tcp_server_cb cb, void *cb_arg) { + size_t i; + GPR_ASSERT(cb); + gpr_mu_lock(&s->mu); + GPR_ASSERT(!s->cb); + GPR_ASSERT(s->active_ports == 0); + s->cb = cb; + s->cb_arg = cb_arg; + for (i = 0; i < s->nports; i++) { + start_accept(s->ports + i); + s->active_ports++; + } + gpr_mu_unlock(&s->mu); +} + +#endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index 734628dd8e..bd0b2dd869 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -61,7 +61,8 @@ static int set_non_block(SOCKET sock) { static int set_dualstack(SOCKET sock) { int status; unsigned long param = 0; - status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, ¶m, sizeof(param)); + status = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, + (const char *) ¶m, sizeof(param)); return status == 0; } @@ -120,7 +121,7 @@ static void on_read(void *tcpp, int success) { if (!success) { tcp_unref(tcp); - cb(opaque, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); + cb(opaque, GRPC_ENDPOINT_CB_SHUTDOWN); return; } |