aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-10-01 07:53:56 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-10-01 07:53:56 -0700
commit988e37f1fc8542c205db569be0dd20f758c39164 (patch)
tree6278c00bdc22fb0c565c15c867ed5adce199917c /src/core
parent2b2a1ad6ca97da100fd469085f0ffef847e87e65 (diff)
Allow fd_posix to force a re-evaluation of polling on wakeup
Diffstat (limited to 'src/core')
-rw-r--r--src/core/iomgr/fd_posix.c20
-rw-r--r--src/core/iomgr/fd_posix.h2
-rw-r--r--src/core/iomgr/pollset_posix.c30
-rw-r--r--src/core/iomgr/pollset_posix.h5
4 files changed, 46 insertions, 11 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index b48b7f050a..806af9bc26 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -173,19 +173,19 @@ int grpc_fd_is_orphaned(grpc_fd *fd) {
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
-static void pollset_kick_locked(grpc_pollset *pollset) {
- gpr_mu_lock(GRPC_POLLSET_MU(pollset));
- grpc_pollset_kick(pollset, NULL);
- gpr_mu_unlock(GRPC_POLLSET_MU(pollset));
+static void pollset_kick_locked(grpc_fd_watcher *watcher) {
+ gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset));
+ grpc_pollset_kick_ex(watcher->pollset, watcher->worker, GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
+ gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset));
}
static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
- pollset_kick_locked(fd->inactive_watcher_root.next->pollset);
+ pollset_kick_locked(fd->inactive_watcher_root.next);
} else if (fd->read_watcher) {
- pollset_kick_locked(fd->read_watcher->pollset);
+ pollset_kick_locked(fd->read_watcher);
} else if (fd->write_watcher) {
- pollset_kick_locked(fd->write_watcher->pollset);
+ pollset_kick_locked(fd->write_watcher);
}
}
@@ -199,13 +199,13 @@ static void wake_all_watchers_locked(grpc_fd *fd) {
grpc_fd_watcher *watcher;
for (watcher = fd->inactive_watcher_root.next;
watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
- pollset_kick_locked(watcher->pollset);
+ pollset_kick_locked(watcher);
}
if (fd->read_watcher) {
- pollset_kick_locked(fd->read_watcher->pollset);
+ pollset_kick_locked(fd->read_watcher);
}
if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
- pollset_kick_locked(fd->write_watcher->pollset);
+ pollset_kick_locked(fd->write_watcher);
}
}
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index 089aa4d717..a60aff2a09 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -46,6 +46,7 @@ typedef struct grpc_fd_watcher {
struct grpc_fd_watcher *next;
struct grpc_fd_watcher *prev;
grpc_pollset *pollset;
+ grpc_pollset_worker *worker;
grpc_fd *fd;
} grpc_fd_watcher;
@@ -126,6 +127,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
fd's current interest (such as epoll) do not need to call this function.
MUST NOT be called with a pollset lock taken */
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
+ grpc_pollset_worker *worker,
gpr_uint32 read_mask, gpr_uint32 write_mask,
grpc_fd_watcher *rec);
/* Complete polling previously started with grpc_fd_begin_poll
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 82a82cc064..c88fff6b43 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -98,31 +98,59 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next->prev = worker;
}
-void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
+void grpc_pollset_kick_ex(grpc_pollset *p, grpc_pollset_worker *specific_worker, gpr_uint32 flags) {
/* pollset->mu already held */
if (specific_worker != NULL) {
if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
+ GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
for (specific_worker = p->root_worker.next;
specific_worker != &p->root_worker;
specific_worker = specific_worker->next) {
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
}
p->kicked_without_pollers = 1;
+ return;
} else if (gpr_tls_get(&g_current_thread_worker) !=
(gpr_intptr)specific_worker) {
+ if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
+ specific_worker->reevaluate_polling_on_wakeup = 1;
+ }
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+ return;
+ } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
+ if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
+ specific_worker->reevaluate_polling_on_wakeup = 1;
+ }
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+ return;
}
} else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
+ GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
specific_worker = pop_front_worker(p);
if (specific_worker != NULL) {
+ if (gpr_tls_get(&g_current_thread_worker) == (gpr_intptr)specific_worker) {
+ push_back_worker(p, specific_worker);
+ specific_worker = pop_front_worker(p);
+ if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
+ gpr_tls_get(&g_current_thread_worker) == (gpr_intptr)specific_worker) {
+ push_back_worker(p, specific_worker);
+ return;
+ }
+ }
push_back_worker(p, specific_worker);
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+ return;
} else {
p->kicked_without_pollers = 1;
+ return;
}
}
}
+void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
+ grpc_pollset_kick_ex(p, specific_worker, 0);
+}
+
/* global state management */
void grpc_pollset_global_init(void) {
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 83c5258539..762582c79d 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -50,6 +50,7 @@ struct grpc_fd;
typedef struct grpc_pollset_worker {
grpc_wakeup_fd wakeup_fd;
+ int reevaluate_polling_on_wakeup;
struct grpc_pollset_worker *next;
struct grpc_pollset_worker *prev;
} grpc_pollset_worker;
@@ -111,6 +112,10 @@ void grpc_kick_drain(grpc_pollset *p);
int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
gpr_timespec now);
+#define GRPC_POLLSET_CAN_KICK_SELF 1
+#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
+void grpc_pollset_kick_ex(grpc_pollset *p, grpc_pollset_worker *specific_worker, gpr_uint32 flags);
+
/* turn a pollset into a multipoller: platform specific */
typedef void (*grpc_platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset,