path: root/src/core/iomgr
author    yang-g <yangg@google.com>    2015-11-18 11:41:30 -0800
committer yang-g <yangg@google.com>    2015-11-18 11:41:30 -0800
commit    71b962634eceacae336b1c706829e4ad9621b397 (patch)
tree      35378c669bf04c24097a403af65a5adb8c82e812 /src/core/iomgr
parent    42630b010f398ca072a0ba89648855c3052c6594 (diff)
parent    75b53d6a5d891fa33a9173318b1446faeaf154e4 (diff)
merge with head
Diffstat (limited to 'src/core/iomgr')
-rw-r--r--  src/core/iomgr/closure.c                              | 23
-rw-r--r--  src/core/iomgr/closure.h                              | 19
-rw-r--r--  src/core/iomgr/exec_ctx.c                             |  5
-rw-r--r--  src/core/iomgr/executor.c                             | 13
-rw-r--r--  src/core/iomgr/pollset.h                              |  5
-rw-r--r--  src/core/iomgr/pollset_multipoller_with_epoll.c       | 14
-rw-r--r--  src/core/iomgr/pollset_multipoller_with_poll_posix.c  |  4
-rw-r--r--  src/core/iomgr/pollset_posix.c                        | 88
-rw-r--r--  src/core/iomgr/pollset_posix.h                        |  9
-rw-r--r--  src/core/iomgr/pollset_windows.c                      | 12
-rw-r--r--  src/core/iomgr/workqueue_posix.c                      |  2
11 files changed, 108 insertions(+), 86 deletions(-)
diff --git a/src/core/iomgr/closure.c b/src/core/iomgr/closure.c
index b4f1817de4..4aae52a454 100644
--- a/src/core/iomgr/closure.c
+++ b/src/core/iomgr/closure.c
@@ -39,18 +39,17 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg) {
closure->cb = cb;
closure->cb_arg = cb_arg;
- closure->next = NULL;
+ closure->final_data = 0;
}
void grpc_closure_list_add(grpc_closure_list *closure_list,
grpc_closure *closure, int success) {
if (closure == NULL) return;
- closure->next = NULL;
- closure->success = success;
+ closure->final_data = (success != 0);
if (closure_list->head == NULL) {
closure_list->head = closure;
} else {
- closure_list->tail->next = closure;
+ closure_list->tail->final_data |= (gpr_uintptr)closure;
}
closure_list->tail = closure;
}
@@ -66,22 +65,12 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst) {
if (dst->head == NULL) {
*dst = *src;
} else {
- dst->tail->next = src->head;
+ dst->tail->final_data |= (gpr_uintptr)src->head;
dst->tail = src->tail;
}
src->head = src->tail = NULL;
}
-grpc_closure *grpc_closure_list_pop(grpc_closure_list *list) {
- grpc_closure *head;
- if (list->head == NULL) {
- return NULL;
- }
- head = list->head;
- list->head = list->head->next;
- return head;
-}
-
typedef struct {
grpc_iomgr_cb_func cb;
void *cb_arg;
@@ -103,3 +92,7 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) {
grpc_closure_init(&wc->wrapper, closure_wrapper, wc);
return &wc->wrapper;
}
+
+grpc_closure *grpc_closure_next(grpc_closure *closure) {
+ return (grpc_closure *)(closure->final_data & ~(gpr_uintptr)1);
+}
diff --git a/src/core/iomgr/closure.h b/src/core/iomgr/closure.h
index 7a9f7ccad0..a1d738bf5a 100644
--- a/src/core/iomgr/closure.h
+++ b/src/core/iomgr/closure.h
@@ -34,7 +34,7 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H
#define GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H
-#include <stddef.h>
+#include <grpc/support/port_platform.h>
struct grpc_closure;
typedef struct grpc_closure grpc_closure;
@@ -64,13 +64,10 @@ struct grpc_closure {
/** Arguments to be passed to "cb". */
void *cb_arg;
- /** Internal. A boolean indication to "cb" on the state of the iomgr.
- * For instance, closures created during a shutdown would have this field set
- * to false. */
- int success;
-
- /**< Internal. Do not touch */
- struct grpc_closure *next;
+ /** Once enqueued, contains in the lower bit the success of the closure,
+ and in the upper bits the pointer to the next closure in the list.
+ Before enqueing for execution, this is usable for scratch data. */
+ gpr_uintptr final_data;
};
/** Initializes \a closure with \a cb and \a cb_arg. */
@@ -91,10 +88,10 @@ void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure,
/** append all closures from \a src to \a dst and empty \a src. */
void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst);
-/** pop (return and remove) the head closure from \a list. */
-grpc_closure *grpc_closure_list_pop(grpc_closure_list *list);
-
/** return whether \a list is empty. */
int grpc_closure_list_empty(grpc_closure_list list);
+/** return the next pointer for a queued closure list */
+grpc_closure *grpc_closure_next(grpc_closure *closure);
+
#endif /* GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H */
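[Editor's note] The final_data comment added in closure.h describes a tagged-pointer encoding: because grpc_closure objects are word-aligned, the low bit of the "next" pointer is free to carry the closure's success flag, and grpc_closure_next() masks it back off. Below is a minimal standalone sketch of that pack/unpack scheme; the names (node, pack, unpack_next, unpack_success) are illustrative stand-ins, not gRPC's API.

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Illustrative stand-in for grpc_closure: one word holds both the
   success bit (bit 0) and the pointer to the next list element. */
typedef struct node { uintptr_t final_data; } node;

/* Pack: word-aligned pointers have a zero low bit, so the success
   flag can live there without losing address information. */
static void pack(node *n, node *next, int success) {
  assert(((uintptr_t)next & 1) == 0); /* relies on alignment */
  n->final_data = (uintptr_t)next | (uintptr_t)(success != 0);
}

/* Unpack: mask the tag bit off to recover the pointer. */
static node *unpack_next(const node *n) {
  return (node *)(n->final_data & ~(uintptr_t)1);
}

static int unpack_success(const node *n) { return (int)(n->final_data & 1); }

int main(void) {
  node a, b;
  pack(&b, NULL, 0); /* tail: no next, failure */
  pack(&a, &b, 1);   /* head: next is b, success */
  printf("a: success=%d next=%p\n", unpack_success(&a), (void *)unpack_next(&a));
  printf("b: success=%d next=%p\n", unpack_success(&b), (void *)unpack_next(&b));
  return 0;
}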
diff --git a/src/core/iomgr/exec_ctx.c b/src/core/iomgr/exec_ctx.c
index 410b34c521..e95eaf267a 100644
--- a/src/core/iomgr/exec_ctx.c
+++ b/src/core/iomgr/exec_ctx.c
@@ -44,10 +44,11 @@ int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
grpc_closure *c = exec_ctx->closure_list.head;
exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
while (c != NULL) {
- grpc_closure *next = c->next;
+ int success = (int)(c->final_data & 1);
+ grpc_closure *next = (grpc_closure *)(c->final_data & ~(gpr_uintptr)1);
did_something++;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0);
- c->cb(exec_ctx, c->cb_arg, c->success);
+ c->cb(exec_ctx, c->cb_arg, success);
GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0);
c = next;
}
diff --git a/src/core/iomgr/executor.c b/src/core/iomgr/executor.c
index 457e5cdbac..00c68f7828 100644
--- a/src/core/iomgr/executor.c
+++ b/src/core/iomgr/executor.c
@@ -63,8 +63,6 @@ void grpc_executor_init() {
/* thread body */
static void closure_exec_thread_func(void *ignored) {
- grpc_closure *closure;
-
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
while (1) {
gpr_mu_lock(&g_executor.mu);
@@ -72,16 +70,16 @@ static void closure_exec_thread_func(void *ignored) {
gpr_mu_unlock(&g_executor.mu);
break;
}
- closure = grpc_closure_list_pop(&g_executor.closures);
- if (closure == NULL) {
+ if (grpc_closure_list_empty(g_executor.closures)) {
/* no more work, time to die */
GPR_ASSERT(g_executor.busy == 1);
g_executor.busy = 0;
gpr_mu_unlock(&g_executor.mu);
break;
+ } else {
+ grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures);
}
gpr_mu_unlock(&g_executor.mu);
- closure->cb(&exec_ctx, closure->cb_arg, closure->success);
grpc_exec_ctx_flush(&exec_ctx);
}
grpc_exec_ctx_finish(&exec_ctx);
@@ -125,7 +123,6 @@ void grpc_executor_enqueue(grpc_closure *closure, int success) {
void grpc_executor_shutdown() {
int pending_join;
- grpc_closure *closure;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_mu_lock(&g_executor.mu);
@@ -136,9 +133,7 @@ void grpc_executor_shutdown() {
* list below because we aren't accepting new work */
/* Execute pending callbacks, some may be performing cleanups */
- while ((closure = grpc_closure_list_pop(&g_executor.closures)) != NULL) {
- closure->cb(&exec_ctx, closure->cb_arg, closure->success);
- }
+ grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures);
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(grpc_closure_list_empty(g_executor.closures));
if (pending_join) {
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index d15553a12a..c6b0214dea 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -55,8 +55,13 @@
#endif
void grpc_pollset_init(grpc_pollset *pollset);
+/* Begin shutting down the pollset, and call closure when done.
+ * GRPC_POLLSET_MU(pollset) must be held */
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure);
+/** Reset the pollset to its initial state (perhaps with some cached objects);
+ * must have been previously shutdown */
+void grpc_pollset_reset(grpc_pollset *pollset);
void grpc_pollset_destroy(grpc_pollset *pollset);
/* Do some work on a pollset.
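[Editor's note] The comments added to pollset.h define the lifecycle this patch relies on: grpc_pollset_shutdown completes asynchronously through the supplied closure, after which the fully quiesced pollset may either be destroyed or returned to its initial state with grpc_pollset_reset and reused. A tiny self-contained sketch of that contract follows; fake_pollset, fake_shutdown, and fake_reset are hypothetical names, not part of gRPC.

#include <assert.h>
#include <stdio.h>

typedef struct {
  int shutting_down;
  int workers;
} fake_pollset;

/* Shutdown is asynchronous: the callback fires once no workers remain. */
static void fake_shutdown(fake_pollset *p, void (*done)(fake_pollset *)) {
  p->shutting_down = 1;
  if (p->workers == 0) done(p); /* otherwise done runs when the last worker leaves */
}

/* Reset mirrors the documented precondition: must have been shut down first. */
static void fake_reset(fake_pollset *p) {
  assert(p->shutting_down);
  assert(p->workers == 0);
  p->shutting_down = 0; /* back to the initial, reusable state */
}

static void on_done(fake_pollset *p) {
  (void)p;
  printf("shutdown complete\n");
}

int main(void) {
  fake_pollset p = {0, 0};
  fake_shutdown(&p, on_done);
  fake_reset(&p); /* the pollset can now be used again */
  return 0;
}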
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 2aafd21dfb..1f1bf47e98 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -47,21 +47,13 @@
#include "src/core/support/block_annotate.h"
#include "src/core/profiling/timers.h"
-typedef struct wakeup_fd_hdl {
- grpc_wakeup_fd wakeup_fd;
- struct wakeup_fd_hdl *next;
-} wakeup_fd_hdl;
-
typedef struct {
grpc_pollset *pollset;
grpc_fd *fd;
grpc_closure closure;
} delayed_add;
-typedef struct {
- int epoll_fd;
- wakeup_fd_hdl *free_wakeup_fds;
-} pollset_hdr;
+typedef struct { int epoll_fd; } pollset_hdr;
static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_fd *fd) {
@@ -174,7 +166,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
- pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
pfds[0].events = POLLIN;
pfds[0].revents = 0;
pfds[1].fd = h->epoll_fd;
@@ -197,7 +189,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
/* do nothing */
} else {
if (pfds[0].revents) {
- grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
}
if (pfds[1].revents) {
do {
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index faa6c14491..09f04b64b9 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -124,7 +124,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
pfds[0].events = POLLIN;
pfds[0].revents = 0;
- pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
pfds[1].events = POLLIN;
pfds[1].revents = 0;
for (i = 0; i < h->fd_count; i++) {
@@ -174,7 +174,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
}
if (pfds[1].revents & POLLIN_CHECK) {
- grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
}
for (i = 2; i < pfd_count; i++) {
if (watchers[i].fd == NULL) {
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index a8e620ea1e..c179248214 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -111,7 +111,7 @@ void grpc_pollset_kick_ext(grpc_pollset *p,
for (specific_worker = p->root_worker.next;
specific_worker != &p->root_worker;
specific_worker = specific_worker->next) {
- grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
}
p->kicked_without_pollers = 1;
GPR_TIMER_END("grpc_pollset_kick_ext.broadcast", 0);
@@ -122,14 +122,14 @@ void grpc_pollset_kick_ext(grpc_pollset *p,
specific_worker->reevaluate_polling_on_wakeup = 1;
}
specific_worker->kicked_specifically = 1;
- grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
} else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
GPR_TIMER_MARK("kick_yoself", 0);
if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
specific_worker->reevaluate_polling_on_wakeup = 1;
}
specific_worker->kicked_specifically = 1;
- grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
}
} else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
@@ -151,7 +151,7 @@ void grpc_pollset_kick_ext(grpc_pollset *p,
if (specific_worker != NULL) {
GPR_TIMER_MARK("finally_kick", 0);
push_back_worker(p, specific_worker);
- grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
}
} else {
GPR_TIMER_MARK("kicked_no_pollers", 0);
@@ -177,9 +177,9 @@ void grpc_pollset_global_init(void) {
void grpc_pollset_global_shutdown(void) {
grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
- grpc_wakeup_fd_global_destroy();
gpr_tls_destroy(&g_current_thread_poller);
gpr_tls_destroy(&g_current_thread_worker);
+ grpc_wakeup_fd_global_destroy();
}
void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
@@ -196,6 +196,34 @@ void grpc_pollset_init(grpc_pollset *pollset) {
pollset->called_shutdown = 0;
pollset->kicked_without_pollers = 0;
pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
+ pollset->local_wakeup_cache = NULL;
+ pollset->kicked_without_pollers = 0;
+ become_basic_pollset(pollset, NULL);
+}
+
+void grpc_pollset_destroy(grpc_pollset *pollset) {
+ GPR_ASSERT(pollset->in_flight_cbs == 0);
+ GPR_ASSERT(!grpc_pollset_has_workers(pollset));
+ GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
+ pollset->vtable->destroy(pollset);
+ gpr_mu_destroy(&pollset->mu);
+ while (pollset->local_wakeup_cache) {
+ grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
+ grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
+ gpr_free(pollset->local_wakeup_cache);
+ pollset->local_wakeup_cache = next;
+ }
+}
+
+void grpc_pollset_reset(grpc_pollset *pollset) {
+ GPR_ASSERT(pollset->shutting_down);
+ GPR_ASSERT(pollset->in_flight_cbs == 0);
+ GPR_ASSERT(!grpc_pollset_has_workers(pollset));
+ GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
+ pollset->vtable->destroy(pollset);
+ pollset->shutting_down = 0;
+ pollset->called_shutdown = 0;
+ pollset->kicked_without_pollers = 0;
become_basic_pollset(pollset, NULL);
}
@@ -245,13 +273,19 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
/* this must happen before we (potentially) drop pollset->mu */
worker->next = worker->prev = NULL;
worker->reevaluate_polling_on_wakeup = 0;
+ if (pollset->local_wakeup_cache != NULL) {
+ worker->wakeup_fd = pollset->local_wakeup_cache;
+ pollset->local_wakeup_cache = worker->wakeup_fd->next;
+ } else {
+ worker->wakeup_fd = gpr_malloc(sizeof(*worker->wakeup_fd));
+ grpc_wakeup_fd_init(&worker->wakeup_fd->fd);
+ }
worker->kicked_specifically = 0;
- /* TODO(ctiller): pool these */
- grpc_wakeup_fd_init(&worker->wakeup_fd);
/* If there's work waiting for the pollset to be idle, and the
pollset is idle, then do that work */
if (!grpc_pollset_has_workers(pollset) &&
!grpc_closure_list_empty(pollset->idle_jobs)) {
+ GPR_TIMER_MARK("grpc_pollset_work.idle_jobs", 0);
grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
goto done;
}
@@ -260,16 +294,19 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
May update deadline to ensure timely wakeups.
TODO(ctiller): can this work be localized? */
if (grpc_timer_check(exec_ctx, now, &deadline)) {
+ GPR_TIMER_MARK("grpc_pollset_work.alarm_triggered", 0);
gpr_mu_unlock(&pollset->mu);
locked = 0;
goto done;
}
/* If we're shutting down then we don't execute any extended work */
if (pollset->shutting_down) {
+ GPR_TIMER_MARK("grpc_pollset_work.shutting_down", 0);
goto done;
}
/* Give do_promote priority so we don't starve it out */
if (pollset->in_flight_cbs) {
+ GPR_TIMER_MARK("grpc_pollset_work.in_flight_cbs", 0);
gpr_mu_unlock(&pollset->mu);
locked = 0;
goto done;
@@ -294,6 +331,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
locked = 0;
gpr_tls_set(&g_current_thread_poller, 0);
} else {
+ GPR_TIMER_MARK("grpc_pollset_work.kicked_without_pollers", 0);
pollset->kicked_without_pollers = 0;
}
/* Finished execution - start cleaning up.
@@ -324,7 +362,10 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
remove_worker(pollset, worker);
gpr_tls_set(&g_current_thread_worker, 0);
}
- grpc_wakeup_fd_destroy(&worker->wakeup_fd);
+ /* release wakeup fd to the local pool */
+ worker->wakeup_fd->next = pollset->local_wakeup_cache;
+ pollset->local_wakeup_cache = worker->wakeup_fd;
+ /* check shutdown conditions */
if (pollset->shutting_down) {
if (grpc_pollset_has_workers(pollset)) {
grpc_pollset_kick(pollset, NULL);
@@ -339,8 +380,8 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
* TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
gpr_mu_lock(&pollset->mu);
} else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
- gpr_mu_unlock(&pollset->mu);
grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
+ gpr_mu_unlock(&pollset->mu);
grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu);
}
@@ -350,35 +391,20 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
- int call_shutdown = 0;
- gpr_mu_lock(&pollset->mu);
GPR_ASSERT(!pollset->shutting_down);
pollset->shutting_down = 1;
- if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
- !grpc_pollset_has_workers(pollset)) {
- pollset->called_shutdown = 1;
- call_shutdown = 1;
- }
+ pollset->shutdown_done = closure;
+ grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!grpc_pollset_has_workers(pollset)) {
grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
}
- pollset->shutdown_done = closure;
- grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
- gpr_mu_unlock(&pollset->mu);
-
- if (call_shutdown) {
+ if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
+ !grpc_pollset_has_workers(pollset)) {
+ pollset->called_shutdown = 1;
finish_shutdown(exec_ctx, pollset);
}
}
-void grpc_pollset_destroy(grpc_pollset *pollset) {
- GPR_ASSERT(pollset->shutting_down);
- GPR_ASSERT(pollset->in_flight_cbs == 0);
- GPR_ASSERT(!grpc_pollset_has_workers(pollset));
- pollset->vtable->destroy(pollset);
- gpr_mu_destroy(&pollset->mu);
-}
-
int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
gpr_timespec now) {
gpr_timespec timeout;
@@ -558,7 +584,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
pfd[0].events = POLLIN;
pfd[0].revents = 0;
- pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
pfd[1].events = POLLIN;
pfd[1].revents = 0;
nfds = 2;
@@ -600,7 +626,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
}
if (pfd[1].revents & POLLIN_CHECK) {
- grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
+ grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
}
if (nfds > 2) {
grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK,
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 95ebeab1c2..e4593728bd 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -48,8 +48,13 @@ typedef struct grpc_pollset_vtable grpc_pollset_vtable;
use the struct tag */
struct grpc_fd;
+typedef struct grpc_cached_wakeup_fd {
+ grpc_wakeup_fd fd;
+ struct grpc_cached_wakeup_fd *next;
+} grpc_cached_wakeup_fd;
+
typedef struct grpc_pollset_worker {
- grpc_wakeup_fd wakeup_fd;
+ grpc_cached_wakeup_fd *wakeup_fd;
int reevaluate_polling_on_wakeup;
int kicked_specifically;
struct grpc_pollset_worker *next;
@@ -74,6 +79,8 @@ typedef struct grpc_pollset {
int fd;
void *ptr;
} data;
+ /* Local cache of eventfds for workers */
+ grpc_cached_wakeup_fd *local_wakeup_cache;
} grpc_pollset;
struct grpc_pollset_vtable {
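[Editor's note] The "Local cache of eventfds for workers" field added above replaces the per-call grpc_wakeup_fd_init/destroy pair with an intrusive free list: grpc_pollset_work pops a cached wakeup fd (or allocates one on a miss), pushes it back when the worker exits, and grpc_pollset_destroy drains the list. Below is a self-contained sketch of that pooling pattern; cached_fd, pool, acquire, release, and drain are illustrative names standing in for the gRPC types.

#include <stdio.h>
#include <stdlib.h>

/* Stand-in for grpc_cached_wakeup_fd: a pooled resource kept on an
   intrusive singly-linked free list. */
typedef struct cached_fd {
  int fd; /* placeholder for the real wakeup fd */
  struct cached_fd *next;
} cached_fd;

typedef struct { cached_fd *local_cache; } pool;

static cached_fd *acquire(pool *p) {
  if (p->local_cache != NULL) { /* reuse a cached entry */
    cached_fd *c = p->local_cache;
    p->local_cache = c->next;
    return c;
  }
  cached_fd *c = malloc(sizeof(*c)); /* otherwise allocate fresh */
  c->fd = -1; /* real code would initialize the wakeup fd here */
  return c;
}

static void release(pool *p, cached_fd *c) {
  c->next = p->local_cache; /* push back onto the free list */
  p->local_cache = c;
}

static void drain(pool *p) {
  while (p->local_cache) {
    cached_fd *next = p->local_cache->next;
    /* real code would destroy the wakeup fd here */
    free(p->local_cache);
    p->local_cache = next;
  }
}

int main(void) {
  pool p = {NULL};
  cached_fd *a = acquire(&p);
  release(&p, a);
  cached_fd *b = acquire(&p); /* reuses the same entry */
  printf("reused: %s\n", a == b ? "yes" : "no");
  release(&p, b);
  drain(&p);
  return 0;
}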
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 9f74580273..c3f310ee27 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -35,6 +35,7 @@
#ifdef GPR_WINSOCK_SOCKET
+#include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include "src/core/iomgr/timer_internal.h"
@@ -112,7 +113,6 @@ void grpc_pollset_init(grpc_pollset *pollset) {
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
- gpr_mu_lock(&grpc_polling_mu);
pollset->shutting_down = 1;
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset->is_iocp_worker) {
@@ -120,11 +120,19 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
} else {
pollset->on_shutdown = closure;
}
- gpr_mu_unlock(&grpc_polling_mu);
}
void grpc_pollset_destroy(grpc_pollset *pollset) {}
+void grpc_pollset_reset(grpc_pollset *pollset) {
+ GPR_ASSERT(pollset->shutting_down);
+ GPR_ASSERT(!has_workers(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET));
+ pollset->shutting_down = 0;
+ pollset->is_iocp_worker = 0;
+ pollset->kicked_without_pollers = 0;
+ pollset->on_shutdown = NULL;
+}
+
void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker, gpr_timespec now,
gpr_timespec deadline) {
diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c
index 0a0f3c364e..c087b887b8 100644
--- a/src/core/iomgr/workqueue_posix.c
+++ b/src/core/iomgr/workqueue_posix.c
@@ -129,8 +129,6 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, int success) {
void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure,
int success) {
- closure->success = success;
- closure->next = NULL;
gpr_mu_lock(&workqueue->mu);
if (grpc_closure_list_empty(workqueue->closure_list)) {
grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);