aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr')
-rw-r--r--src/core/iomgr/fd_posix.c34
-rw-r--r--src/core/iomgr/iomgr.h4
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c3
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c8
-rw-r--r--src/core/iomgr/pollset_posix.c16
-rw-r--r--src/core/iomgr/pollset_posix.h4
-rw-r--r--src/core/iomgr/pollset_windows.c14
-rw-r--r--src/core/iomgr/pollset_windows.h1
8 files changed, 59 insertions, 25 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 347d8793c8..d12974cf3c 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -100,6 +100,7 @@ static grpc_fd *alloc_fd(int fd) {
&r->inactive_watcher_root;
r->freelist_next = NULL;
r->read_watcher = r->write_watcher = NULL;
+ r->on_done_closure = NULL;
return r;
}
@@ -138,9 +139,6 @@ static void unref_by(grpc_fd *fd, int n) {
#endif
old = gpr_atm_full_fetch_add(&fd->refst, -n);
if (old == n) {
- if (fd->on_done_closure) {
- grpc_iomgr_add_callback(fd->on_done_closure);
- }
grpc_iomgr_unregister_object(&fd->iomgr_object);
freelist_fd(fd);
} else {
@@ -199,13 +197,24 @@ static void wake_all_watchers_locked(grpc_fd *fd) {
}
}
+static int has_watchers(grpc_fd *fd) {
+ return fd->read_watcher != NULL || fd->write_watcher != NULL || fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
+}
+
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
const char *reason) {
fd->on_done_closure = on_done;
shutdown(fd->fd, SHUT_RDWR);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
gpr_mu_lock(&fd->watcher_mu);
- wake_all_watchers_locked(fd);
+ if (!has_watchers(fd)) {
+ close(fd->fd);
+ if (fd->on_done_closure) {
+ grpc_iomgr_add_callback(fd->on_done_closure);
+ }
+ } else {
+ wake_all_watchers_locked(fd);
+ }
gpr_mu_unlock(&fd->watcher_mu);
UNREF_BY(fd, 2, reason); /* drop the reference */
}
@@ -354,6 +363,13 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
GRPC_FD_REF(fd, "poll");
gpr_mu_lock(&fd->watcher_mu);
+ /* if we are shutdown, then don't add to the watcher set */
+ if (gpr_atm_no_barrier_load(&fd->shutdown)) {
+ watcher->fd = NULL;
+ watcher->pollset = NULL;
+ gpr_mu_unlock(&fd->watcher_mu);
+ return 0;
+ }
/* if there is nobody polling for read, but we need to, then start doing so */
if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
fd->read_watcher = watcher;
@@ -383,6 +399,10 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
int kick = 0;
grpc_fd *fd = watcher->fd;
+ if (fd == NULL) {
+ return;
+ }
+
gpr_mu_lock(&fd->watcher_mu);
if (watcher == fd->read_watcher) {
/* remove read watcher, kick if we still need a read */
@@ -404,6 +424,12 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
if (kick) {
maybe_wake_one_watcher_locked(fd);
}
+ if (grpc_fd_is_orphaned(fd) && !has_watchers(fd)) {
+ close(fd->fd);
+ if (fd->on_done_closure != NULL) {
+ grpc_iomgr_add_callback(fd->on_done_closure);
+ }
+ }
gpr_mu_unlock(&fd->watcher_mu);
GRPC_FD_UNREF(fd, "poll");
diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h
index a10e481e48..6d4a82917b 100644
--- a/src/core/iomgr/iomgr.h
+++ b/src/core/iomgr/iomgr.h
@@ -73,4 +73,8 @@ void grpc_iomgr_shutdown(void);
* Can be called from within a callback or from anywhere else */
void grpc_iomgr_add_callback(grpc_iomgr_closure *closure);
+/** As per grpc_iomgr_add_callback, with the ability to set the success
+ argument. */
+void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success);
+
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index b4a526b9e7..dcf08d379c 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -83,7 +83,7 @@ static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
/* TODO(klempner): We probably want to turn this down a bit */
#define GRPC_EPOLL_MAX_EVENTS 1000
-static int multipoll_with_epoll_pollset_maybe_work(
+static void multipoll_with_epoll_pollset_maybe_work(
grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now,
int allow_synchronous_callback) {
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
@@ -133,7 +133,6 @@ static int multipoll_with_epoll_pollset_maybe_work(
gpr_mu_lock(&pollset->mu);
pollset->counter -= 1;
- return 1;
}
static void multipoll_with_epoll_pollset_finish_shutdown(
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 2f108da66a..cc062693a9 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -103,7 +103,7 @@ static void end_polling(grpc_pollset *pollset) {
}
}
-static int multipoll_with_poll_pollset_maybe_work(
+static void multipoll_with_poll_pollset_maybe_work(
grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now,
int allow_synchronous_callback) {
int timeout;
@@ -126,7 +126,7 @@ static int multipoll_with_poll_pollset_maybe_work(
kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state);
if (kfd == NULL) {
/* Already kicked */
- return 1;
+ return;
}
h->pfds[0].fd = GRPC_POLLSET_KICK_GET_FD(kfd);
h->pfds[0].events = POLLIN;
@@ -154,7 +154,7 @@ static int multipoll_with_poll_pollset_maybe_work(
h->del_count = 0;
if (h->pfd_count == 0) {
end_polling(pollset);
- return 0;
+ return;
}
pollset->counter++;
gpr_mu_unlock(&pollset->mu);
@@ -191,8 +191,6 @@ static int multipoll_with_poll_pollset_maybe_work(
gpr_mu_lock(&pollset->mu);
pollset->counter--;
-
- return 1;
}
static void multipoll_with_poll_pollset_kick(grpc_pollset *p) {
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 46d3d132ce..15ed8e75e6 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -123,7 +123,6 @@ static void finish_shutdown(grpc_pollset *pollset) {
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
/* pollset->mu already held */
gpr_timespec now = gpr_now();
- int r;
if (gpr_time_cmp(now, deadline) > 0) {
return 0;
}
@@ -137,7 +136,7 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
return 1;
}
gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
- r = pollset->vtable->maybe_work(pollset, deadline, now, 1);
+ pollset->vtable->maybe_work(pollset, deadline, now, 1);
gpr_tls_set(&g_current_thread_poller, 0);
if (pollset->shutting_down) {
if (pollset->counter > 0) {
@@ -153,7 +152,7 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
gpr_mu_lock(&pollset->mu);
}
}
- return r;
+ return 1;
}
void grpc_pollset_shutdown(grpc_pollset *pollset,
@@ -338,9 +337,9 @@ static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
}
}
-static int basic_pollset_maybe_work(grpc_pollset *pollset,
- gpr_timespec deadline, gpr_timespec now,
- int allow_synchronous_callback) {
+static void basic_pollset_maybe_work(grpc_pollset *pollset,
+ gpr_timespec deadline, gpr_timespec now,
+ int allow_synchronous_callback) {
struct pollfd pfd[2];
grpc_fd *fd;
grpc_fd_watcher fd_watcher;
@@ -353,7 +352,7 @@ static int basic_pollset_maybe_work(grpc_pollset *pollset,
/* Give do_promote priority so we don't starve it out */
gpr_mu_unlock(&pollset->mu);
gpr_mu_lock(&pollset->mu);
- return 1;
+ return;
}
fd = pollset->data.ptr;
if (fd && grpc_fd_is_orphaned(fd)) {
@@ -364,7 +363,7 @@ static int basic_pollset_maybe_work(grpc_pollset *pollset,
kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state);
if (kfd == NULL) {
/* Already kicked */
- return 1;
+ return;
}
pfd[0].fd = GRPC_POLLSET_KICK_GET_FD(kfd);
pfd[0].events = POLLIN;
@@ -418,7 +417,6 @@ static int basic_pollset_maybe_work(grpc_pollset *pollset,
gpr_mu_lock(&pollset->mu);
pollset->counter--;
- return 1;
}
static void basic_pollset_destroy(grpc_pollset *pollset) {
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index ba3d638d41..53585a2886 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -68,8 +68,8 @@ typedef struct grpc_pollset {
struct grpc_pollset_vtable {
void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd);
void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd);
- int (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline,
- gpr_timespec now, int allow_synchronous_callback);
+ void (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline,
+ gpr_timespec now, int allow_synchronous_callback);
void (*kick)(grpc_pollset *pollset);
void (*finish_shutdown)(grpc_pollset *pollset);
void (*destroy)(grpc_pollset *pollset);
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index d0507af960..8d6bc79c96 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -39,6 +39,7 @@
#include "src/core/iomgr/alarm_internal.h"
#include "src/core/iomgr/iomgr_internal.h"
+#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_windows.h"
/* There isn't really any such thing as a pollset under Windows, due to the
@@ -47,6 +48,7 @@
won't actually do any polling, and return as quickly as possible. */
void grpc_pollset_init(grpc_pollset *pollset) {
+ memset(pollset, 0, sizeof(*pollset));
gpr_mu_init(&pollset->mu);
gpr_cv_init(&pollset->cv);
}
@@ -54,6 +56,10 @@ void grpc_pollset_init(grpc_pollset *pollset) {
void grpc_pollset_shutdown(grpc_pollset *pollset,
void (*shutdown_done)(void *arg),
void *shutdown_done_arg) {
+ gpr_mu_lock(&pollset->mu);
+ pollset->shutting_down = 1;
+ gpr_cv_broadcast(&pollset->cv);
+ gpr_mu_unlock(&pollset->mu);
shutdown_done(shutdown_done_arg);
}
@@ -68,13 +74,15 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
if (gpr_time_cmp(now, deadline) > 0) {
return 0 /* GPR_FALSE */;
}
- if (grpc_maybe_call_delayed_callbacks(NULL, 1 /* GPR_TRUE */)) {
+ if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) {
return 1 /* GPR_TRUE */;
}
- if (grpc_alarm_check(NULL, now, &deadline)) {
+ if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
return 1 /* GPR_TRUE */;
}
- gpr_cv_wait(&pollset->cv, &pollset->mu, deadline);
+ if (!pollset->shutting_down) {
+ gpr_cv_wait(&pollset->cv, &pollset->mu, deadline);
+ }
return 1 /* GPR_TRUE */;
}
diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h
index b4aec1b809..57a2907926 100644
--- a/src/core/iomgr/pollset_windows.h
+++ b/src/core/iomgr/pollset_windows.h
@@ -46,6 +46,7 @@
typedef struct grpc_pollset {
gpr_mu mu;
gpr_cv cv;
+ int shutting_down;
} grpc_pollset;
#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu)