aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr')
-rw-r--r--src/core/iomgr/iocp_windows.c1
-rw-r--r--src/core/iomgr/iomgr.c39
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c3
-rw-r--r--src/core/iomgr/pollset_posix.c9
-rw-r--r--src/core/iomgr/pollset_windows.c6
-rw-r--r--src/core/iomgr/socket_windows.c35
-rw-r--r--src/core/iomgr/tcp_client_posix.c10
-rw-r--r--src/core/iomgr/tcp_client_windows.c3
-rw-r--r--src/core/iomgr/tcp_server_windows.c80
-rw-r--r--src/core/iomgr/tcp_windows.c24
10 files changed, 112 insertions, 98 deletions
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
index 0c62bfccd5..3d3a193d00 100644
--- a/src/core/iomgr/iocp_windows.c
+++ b/src/core/iomgr/iocp_windows.c
@@ -127,7 +127,6 @@ static void iocp_loop(void *p) {
grpc_maybe_call_delayed_callbacks(NULL, 1);
do_iocp_work();
}
- gpr_log(GPR_DEBUG, "iocp_loop is done");
gpr_event_set(&g_iocp_done, (void *)1);
}
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index c507e7c26a..cca92d3b32 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -59,7 +59,7 @@ static void background_callback_executor(void *ignored) {
while (!g_shutdown) {
gpr_timespec deadline = gpr_inf_future;
gpr_timespec short_deadline =
- gpr_time_add(gpr_now(), gpr_time_from_millis(100));
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100));
if (g_cbs_head) {
grpc_iomgr_closure *closure = g_cbs_head;
g_cbs_head = closure->next;
@@ -67,7 +67,8 @@ static void background_callback_executor(void *ignored) {
gpr_mu_unlock(&g_mu);
closure->cb(closure->cb_arg, closure->success);
gpr_mu_lock(&g_mu);
- } else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) {
+ } else if (grpc_alarm_check(&g_mu, gpr_now(GPR_CLOCK_REALTIME),
+ &deadline)) {
} else {
gpr_mu_unlock(&g_mu);
gpr_sleep_until(gpr_time_min(short_deadline, deadline));
@@ -89,7 +90,7 @@ void grpc_iomgr_init(void) {
gpr_thd_id id;
gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv);
- grpc_alarm_list_init(gpr_now());
+ grpc_alarm_list_init(gpr_now(GPR_CLOCK_REALTIME));
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = "root";
grpc_iomgr_platform_init();
@@ -110,21 +111,27 @@ void grpc_iomgr_shutdown(void) {
grpc_iomgr_object *obj;
grpc_iomgr_closure *closure;
gpr_timespec shutdown_deadline =
- gpr_time_add(gpr_now(), gpr_time_from_seconds(10));
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10));
+ gpr_timespec last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
gpr_mu_lock(&g_mu);
g_shutdown = 1;
while (g_cbs_head != NULL || g_root_object.next != &g_root_object) {
- if (g_cbs_head != NULL && g_root_object.next != &g_root_object) {
- gpr_log(GPR_DEBUG,
- "Waiting for %d iomgr objects to be destroyed and executing "
- "final callbacks",
- count_objects());
- } else if (g_cbs_head != NULL) {
- gpr_log(GPR_DEBUG, "Executing final iomgr callbacks");
- } else {
- gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed",
- count_objects());
+ if (gpr_time_cmp(
+ gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), last_warning_time),
+ gpr_time_from_seconds(1)) >= 0) {
+ if (g_cbs_head != NULL && g_root_object.next != &g_root_object) {
+ gpr_log(GPR_DEBUG,
+ "Waiting for %d iomgr objects to be destroyed and executing "
+ "final callbacks",
+ count_objects());
+ } else if (g_cbs_head != NULL) {
+ gpr_log(GPR_DEBUG, "Executing final iomgr callbacks");
+ } else {
+ gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed",
+ count_objects());
+ }
+ last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
}
if (g_cbs_head) {
do {
@@ -145,9 +152,9 @@ void grpc_iomgr_shutdown(void) {
if (g_root_object.next != &g_root_object) {
int timeout = 0;
gpr_timespec short_deadline =
- gpr_time_add(gpr_now(), gpr_time_from_millis(100));
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100));
while (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) {
- if (gpr_time_cmp(gpr_now(), shutdown_deadline) > 0) {
+ if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) {
timeout = 1;
break;
}
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 1900bbf9e1..3746c8edaf 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -105,10 +105,11 @@ static void multipoll_with_epoll_pollset_maybe_work(
* here.
*/
- timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
pollset->counter += 1;
gpr_mu_unlock(&pollset->mu);
+ timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
+
do {
ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms);
if (ep_rv < 0) {
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 12496440de..85101764d2 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -122,7 +122,7 @@ static void finish_shutdown(grpc_pollset *pollset) {
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
/* pollset->mu already held */
- gpr_timespec now = gpr_now();
+ gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
if (gpr_time_cmp(now, deadline) > 0) {
return 0;
}
@@ -187,15 +187,16 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
}
-int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now) {
+int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
+ gpr_timespec now) {
gpr_timespec timeout;
static const int max_spin_polling_us = 10;
if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
return -1;
}
if (gpr_time_cmp(
- deadline,
- gpr_time_add(now, gpr_time_from_micros(max_spin_polling_us))) <= 0) {
+ deadline,
+ gpr_time_add(now, gpr_time_from_micros(max_spin_polling_us))) <= 0) {
return 0;
}
timeout = gpr_time_sub(deadline, now);
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 8d6bc79c96..24226cc980 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -70,7 +70,7 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
gpr_timespec now;
- now = gpr_now();
+ now = gpr_now(GPR_CLOCK_REALTIME);
if (gpr_time_cmp(now, deadline) > 0) {
return 0 /* GPR_FALSE */;
}
@@ -86,8 +86,6 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
return 1 /* GPR_TRUE */;
}
-void grpc_pollset_kick(grpc_pollset *p) {
- gpr_cv_signal(&p->cv);
-}
+void grpc_pollset_kick(grpc_pollset *p) { gpr_cv_signal(&p->cv); }
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c
index fbf3fdc949..f6ddfff0ad 100644
--- a/src/core/iomgr/socket_windows.c
+++ b/src/core/iomgr/socket_windows.c
@@ -37,6 +37,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/iomgr_internal.h"
@@ -45,11 +46,14 @@
#include "src/core/iomgr/socket_windows.h"
grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name) {
+ char *final_name;
grpc_winsocket *r = gpr_malloc(sizeof(grpc_winsocket));
memset(r, 0, sizeof(grpc_winsocket));
r->socket = socket;
gpr_mu_init(&r->state_mu);
- grpc_iomgr_register_object(&r->iomgr_object, name);
+ gpr_asprintf(&final_name, "%s:socket=0x%p", name, r);
+ grpc_iomgr_register_object(&r->iomgr_object, final_name);
+ gpr_free(final_name);
grpc_iocp_add_socket(r);
return r;
}
@@ -58,22 +62,27 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name) {
operations to abort them. We need to do that this way because of the
various callsites of that function, which happens to be in various
mutex hold states, and that'd be unsafe to call them directly. */
-int grpc_winsocket_shutdown(grpc_winsocket *socket) {
+int grpc_winsocket_shutdown(grpc_winsocket *winsocket) {
int callbacks_set = 0;
- gpr_mu_lock(&socket->state_mu);
- if (socket->read_info.cb) {
+ SOCKET socket;
+ gpr_mu_lock(&winsocket->state_mu);
+ socket = winsocket->socket;
+ if (winsocket->read_info.cb) {
callbacks_set++;
- grpc_iomgr_closure_init(&socket->shutdown_closure, socket->read_info.cb,
- socket->read_info.opaque);
- grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0);
+ grpc_iomgr_closure_init(&winsocket->shutdown_closure,
+ winsocket->read_info.cb,
+ winsocket->read_info.opaque);
+ grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0);
}
- if (socket->write_info.cb) {
+ if (winsocket->write_info.cb) {
callbacks_set++;
- grpc_iomgr_closure_init(&socket->shutdown_closure, socket->write_info.cb,
- socket->write_info.opaque);
- grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0);
+ grpc_iomgr_closure_init(&winsocket->shutdown_closure,
+ winsocket->write_info.cb,
+ winsocket->write_info.opaque);
+ grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0);
}
- gpr_mu_unlock(&socket->state_mu);
+ gpr_mu_unlock(&winsocket->state_mu);
+ closesocket(socket);
return callbacks_set;
}
@@ -84,14 +93,12 @@ int grpc_winsocket_shutdown(grpc_winsocket *socket) {
an "idle" socket which is neither trying to read or write, we'd start leaking
both memory and sockets. */
void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
- SOCKET socket = winsocket->socket;
grpc_iomgr_unregister_object(&winsocket->iomgr_object);
if (winsocket->read_info.outstanding || winsocket->write_info.outstanding) {
grpc_iocp_socket_orphan(winsocket);
} else {
grpc_winsocket_destroy(winsocket);
}
- closesocket(socket);
}
void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index d981aaf028..dc0489e64f 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -114,6 +114,7 @@ static void on_writable(void *acp, int success) {
void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb;
void *cb_arg = ac->cb_arg;
+ gpr_mu_lock(&ac->mu);
if (success) {
do {
so_error_size = sizeof(so_error);
@@ -139,6 +140,7 @@ static void on_writable(void *acp, int success) {
opened too many network connections. The "easy" fix:
don't do that! */
gpr_log(GPR_ERROR, "kernel out of buffers");
+ gpr_mu_unlock(&ac->mu);
grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
return;
} else {
@@ -165,10 +167,11 @@ static void on_writable(void *acp, int success) {
abort();
finish:
- gpr_mu_lock(&ac->mu);
- if (!ep) {
+ if (ep == NULL) {
grpc_pollset_set_del_fd(ac->interested_parties, ac->fd);
grpc_fd_orphan(ac->fd, NULL, "tcp_client_orphan");
+ } else {
+ ac->fd = NULL;
}
done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
@@ -250,7 +253,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
ac->write_closure.cb_arg = ac;
gpr_mu_lock(&ac->mu);
- grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
+ grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac,
+ gpr_now(GPR_CLOCK_REALTIME));
grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
gpr_mu_unlock(&ac->mu);
diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c
index b1a169b519..16741452b9 100644
--- a/src/core/iomgr/tcp_client_windows.c
+++ b/src/core/iomgr/tcp_client_windows.c
@@ -215,7 +215,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
ac->refs = 2;
ac->aborted = 0;
- grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
+ grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac,
+ gpr_now(GPR_CLOCK_REALTIME));
socket->write_info.outstanding = 1;
grpc_socket_notify_on_write(socket, on_connect, ac);
return;
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index d70968de88..e6e1d1499e 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -108,9 +108,10 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
size_t i;
gpr_mu_lock(&s->mu);
/* First, shutdown all fd's. This will queue abortion calls for all
- of the pending accepts. */
+ of the pending accepts due to the normal operation mechanism. */
for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i];
+ sp->shutting_down = 1;
grpc_winsocket_shutdown(sp->socket);
}
/* This happens asynchronously. Wait while that happens. */
@@ -242,63 +243,52 @@ static void on_accept(void *arg, int from_iocp) {
SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL;
-
- /* The shutdown sequence is done in two parts. This is the second
- part here, acknowledging the IOCP notification, and doing nothing
- else, especially not queuing a new accept. */
- if (sp->shutting_down) {
- GPR_ASSERT(from_iocp);
- sp->shutting_down = 0;
- sp->socket->read_info.outstanding = 0;
- gpr_mu_lock(&sp->server->mu);
- if (0 == --sp->server->active_ports) {
- gpr_cv_broadcast(&sp->server->cv);
- }
- gpr_mu_unlock(&sp->server->mu);
- return;
- }
-
- if (from_iocp) {
- /* The IOCP notified us of a completed operation. Let's grab the results,
- and act accordingly. */
- DWORD transfered_bytes = 0;
- DWORD flags;
- BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
- &transfered_bytes, FALSE, &flags);
- if (!wsa_success) {
+ DWORD transfered_bytes;
+ DWORD flags;
+ BOOL wsa_success;
+
+ /* The general mechanism for shutting down is to queue abortion calls. While
+ this is necessary in the read/write case, it's useless for the accept
+ case. Let's do nothing. */
+ if (!from_iocp) return;
+
+ /* The IOCP notified us of a completed operation. Let's grab the results,
+ and act accordingly. */
+ transfered_bytes = 0;
+ wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
+ &transfered_bytes, FALSE, &flags);
+ if (!wsa_success) {
+ if (sp->shutting_down) {
+ /* During the shutdown case, we ARE expecting an error. So that's swell,
+ and we can wake up the shutdown thread. */
+ sp->shutting_down = 0;
+ sp->socket->read_info.outstanding = 0;
+ gpr_mu_lock(&sp->server->mu);
+ if (0 == --sp->server->active_ports) {
+ gpr_cv_broadcast(&sp->server->cv);
+ }
+ gpr_mu_unlock(&sp->server->mu);
+ return;
+ } else {
char *utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
gpr_free(utf8_message);
closesocket(sock);
- } else {
- /* TODO(ctiller): add sockaddr address to label */
- ep = grpc_tcp_create(grpc_winsocket_create(sock, "server"));
}
} else {
- /* If we're not notified from the IOCP, it means we are asked to shutdown.
- This will initiate that shutdown. Calling closesocket will trigger an
- IOCP notification, that will call this function a second time, from
- the IOCP thread. Of course, this only works if the socket was, in fact,
- listening. If that's not the case, we'd wait indefinitely. That's a bit
- of a degenerate case, but it can happen if you create a server, but
- don't start it. So let's support that by recursing once. */
- sp->shutting_down = 1;
- sp->new_socket = INVALID_SOCKET;
- if (sock != INVALID_SOCKET) {
- closesocket(sock);
- } else {
- on_accept(sp, 1);
+ if (!sp->shutting_down) {
+ /* TODO(ctiller): add sockaddr address to label */
+ ep = grpc_tcp_create(grpc_winsocket_create(sock, "server"));
}
- return;
}
/* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */
if (ep) sp->server->cb(sp->server->cb_arg, ep);
/* As we were notified from the IOCP of one and exactly one accept,
- the former socked we created has now either been destroy or assigned
- to the new connection. We need to create a new one for the next
- connection. */
+ the former socked we created has now either been destroy or assigned
+ to the new connection. We need to create a new one for the next
+ connection. */
start_accept(sp);
}
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index 15759c398a..1bf81a73e0 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -148,9 +148,11 @@ static void on_read(void *tcpp, int from_iocp) {
GPR_ASSERT(tcp->socket->read_info.outstanding);
if (socket->read_info.wsa_error != 0) {
- char *utf8_message = gpr_format_message(info->wsa_error);
- gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
- gpr_free(utf8_message);
+ if (socket->read_info.wsa_error != WSAECONNRESET) {
+ char *utf8_message = gpr_format_message(info->wsa_error);
+ gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
+ gpr_free(utf8_message);
+ }
status = GRPC_ENDPOINT_CB_ERROR;
} else {
if (info->bytes_transfered != 0) {
@@ -259,9 +261,11 @@ static void on_write(void *tcpp, int from_iocp) {
GPR_ASSERT(tcp->socket->write_info.outstanding);
if (info->wsa_error != 0) {
- char *utf8_message = gpr_format_message(info->wsa_error);
- gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
- gpr_free(utf8_message);
+ if (info->wsa_error != WSAECONNRESET) {
+ char *utf8_message = gpr_format_message(info->wsa_error);
+ gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
+ gpr_free(utf8_message);
+ }
status = GRPC_ENDPOINT_CB_ERROR;
} else {
GPR_ASSERT(info->bytes_transfered == tcp->write_slices.length);
@@ -325,9 +329,11 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
ret = GRPC_ENDPOINT_WRITE_DONE;
GPR_ASSERT(bytes_sent == tcp->write_slices.length);
} else {
- char *utf8_message = gpr_format_message(info->wsa_error);
- gpr_log(GPR_ERROR, "WSASend error: %s", utf8_message);
- gpr_free(utf8_message);
+ if (socket->read_info.wsa_error != WSAECONNRESET) {
+ char *utf8_message = gpr_format_message(info->wsa_error);
+ gpr_log(GPR_ERROR, "WSASend error: %s", utf8_message);
+ gpr_free(utf8_message);
+ }
}
if (allocated) gpr_free(allocated);
gpr_slice_buffer_reset_and_unref(&tcp->write_slices);