aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-07-17 14:41:12 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-07-17 14:41:12 -0700
commitc9fef6f521d92ff0b89291c5546b15448444e246 (patch)
tree610a1a63a5ca9e22713b336d51865a99232d3fe6 /src/core/iomgr
parenta2c622e25338896cf7658f0803d308f468a549f9 (diff)
parentfea28b79fb0a211b2ea8515dab25db19099ea595 (diff)
Merge github.com:grpc/grpc into no-worries-i-can-wait
Diffstat (limited to 'src/core/iomgr')
-rw-r--r--src/core/iomgr/pollset_set.h2
-rw-r--r--src/core/iomgr/socket_windows.c30
-rw-r--r--src/core/iomgr/tcp_server_windows.c80
3 files changed, 53 insertions, 59 deletions
diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h
index 98e3b552a7..6d73951c70 100644
--- a/src/core/iomgr/pollset_set.h
+++ b/src/core/iomgr/pollset_set.h
@@ -38,7 +38,7 @@
/* A grpc_pollset_set is a set of pollsets that are interested in an
action. Adding a pollset to a pollset_set automatically adds any
- fd's (etc) that have been registered with the set_set with that pollset.
+ fd's (etc) that have been registered with the set_set to that pollset.
Registering fd's automatically adds them to all current pollsets. */
#ifdef GPR_POSIX_SOCKET
diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c
index 897408ded2..f6ddfff0ad 100644
--- a/src/core/iomgr/socket_windows.c
+++ b/src/core/iomgr/socket_windows.c
@@ -37,6 +37,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/iomgr_internal.h"
@@ -61,22 +62,27 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name) {
operations to abort them. We need to do that this way because of the
various callsites of that function, which happens to be in various
mutex hold states, and that'd be unsafe to call them directly. */
-int grpc_winsocket_shutdown(grpc_winsocket *socket) {
+int grpc_winsocket_shutdown(grpc_winsocket *winsocket) {
int callbacks_set = 0;
- gpr_mu_lock(&socket->state_mu);
- if (socket->read_info.cb) {
+ SOCKET socket;
+ gpr_mu_lock(&winsocket->state_mu);
+ socket = winsocket->socket;
+ if (winsocket->read_info.cb) {
callbacks_set++;
- grpc_iomgr_closure_init(&socket->shutdown_closure, socket->read_info.cb,
- socket->read_info.opaque);
- grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0);
+ grpc_iomgr_closure_init(&winsocket->shutdown_closure,
+ winsocket->read_info.cb,
+ winsocket->read_info.opaque);
+ grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0);
}
- if (socket->write_info.cb) {
+ if (winsocket->write_info.cb) {
callbacks_set++;
- grpc_iomgr_closure_init(&socket->shutdown_closure, socket->write_info.cb,
- socket->write_info.opaque);
- grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0);
+ grpc_iomgr_closure_init(&winsocket->shutdown_closure,
+ winsocket->write_info.cb,
+ winsocket->write_info.opaque);
+ grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0);
}
- gpr_mu_unlock(&socket->state_mu);
+ gpr_mu_unlock(&winsocket->state_mu);
+ closesocket(socket);
return callbacks_set;
}
@@ -87,14 +93,12 @@ int grpc_winsocket_shutdown(grpc_winsocket *socket) {
an "idle" socket which is neither trying to read or write, we'd start leaking
both memory and sockets. */
void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
- SOCKET socket = winsocket->socket;
grpc_iomgr_unregister_object(&winsocket->iomgr_object);
if (winsocket->read_info.outstanding || winsocket->write_info.outstanding) {
grpc_iocp_socket_orphan(winsocket);
} else {
grpc_winsocket_destroy(winsocket);
}
- closesocket(socket);
}
void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index 19ded3e8bf..187009b2c8 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -108,9 +108,10 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
size_t i;
gpr_mu_lock(&s->mu);
/* First, shutdown all fd's. This will queue abortion calls for all
- of the pending accepts. */
+ of the pending accepts due to the normal operation mechanism. */
for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i];
+ sp->shutting_down = 1;
grpc_winsocket_shutdown(sp->socket);
}
/* This happens asynchronously. Wait while that happens. */
@@ -242,63 +243,52 @@ static void on_accept(void *arg, int from_iocp) {
SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL;
-
- /* The shutdown sequence is done in two parts. This is the second
- part here, acknowledging the IOCP notification, and doing nothing
- else, especially not queuing a new accept. */
- if (sp->shutting_down) {
- GPR_ASSERT(from_iocp);
- sp->shutting_down = 0;
- sp->socket->read_info.outstanding = 0;
- gpr_mu_lock(&sp->server->mu);
- if (0 == --sp->server->active_ports) {
- gpr_cv_broadcast(&sp->server->cv);
- }
- gpr_mu_unlock(&sp->server->mu);
- return;
- }
-
- if (from_iocp) {
- /* The IOCP notified us of a completed operation. Let's grab the results,
- and act accordingly. */
- DWORD transfered_bytes = 0;
- DWORD flags;
- BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
- &transfered_bytes, FALSE, &flags);
- if (!wsa_success) {
+ DWORD transfered_bytes;
+ DWORD flags;
+ BOOL wsa_success;
+
+ /* The general mechanism for shutting down is to queue abortion calls. While
+ this is necessary in the read/write case, it's useless for the accept
+ case. Let's do nothing. */
+ if (!from_iocp) return;
+
+ /* The IOCP notified us of a completed operation. Let's grab the results,
+ and act accordingly. */
+ transfered_bytes = 0;
+ wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
+ &transfered_bytes, FALSE, &flags);
+ if (!wsa_success) {
+ if (sp->shutting_down) {
+ /* During the shutdown case, we ARE expecting an error. So that's swell,
+ and we can wake up the shutdown thread. */
+ sp->shutting_down = 0;
+ sp->socket->read_info.outstanding = 0;
+ gpr_mu_lock(&sp->server->mu);
+ if (0 == --sp->server->active_ports) {
+ gpr_cv_broadcast(&sp->server->cv);
+ }
+ gpr_mu_unlock(&sp->server->mu);
+ return;
+ } else {
char *utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
gpr_free(utf8_message);
closesocket(sock);
- } else {
- /* TODO(ctiller): add sockaddr address to label */
- ep = grpc_tcp_create(grpc_winsocket_create(sock, "server"));
}
} else {
- /* If we're not notified from the IOCP, it means we are asked to shutdown.
- This will initiate that shutdown. Calling closesocket will trigger an
- IOCP notification, that will call this function a second time, from
- the IOCP thread. Of course, this only works if the socket was, in fact,
- listening. If that's not the case, we'd wait indefinitely. That's a bit
- of a degenerate case, but it can happen if you create a server, but
- don't start it. So let's support that by recursing once. */
- sp->shutting_down = 1;
- sp->new_socket = INVALID_SOCKET;
- if (sock != INVALID_SOCKET) {
- closesocket(sock);
- } else {
- on_accept(sp, 1);
+ if (!sp->shutting_down) {
+ /* TODO(ctiller): add sockaddr address to label */
+ ep = grpc_tcp_create(grpc_winsocket_create(sock, "server"));
}
- return;
}
/* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */
if (ep) sp->server->cb(sp->server->cb_arg, ep);
/* As we were notified from the IOCP of one and exactly one accept,
- the former socked we created has now either been destroy or assigned
- to the new connection. We need to create a new one for the next
- connection. */
+ the former socked we created has now either been destroy or assigned
+ to the new connection. We need to create a new one for the next
+ connection. */
start_accept(sp);
}