aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2016-05-31 10:58:17 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2016-06-01 08:52:22 -0700
commit5098f91159f2a5c0494688b8cfaff4debef5686f (patch)
tree979cc67ea482ed6232b2760ba7e430f7d27b1f42 /src/core/lib/iomgr
parent8c6c9067bc001a7cd94c2689570a6ec27affee82 (diff)
Rewrite all the pollset and fd functions in ev_epoll_linux.c
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c161
1 files changed, 38 insertions, 123 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 7793a95201..1201c10a7e 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -67,10 +67,10 @@ struct polling_island;
struct grpc_fd {
int fd;
/* refst format:
- bit0: 1=active/0=orphaned
- bit1-n: refcount
- meaning that mostly we ref by two to avoid altering the orphaned bit,
- and just unref by 1 when we're ready to flag the object as orphaned */
+ bit 0 : 1=Active / 0=Orphaned
+ bits 1-n : refcount
+ - ref/unref by two to avoid altering the orphaned bit
+ - To orphan, unref by 1 */
gpr_atm refst;
gpr_mu mu;
@@ -84,12 +84,11 @@ struct grpc_fd {
/* Mutex protecting the 'polling_island' field */
gpr_mu pi_mu;
- /* The polling island to which this fd belongs to. An fd belongs to exactly
- one polling island */
+ /* The polling island to which this fd belongs to.
+ * An fd belongs to exactly one polling island */
struct polling_island *polling_island;
struct grpc_fd *freelist_next;
-
grpc_closure *on_done_closure;
grpc_iomgr_object iomgr_object;
@@ -141,7 +140,6 @@ typedef struct polling_island {
/* Polling islands that are no longer needed are kept in a freelist so that
they can be reused. This field points to the next polling island in the
- free list. Note that this is only used if the polling island is in the
free list */
struct polling_island *next_free;
} polling_island;
@@ -185,7 +183,7 @@ static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
* TODO: sreek - Might have to unref the fds (assuming whether we add a ref to
* the fd when adding it to the epollset) */
/* The caller is expected to hold pi->mu lock before calling this function */
-static void polling_island_clear_fds_locked(polling_island *pi) {
+static void polling_island_remove_all_fds_locked(polling_island *pi) {
int err;
size_t i;
@@ -392,7 +390,7 @@ polling_island *polling_island_merge(polling_island *p, polling_island *q) {
// Move all the fds from polling_island p to polling_island q
polling_island_add_fds_locked(q, p->fds, p->fd_cnt);
- polling_island_clear_fds_locked(p);
+ polling_island_remove_all_fds_locked(p);
q->ref_cnt += p->ref_cnt;
@@ -411,14 +409,7 @@ static void polling_island_global_init() {
* pollset declarations
*/
-typedef struct grpc_cached_wakeup_fd {
- grpc_wakeup_fd fd;
- struct grpc_cached_wakeup_fd *next;
-} grpc_cached_wakeup_fd;
-
struct grpc_pollset_worker {
- grpc_cached_wakeup_fd *wakeup_fd;
- int reevaluate_polling_on_wakeup;
int kicked_specifically;
pthread_t pt_id;
struct grpc_pollset_worker *next;
@@ -441,9 +432,6 @@ struct grpc_pollset {
/* The polling island to which this fd belongs to. An fd belongs to exactly
one polling island */
struct polling_island *polling_island;
-
- /* Local cache of eventfds for workers */
- grpc_cached_wakeup_fd *local_wakeup_cache;
};
/* Add an fd to a pollset */
@@ -465,8 +453,6 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
/* Allow kick to wakeup the currently polling worker */
#define GRPC_POLLSET_CAN_KICK_SELF 1
-/* Force the wakee to repoll when awoken */
-#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
/* As per pollset_kick, with an extended set of flags (defined above)
-- mostly for fd_posix's use. */
static void pollset_kick_ext(grpc_pollset *p,
@@ -815,34 +801,25 @@ static void pollset_kick_ext(grpc_pollset *p,
if (specific_worker != NULL) {
if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0);
- GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
for (specific_worker = p->root_worker.next;
specific_worker != &p->root_worker;
specific_worker = specific_worker->next) {
- grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ pthread_kill(specific_worker->pt_id, SIGUSR1);
}
p->kicked_without_pollers = 1;
GPR_TIMER_END("pollset_kick_ext.broadcast", 0);
} else if (gpr_tls_get(&g_current_thread_worker) !=
(intptr_t)specific_worker) {
GPR_TIMER_MARK("different_thread_worker", 0);
- if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
- specific_worker->reevaluate_polling_on_wakeup = 1;
- }
specific_worker->kicked_specifically = 1;
- grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
/* TODO (sreek): Refactor this into a separate file*/
pthread_kill(specific_worker->pt_id, SIGUSR1);
} else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
GPR_TIMER_MARK("kick_yoself", 0);
- if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
- specific_worker->reevaluate_polling_on_wakeup = 1;
- }
specific_worker->kicked_specifically = 1;
- grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ pthread_kill(specific_worker->pt_id, SIGUSR1);
}
} else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
- GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
GPR_TIMER_MARK("kick_anonymous", 0);
specific_worker = pop_front_worker(p);
if (specific_worker != NULL) {
@@ -860,7 +837,7 @@ static void pollset_kick_ext(grpc_pollset *p,
if (specific_worker != NULL) {
GPR_TIMER_MARK("finally_kick", 0);
push_back_worker(p, specific_worker);
- grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ pthread_kill(specific_worker->pt_id, SIGUSR1);
}
} else {
GPR_TIMER_MARK("kicked_no_pollers", 0);
@@ -911,8 +888,6 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
pollset->shutting_down = 0;
pollset->called_shutdown = 0;
pollset->kicked_without_pollers = 0;
- pollset->local_wakeup_cache = NULL;
- pollset->kicked_without_pollers = 0;
multipoll_with_epoll_pollset_create_efd(pollset);
}
@@ -926,12 +901,6 @@ static void pollset_destroy(grpc_pollset *pollset) {
multipoll_with_epoll_pollset_destroy(pollset);
- while (pollset->local_wakeup_cache) {
- grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
- grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
- gpr_free(pollset->local_wakeup_cache);
- pollset->local_wakeup_cache = next;
- }
gpr_mu_destroy(&pollset->pi_mu);
gpr_mu_destroy(&pollset->mu);
}
@@ -974,14 +943,6 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
GPR_TIMER_BEGIN("pollset_work", 0);
/* this must happen before we (potentially) drop pollset->mu */
worker.next = worker.prev = NULL;
- worker.reevaluate_polling_on_wakeup = 0;
- if (pollset->local_wakeup_cache != NULL) {
- worker.wakeup_fd = pollset->local_wakeup_cache;
- pollset->local_wakeup_cache = worker.wakeup_fd->next;
- } else {
- worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
- grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
- }
worker.kicked_specifically = 0;
/* TODO(sreek): Abstract this thread id stuff out into a separate file */
@@ -1026,27 +987,12 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_mu_lock(&pollset->mu);
locked = 1;
}
- /* If we're forced to re-evaluate polling (via pollset_kick with
- GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
- a loop */
- if (worker.reevaluate_polling_on_wakeup) {
- worker.reevaluate_polling_on_wakeup = 0;
- pollset->kicked_without_pollers = 0;
- if (queued_work || worker.kicked_specifically) {
- /* If there's queued work on the list, then set the deadline to be
- immediate so we get back out of the polling loop quickly */
- deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
- }
- keep_polling = 1;
- }
}
if (added_worker) {
remove_worker(pollset, &worker);
gpr_tls_set(&g_current_thread_worker, 0);
}
- /* release wakeup fd to the local pool */
- worker.wakeup_fd->next = pollset->local_wakeup_cache;
- pollset->local_wakeup_cache = worker.wakeup_fd;
+
/* check shutdown conditions */
if (pollset->shutting_down) {
if (pollset_has_workers(pollset)) {
@@ -1135,10 +1081,9 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
} else if (fd->polling_island == NULL) {
pi_new = polling_island_update_and_lock(pollset->polling_island, 1, 1);
-
} else if (pollset->polling_island == NULL) {
pi_new = polling_island_update_and_lock(fd->polling_island, 1, 1);
- } else { // Non null and different
+ } else {
pi_new = polling_island_merge(fd->polling_island, pollset->polling_island);
}
@@ -1182,9 +1127,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
int epoll_fd = pollset->epoll_fd;
int ep_rv;
- int poll_rv;
int timeout_ms;
- struct pollfd pfds[2];
/* If you want to ignore epoll's ability to sanely handle parallel pollers,
* for a more apples-to-apples performance comparison with poll, add a
@@ -1196,63 +1139,35 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
- pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
- pfds[0].events = POLLIN;
- pfds[0].revents = 0;
- pfds[1].fd = epoll_fd;
- pfds[1].events = POLLIN;
- pfds[1].revents = 0;
-
- /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
- even going into the blocking annotation if possible */
- GPR_TIMER_BEGIN("poll", 0);
- GRPC_SCHEDULING_START_BLOCKING_REGION;
- poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
- GRPC_SCHEDULING_END_BLOCKING_REGION;
- GPR_TIMER_END("poll", 0);
-
- if (poll_rv < 0) {
- if (errno != EINTR) {
- gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
- }
- } else if (poll_rv == 0) {
- /* do nothing */
- } else {
- if (pfds[0].revents) {
- grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
- }
- if (pfds[1].revents) {
- do {
- /* The following epoll_wait never blocks; it has a timeout of 0 */
- ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
- if (ep_rv < 0) {
- if (errno != EINTR) {
- gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
- }
+ do {
+ /* The following epoll_wait never blocks; it has a timeout of 0 */
+ ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms);
+ if (ep_rv < 0) {
+ if (errno != EINTR) {
+ gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
+ }
+ } else {
+ int i;
+ for (i = 0; i < ep_rv; ++i) {
+ grpc_fd *fd = ep_ev[i].data.ptr;
+ /* TODO(klempner): We might want to consider making err and pri
+ * separate events */
+ int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
+ int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
+ int write_ev = ep_ev[i].events & EPOLLOUT;
+ if (fd == NULL) {
+ grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
} else {
- int i;
- for (i = 0; i < ep_rv; ++i) {
- grpc_fd *fd = ep_ev[i].data.ptr;
- /* TODO(klempner): We might want to consider making err and pri
- * separate events */
- int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
- int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
- int write_ev = ep_ev[i].events & EPOLLOUT;
- if (fd == NULL) {
- grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
- } else {
- if (read_ev || cancel) {
- fd_become_readable(exec_ctx, fd);
- }
- if (write_ev || cancel) {
- fd_become_writable(exec_ctx, fd);
- }
- }
+ if (read_ev || cancel) {
+ fd_become_readable(exec_ctx, fd);
+ }
+ if (write_ev || cancel) {
+ fd_become_writable(exec_ctx, fd);
}
}
- } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
+ }
}
- }
+ } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
}
static void multipoll_with_epoll_pollset_finish_shutdown(