aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-07-16 08:45:15 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-07-16 08:45:15 -0700
commite0981dfa05e49acdd9fc5e8abb83a91d84d1a104 (patch)
tree09086d0cbdb4e130b6c66c837eed27d4f8c30bbc /src/core/iomgr
parentb0c13ad7698e577298f199007cc2d7a0a3049d1c (diff)
parentf101af1ab4c553ff2a8cd8f72e8ca156ef188b86 (diff)
Merge github.com:grpc/grpc into i-want-to-wait-free
Diffstat (limited to 'src/core/iomgr')
-rw-r--r--src/core/iomgr/socket_windows.c30
-rw-r--r--src/core/iomgr/tcp_server_windows.c80
2 files changed, 52 insertions, 58 deletions
diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c
index 897408ded2..f6ddfff0ad 100644
--- a/src/core/iomgr/socket_windows.c
+++ b/src/core/iomgr/socket_windows.c
@@ -37,6 +37,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/iomgr_internal.h"
@@ -61,22 +62,27 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name) {
operations to abort them. We need to do that this way because of the
various callsites of that function, which happens to be in various
mutex hold states, and that'd be unsafe to call them directly. */
-int grpc_winsocket_shutdown(grpc_winsocket *socket) {
+int grpc_winsocket_shutdown(grpc_winsocket *winsocket) {
int callbacks_set = 0;
- gpr_mu_lock(&socket->state_mu);
- if (socket->read_info.cb) {
+ SOCKET socket;
+ gpr_mu_lock(&winsocket->state_mu);
+ socket = winsocket->socket;
+ if (winsocket->read_info.cb) {
callbacks_set++;
- grpc_iomgr_closure_init(&socket->shutdown_closure, socket->read_info.cb,
- socket->read_info.opaque);
- grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0);
+ grpc_iomgr_closure_init(&winsocket->shutdown_closure,
+ winsocket->read_info.cb,
+ winsocket->read_info.opaque);
+ grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0);
}
- if (socket->write_info.cb) {
+ if (winsocket->write_info.cb) {
callbacks_set++;
- grpc_iomgr_closure_init(&socket->shutdown_closure, socket->write_info.cb,
- socket->write_info.opaque);
- grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0);
+ grpc_iomgr_closure_init(&winsocket->shutdown_closure,
+ winsocket->write_info.cb,
+ winsocket->write_info.opaque);
+ grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0);
}
- gpr_mu_unlock(&socket->state_mu);
+ gpr_mu_unlock(&winsocket->state_mu);
+ closesocket(socket);
return callbacks_set;
}
@@ -87,14 +93,12 @@ int grpc_winsocket_shutdown(grpc_winsocket *socket) {
an "idle" socket which is neither trying to read or write, we'd start leaking
both memory and sockets. */
void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
- SOCKET socket = winsocket->socket;
grpc_iomgr_unregister_object(&winsocket->iomgr_object);
if (winsocket->read_info.outstanding || winsocket->write_info.outstanding) {
grpc_iocp_socket_orphan(winsocket);
} else {
grpc_winsocket_destroy(winsocket);
}
- closesocket(socket);
}
void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index d70968de88..e6e1d1499e 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -108,9 +108,10 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
size_t i;
gpr_mu_lock(&s->mu);
/* First, shutdown all fd's. This will queue abortion calls for all
- of the pending accepts. */
+ of the pending accepts due to the normal operation mechanism. */
for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i];
+ sp->shutting_down = 1;
grpc_winsocket_shutdown(sp->socket);
}
/* This happens asynchronously. Wait while that happens. */
@@ -242,63 +243,52 @@ static void on_accept(void *arg, int from_iocp) {
SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL;
-
- /* The shutdown sequence is done in two parts. This is the second
- part here, acknowledging the IOCP notification, and doing nothing
- else, especially not queuing a new accept. */
- if (sp->shutting_down) {
- GPR_ASSERT(from_iocp);
- sp->shutting_down = 0;
- sp->socket->read_info.outstanding = 0;
- gpr_mu_lock(&sp->server->mu);
- if (0 == --sp->server->active_ports) {
- gpr_cv_broadcast(&sp->server->cv);
- }
- gpr_mu_unlock(&sp->server->mu);
- return;
- }
-
- if (from_iocp) {
- /* The IOCP notified us of a completed operation. Let's grab the results,
- and act accordingly. */
- DWORD transfered_bytes = 0;
- DWORD flags;
- BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
- &transfered_bytes, FALSE, &flags);
- if (!wsa_success) {
+ DWORD transfered_bytes;
+ DWORD flags;
+ BOOL wsa_success;
+
+ /* The general mechanism for shutting down is to queue abortion calls. While
+ this is necessary in the read/write case, it's useless for the accept
+ case. Let's do nothing. */
+ if (!from_iocp) return;
+
+ /* The IOCP notified us of a completed operation. Let's grab the results,
+ and act accordingly. */
+ transfered_bytes = 0;
+ wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
+ &transfered_bytes, FALSE, &flags);
+ if (!wsa_success) {
+ if (sp->shutting_down) {
+ /* During the shutdown case, we ARE expecting an error. So that's swell,
+ and we can wake up the shutdown thread. */
+ sp->shutting_down = 0;
+ sp->socket->read_info.outstanding = 0;
+ gpr_mu_lock(&sp->server->mu);
+ if (0 == --sp->server->active_ports) {
+ gpr_cv_broadcast(&sp->server->cv);
+ }
+ gpr_mu_unlock(&sp->server->mu);
+ return;
+ } else {
char *utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
gpr_free(utf8_message);
closesocket(sock);
- } else {
- /* TODO(ctiller): add sockaddr address to label */
- ep = grpc_tcp_create(grpc_winsocket_create(sock, "server"));
}
} else {
- /* If we're not notified from the IOCP, it means we are asked to shutdown.
- This will initiate that shutdown. Calling closesocket will trigger an
- IOCP notification, that will call this function a second time, from
- the IOCP thread. Of course, this only works if the socket was, in fact,
- listening. If that's not the case, we'd wait indefinitely. That's a bit
- of a degenerate case, but it can happen if you create a server, but
- don't start it. So let's support that by recursing once. */
- sp->shutting_down = 1;
- sp->new_socket = INVALID_SOCKET;
- if (sock != INVALID_SOCKET) {
- closesocket(sock);
- } else {
- on_accept(sp, 1);
+ if (!sp->shutting_down) {
+ /* TODO(ctiller): add sockaddr address to label */
+ ep = grpc_tcp_create(grpc_winsocket_create(sock, "server"));
}
- return;
}
/* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */
if (ep) sp->server->cb(sp->server->cb_arg, ep);
/* As we were notified from the IOCP of one and exactly one accept,
- the former socked we created has now either been destroy or assigned
- to the new connection. We need to create a new one for the next
- connection. */
+ the former socked we created has now either been destroy or assigned
+ to the new connection. We need to create a new one for the next
+ connection. */
start_accept(sp);
}