diff options
-rw-r--r-- | src/compiler/cpp_generator.cc | 19 | ||||
-rw-r--r-- | src/core/iomgr/iocp_windows.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/socket_windows.c | 24 | ||||
-rw-r--r-- | src/core/iomgr/socket_windows.h | 1 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client_windows.c | 48 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_windows.c | 79 | ||||
-rw-r--r-- | src/core/iomgr/tcp_windows.c | 2 | ||||
-rw-r--r-- | vsprojects/openssl.props | 13 | ||||
-rw-r--r-- | vsprojects/zlib.props | 13 |
9 files changed, 152 insertions, 49 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 1324198847..735e7e58a8 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -828,9 +828,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, " "$Request$, " "$Response$>(\n" - " std::function< ::grpc::Status($ns$$Service$::Service*, " - "::grpc::ServerContext*, const $Request$*, $Response$*)>(" - "&$ns$$Service$::Service::$Method$), this),\n" + " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n" " new $Request$, new $Response$));\n"); } else if (ClientOnlyStreaming(method)) { printer->Print( @@ -840,10 +838,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, " ::grpc::RpcMethod::CLIENT_STREAMING,\n" " new ::grpc::ClientStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" - " std::function< ::grpc::Status($ns$$Service$::Service*, " - "::grpc::ServerContext*, " - "::grpc::ServerReader< $Request$>*, $Response$*)>(" - "&$ns$$Service$::Service::$Method$), this),\n" + " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n" " new $Request$, new $Response$));\n"); } else if (ServerOnlyStreaming(method)) { printer->Print( @@ -853,10 +848,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, " ::grpc::RpcMethod::SERVER_STREAMING,\n" " new ::grpc::ServerStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" - " std::function< ::grpc::Status($ns$$Service$::Service*, " - "::grpc::ServerContext*, " - "const $Request$*, ::grpc::ServerWriter< $Response$>*)>(" - "&$ns$$Service$::Service::$Method$), this),\n" + " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n" " new $Request$, new $Response$));\n"); } else if (BidiStreaming(method)) { printer->Print( @@ -866,10 +858,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, " ::grpc::RpcMethod::BIDI_STREAMING,\n" " new ::grpc::BidiStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" - " std::function< ::grpc::Status($ns$$Service$::Service*, " - "::grpc::ServerContext*, " - "::grpc::ServerReaderWriter< $Response$, $Request$>*)>(" - "&$ns$$Service$::Service::$Method$), this),\n" + " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n" " new $Request$, new $Response$));\n"); } } diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index 7968729353..e7fc744ceb 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -172,7 +172,9 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) { } void grpc_iocp_socket_orphan(grpc_winsocket *socket) { + GPR_ASSERT(!socket->orphan); gpr_atm_full_fetch_add(&g_orphans, 1); + socket->orphan = 1; } static void socket_notify_on_iocp(grpc_winsocket *socket, diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c index 91268c04e6..fe0196d99c 100644 --- a/src/core/iomgr/socket_windows.c +++ b/src/core/iomgr/socket_windows.c @@ -32,11 +32,12 @@ */ #include <grpc/support/port_platform.h> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> #ifdef GPR_WINSOCK_SOCKET +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + #include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/iomgr_internal.h" @@ -64,16 +65,21 @@ void grpc_winsocket_shutdown(grpc_winsocket *socket) { shutdown_op(&socket->write_info); } -void grpc_winsocket_orphan(grpc_winsocket *socket) { - grpc_iocp_socket_orphan(socket); - socket->orphan = 1; +void grpc_winsocket_orphan(grpc_winsocket *winsocket) { + SOCKET socket = winsocket->socket; + if (!winsocket->closed_early) { + grpc_iocp_socket_orphan(winsocket); + } + if (winsocket->closed_early) { + grpc_winsocket_destroy(winsocket); + } + closesocket(socket); grpc_iomgr_unref(); - closesocket(socket->socket); } -void grpc_winsocket_destroy(grpc_winsocket *socket) { - gpr_mu_destroy(&socket->state_mu); - gpr_free(socket); +void grpc_winsocket_destroy(grpc_winsocket *winsocket) { + gpr_mu_destroy(&winsocket->state_mu); + gpr_free(winsocket); } #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h index cbae91692c..ee090668ea 100644 --- a/src/core/iomgr/socket_windows.h +++ b/src/core/iomgr/socket_windows.h @@ -64,6 +64,7 @@ typedef struct grpc_winsocket { int added_to_iocp; int orphan; + int closed_early; } grpc_winsocket; /* Create a wrapped windows handle. diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index 00c8da601b..653c0c65c5 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -59,6 +59,7 @@ typedef struct { gpr_timespec deadline; grpc_alarm alarm; int refs; + int aborted; } async_connect; static void async_connect_cleanup(async_connect *ac) { @@ -70,26 +71,31 @@ static void async_connect_cleanup(async_connect *ac) { } } -static void on_alarm(void *acp, int success) { +static void on_alarm(void *acp, int occured) { async_connect *ac = acp; gpr_mu_lock(&ac->mu); - if (ac->socket != NULL && success) { + /* If the alarm didn't occor, it got cancelled. */ + if (ac->socket != NULL && occured) { grpc_winsocket_shutdown(ac->socket); } async_connect_cleanup(ac); } -static void on_connect(void *acp, int success) { +static void on_connect(void *acp, int from_iocp) { async_connect *ac = acp; SOCKET sock = ac->socket->socket; grpc_endpoint *ep = NULL; grpc_winsocket_callback_info *info = &ac->socket->write_info; void(*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; void *cb_arg = ac->cb_arg; + int aborted; grpc_alarm_cancel(&ac->alarm); - if (success) { + gpr_mu_lock(&ac->mu); + aborted = ac->aborted; + + if (from_iocp) { DWORD transfered_bytes = 0; DWORD flags; BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, @@ -107,20 +113,40 @@ static void on_connect(void *acp, int success) { } } else { gpr_log(GPR_ERROR, "on_connect is shutting down"); - goto finish; + /* If the connection timeouts, we will still get a notification from + the IOCP whatever happens. So we're just going to flag that connection + as being in the process of being aborted, and wait for the IOCP. We + can't just orphan the socket now, because the IOCP might already have + gotten a successful connection, which is our worst-case scenario. + We need to call our callback now to respect the deadline. */ + ac->aborted = 1; + gpr_mu_unlock(&ac->mu); + cb(cb_arg, NULL); + return; } abort(); finish: - gpr_mu_lock(&ac->mu); - if (!ep) { + /* If we don't have an endpoint, it means the connection failed, + so it doesn't matter if it aborted or failed. We need to orphan + that socket. */ + if (!ep || aborted) { + /* If the connection failed, it means we won't get an IOCP notification, + so let's flag it as already closed. But if the connection was aborted, + while we still got an endpoint, we have to wait for the IOCP to collect + that socket. So let's properly flag that. */ + ac->socket->closed_early = !ep; grpc_winsocket_orphan(ac->socket); } async_connect_cleanup(ac); - cb(cb_arg, ep); + /* If the connection was aborted, the callback was already called when + the deadline was met. */ + if (!aborted) cb(cb_arg, ep); } +/* Tries to issue one async connection, then schedules both an IOCP + notification request for the connection, and one timeout alert. */ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp), void *arg, const struct sockaddr *addr, int addr_len, gpr_timespec deadline) { @@ -156,6 +182,8 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp), goto failure; } + /* Grab the function pointer for ConnectEx for that specific socket. + It may change depending on the interface. */ status = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), &ConnectEx, sizeof(ConnectEx), &ioctl_num_bytes, NULL, NULL); @@ -178,6 +206,8 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp), info = &socket->write_info; success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped); + /* It wouldn't be unusual to get a success immediately. But we'll still get + an IOCP notification, so let's ignore it. */ if (!success) { int error = WSAGetLastError(); if (error != ERROR_IO_PENDING) { @@ -192,6 +222,7 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp), ac->socket = socket; gpr_mu_init(&ac->mu); ac->refs = 2; + ac->aborted = 0; grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now()); grpc_socket_notify_on_write(socket, on_connect, ac); @@ -202,6 +233,7 @@ failure: gpr_log(GPR_ERROR, message, utf8_message); gpr_free(utf8_message); if (socket) { + socket->closed_early = 1; grpc_winsocket_orphan(socket); } else if (sock != INVALID_SOCKET) { closesocket(sock); diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index fe92846a71..c4d3293e83 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -55,11 +55,17 @@ /* one listening port */ typedef struct server_port { - gpr_uint8 addresses[sizeof(struct sockaddr_in6) * 2 + 32]; + /* This seemingly magic number comes from AcceptEx's documentation. each + address buffer needs to have at least 16 more bytes at their end. */ + gpr_uint8 addresses[(sizeof(struct sockaddr_in6) + 16) * 2]; + /* This will hold the socket for the next accept. */ SOCKET new_socket; + /* The listener winsocked. */ grpc_winsocket *socket; grpc_tcp_server *server; + /* The cached AcceptEx for that port. */ LPFN_ACCEPTEX AcceptEx; + int shutting_down; } server_port; /* the overall server */ @@ -79,6 +85,8 @@ struct grpc_tcp_server { size_t port_capacity; }; +/* Public function. Allocates the proper data structures to hold a + grpc_tcp_server. */ grpc_tcp_server *grpc_tcp_server_create(void) { grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); gpr_mu_init(&s->mu); @@ -92,24 +100,29 @@ grpc_tcp_server *grpc_tcp_server_create(void) { return s; } +/* Public function. Stops and destroys a grpc_tcp_server. */ void grpc_tcp_server_destroy(grpc_tcp_server *s, void (*shutdown_done)(void *shutdown_done_arg), void *shutdown_done_arg) { size_t i; gpr_mu_lock(&s->mu); - /* shutdown all fd's */ + /* First, shutdown all fd's. This will queue abortion calls for all + of the pending accepts. */ for (i = 0; i < s->nports; i++) { grpc_winsocket_shutdown(s->ports[i].socket); } - /* wait while that happens */ + /* This happens asynchronously. Wait while that happens. */ while (s->active_ports) { gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future); } gpr_mu_unlock(&s->mu); - /* delete ALL the things */ + /* Now that the accepts have been aborted, we can destroy the sockets. + The IOCP won't get notified on these, so we can flag them as already + closed by the system. */ for (i = 0; i < s->nports; i++) { server_port *sp = &s->ports[i]; + sp->socket->closed_early = 1; grpc_winsocket_orphan(sp->socket); } gpr_free(s->ports); @@ -120,7 +133,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s, } } -/* Prepare a recently-created socket for listening. */ +/* Prepare (bind) a recently-created socket for listening. */ static int prepare_socket(SOCKET sock, const struct sockaddr *addr, int addr_len) { struct sockaddr_storage sockname_temp; @@ -168,8 +181,11 @@ error: return -1; } -static void on_accept(void *arg, int success); +/* start_accept will reference that for the IOCP notification request. */ +static void on_accept(void *arg, int from_iocp); +/* In order to do an async accept, we need to create a socket first which + will be the one assigned to the new incoming connection. */ static void start_accept(server_port *port) { SOCKET sock = INVALID_SOCKET; char *message; @@ -191,12 +207,13 @@ static void start_accept(server_port *port) { goto failure; } - /* TODO(jtattermusch): probably a race here, we regularly get use-after-free on server shutdown */ - GPR_ASSERT(port->socket != (grpc_winsocket*)0xfeeefeee); + /* Start the "accept" asynchronously. */ success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0, addrlen, addrlen, &bytes_received, &port->socket->read_info.overlapped); + /* It is possible to get an accept immediately without delay. However, we + will still get an IOCP notification for it. So let's just ignore it. */ if (!success) { int error = WSAGetLastError(); if (error != ERROR_IO_PENDING) { @@ -205,6 +222,8 @@ static void start_accept(server_port *port) { } } + /* We're ready to do the accept. Calling grpc_socket_notify_on_read may + immediately process an accept that happened in the meantime. */ port->new_socket = sock; grpc_socket_notify_on_read(port->socket, on_accept, port); return; @@ -216,14 +235,30 @@ failure: if (sock != INVALID_SOCKET) closesocket(sock); } -/* event manager callback when reads are ready */ -static void on_accept(void *arg, int success) { +/* Event manager callback when reads are ready. */ +static void on_accept(void *arg, int from_iocp) { server_port *sp = arg; SOCKET sock = sp->new_socket; grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_endpoint *ep = NULL; - if (success) { + /* The shutdown sequence is done in two parts. This is the second + part here, acknowledging the IOCP notification, and doing nothing + else, especially not queuing a new accept. */ + if (sp->shutting_down) { + GPR_ASSERT(from_iocp); + sp->shutting_down = 0; + gpr_mu_lock(&sp->server->mu); + if (0 == --sp->server->active_ports) { + gpr_cv_broadcast(&sp->server->cv); + } + gpr_mu_unlock(&sp->server->mu); + return; + } + + if (from_iocp) { + /* The IOCP notified us of a completed operation. Let's grab the results, + and act accordingly. */ DWORD transfered_bytes = 0; DWORD flags; BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, @@ -237,16 +272,23 @@ static void on_accept(void *arg, int success) { ep = grpc_tcp_create(grpc_winsocket_create(sock)); } } else { + /* If we're not notified from the IOCP, it means we are asked to shutdown. + This will initiate that shutdown. Calling closesocket will trigger an + IOCP notification, that will call this function a second time, from + the IOCP thread. */ + sp->shutting_down = 1; + sp->new_socket = INVALID_SOCKET; closesocket(sock); - gpr_mu_lock(&sp->server->mu); - if (0 == --sp->server->active_ports) { - gpr_cv_broadcast(&sp->server->cv); - } - gpr_mu_unlock(&sp->server->mu); } + /* The only time we should call our callback, is where we successfully + managed to accept a connection, and created an endpoint. */ if (ep) sp->server->cb(sp->server->cb_arg, ep); - if (success) { + if (from_iocp) { + /* As we were notified from the IOCP of one and exactly one accept, + the former socked we created has now either been destroy or assigned + to the new connection. We need to create a new one for the next + connection. */ start_accept(sp); } } @@ -262,6 +304,8 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, if (sock == INVALID_SOCKET) return -1; + /* We need to grab the AcceptEx pointer for that port, as it may be + interface-dependent. We'll cache it to avoid doing that again. */ status = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL); @@ -286,6 +330,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, sp = &s->ports[s->nports++]; sp->server = s; sp->socket = grpc_winsocket_create(sock); + sp->shutting_down = 0; sp->AcceptEx = AcceptEx; GPR_ASSERT(sp->socket); gpr_mu_unlock(&s->mu); diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index 940cd5bcde..d5b06e7b0b 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -130,6 +130,7 @@ static void on_read(void *tcpp, int success) { gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message); gpr_free(utf8_message); status = GRPC_ENDPOINT_CB_ERROR; + socket->closed_early = 1; } else { if (info->bytes_transfered != 0) { sub = gpr_slice_sub(tcp->read_slice, 0, info->bytes_transfered); @@ -225,6 +226,7 @@ static void on_write(void *tcpp, int success) { gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message); gpr_free(utf8_message); status = GRPC_ENDPOINT_CB_ERROR; + tcp->socket->closed_early = 1; } else { GPR_ASSERT(info->bytes_transfered == tcp->write_slices.length); } diff --git a/vsprojects/openssl.props b/vsprojects/openssl.props new file mode 100644 index 0000000000..94e3ff98ff --- /dev/null +++ b/vsprojects/openssl.props @@ -0,0 +1,13 @@ +<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ImportGroup Label="PropertySheets" />
+ <PropertyGroup Label="UserMacros" />
+ <PropertyGroup />
+ <ItemDefinitionGroup>
+ <Link>
+ <AdditionalDependencies>ssleay32.lib;libeay32.lib;%(AdditionalDependencies)</AdditionalDependencies>
+ <AdditionalLibraryDirectories>$(MSBuildProjectDirectory)\..\packages\grpc.dependencies.openssl.1.0.2.2\build\native\lib\$(PlatformToolset)\$(Platform)\$(Configuration)\static;%(AdditionalLibraryDirectories)</AdditionalLibraryDirectories>
+ </Link>
+ </ItemDefinitionGroup>
+ <ItemGroup />
+</Project>
\ No newline at end of file diff --git a/vsprojects/zlib.props b/vsprojects/zlib.props new file mode 100644 index 0000000000..ac881a5904 --- /dev/null +++ b/vsprojects/zlib.props @@ -0,0 +1,13 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <ImportGroup Label="PropertySheets" /> + <PropertyGroup Label="UserMacros" /> + <PropertyGroup /> + <ItemDefinitionGroup> + <Link> + <AdditionalDependencies>zlib.lib;%(AdditionalDependencies)</AdditionalDependencies> + <AdditionalLibraryDirectories>$(MSBuildProjectDirectory)\..\packages\grpc.dependencies.zlib.1.2.8.9\build\native\lib\$(PlatformToolset)\$(Platform)\$(Configuration)\static\cdecl;%(AdditionalLibraryDirectories)</AdditionalLibraryDirectories> + </Link> + </ItemDefinitionGroup> + <ItemGroup /> +</Project>
\ No newline at end of file |