Diffstat (limited to 'src/core/lib/iomgr')
 src/core/lib/iomgr/closure.c                 |  18
 src/core/lib/iomgr/closure.h                 |  11
 src/core/lib/iomgr/combiner.c                | 372
 src/core/lib/iomgr/combiner.h                |  15
 src/core/lib/iomgr/error.c                   |  18
 src/core/lib/iomgr/error.h                   |  12
 src/core/lib/iomgr/ev_epoll_linux.c          | 231
 src/core/lib/iomgr/ev_poll_and_epoll_posix.c |  30
 src/core/lib/iomgr/ev_poll_posix.c           | 280
 src/core/lib/iomgr/ev_poll_posix.h           |   1
 src/core/lib/iomgr/ev_posix.c                |  24
 src/core/lib/iomgr/ev_posix.h                |  13
 src/core/lib/iomgr/exec_ctx.c                |  61
 src/core/lib/iomgr/exec_ctx.h                |  24
 src/core/lib/iomgr/iomgr.c                   |  11
 src/core/lib/iomgr/tcp_posix.c               |  11
 src/core/lib/iomgr/timer.h                   |  17
 src/core/lib/iomgr/wakeup_fd_cv.c            | 118
 src/core/lib/iomgr/wakeup_fd_cv.h (renamed from src/core/lib/iomgr/workqueue_posix.h) | 63
 src/core/lib/iomgr/wakeup_fd_pipe.c          |  12
 src/core/lib/iomgr/wakeup_fd_posix.c         |  33
 src/core/lib/iomgr/wakeup_fd_posix.h         |   5
 src/core/lib/iomgr/workqueue.h               |  16
 src/core/lib/iomgr/workqueue_posix.c         | 196
 src/core/lib/iomgr/workqueue_windows.c       |  10
 25 files changed, 1098 insertions(+), 504 deletions(-)
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c
index 1ba0a5c141..c6ddc76732 100644
--- a/src/core/lib/iomgr/closure.c
+++ b/src/core/lib/iomgr/closure.c
@@ -35,6 +35,8 @@
#include <grpc/support/alloc.h>
+#include "src/core/lib/profiling/timers.h"
+
void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg) {
closure->cb = cb;
@@ -51,7 +53,7 @@ void grpc_closure_list_append(grpc_closure_list *closure_list,
GRPC_ERROR_UNREF(error);
return;
}
- closure->error = error;
+ closure->error_data.error = error;
closure->next_data.next = NULL;
if (closure_list->head == NULL) {
closure_list->head = closure;
@@ -64,8 +66,8 @@ void grpc_closure_list_append(grpc_closure_list *closure_list,
void grpc_closure_list_fail_all(grpc_closure_list *list,
grpc_error *forced_failure) {
for (grpc_closure *c = list->head; c != NULL; c = c->next_data.next) {
- if (c->error == GRPC_ERROR_NONE) {
- c->error = GRPC_ERROR_REF(forced_failure);
+ if (c->error_data.error == GRPC_ERROR_NONE) {
+ c->error_data.error = GRPC_ERROR_REF(forced_failure);
}
}
GRPC_ERROR_UNREF(forced_failure);
@@ -110,3 +112,13 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) {
grpc_closure_init(&wc->wrapper, closure_wrapper, wc);
return &wc->wrapper;
}
+
+void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *c,
+ grpc_error *error) {
+ GPR_TIMER_BEGIN("grpc_closure_run", 0);
+ if (c != NULL) {
+ c->cb(exec_ctx, c->cb_arg, error);
+ }
+ GRPC_ERROR_UNREF(error);
+ GPR_TIMER_END("grpc_closure_run", 0);
+}
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index c1a22b6021..2b4b271eaa 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -76,7 +76,10 @@ struct grpc_closure {
void *cb_arg;
/** Once queued, the result of the closure. Before then: scratch space */
- grpc_error *error;
+ union {
+ grpc_error *error;
+ uintptr_t scratch;
+ } error_data;
};
/** Initializes \a closure with \a cb and \a cb_arg. */
@@ -106,4 +109,10 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst);
/** return whether \a list is empty. */
bool grpc_closure_list_empty(grpc_closure_list list);
+/** Run a closure directly. The caller must ensure that no locks are held
+ * further up the call stack. Calling this at the tail of a closure callback
+ * is by definition safe. */
+void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error);
+
#endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */
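
The error_data union above is what lets the new combiner (later in this diff) stash packed scratch bits in a closure while it owns it, and a plain grpc_error * the rest of the time. A minimal sketch of the pattern, using hypothetical names (my_error stands in for grpc_error; nothing here is gRPC API):

    #include <stdint.h>

    typedef struct my_error my_error; /* stand-in for grpc_error */

    /* One pointer-sized slot, two interpretations depending on who owns
       the closure at the moment. */
    typedef union {
      my_error *error;   /* while the closure sits on an ordinary list */
      uintptr_t scratch; /* while a scheduler has packed its own bits in */
    } error_slot;

    /* A scheduler may tag the low bit because error pointers are assumed
       to be at least 2-byte aligned (see the even "special" error
       constants in error.h later in this diff). */
    static uintptr_t pack(my_error *e, int flag) {
      return (uintptr_t)e | (flag ? 1u : 0u);
    }
    static my_error *unpack_error(uintptr_t p) {
      return (my_error *)(p & ~(uintptr_t)1);
    }
    static int unpack_flag(uintptr_t p) { return (int)(p & 1); }
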
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 831bdb4aff..60ee14eb23 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -50,25 +50,57 @@ int grpc_combiner_trace = 0;
} \
} while (0)
+#define STATE_UNORPHANED 1
+#define STATE_ELEM_COUNT_LOW_BIT 2
+
struct grpc_combiner {
+ grpc_combiner *next_combiner_on_this_exec_ctx;
grpc_workqueue *optional_workqueue;
gpr_mpscq queue;
// state is:
- // lower bit - zero if orphaned
- // other bits - number of items queued on the lock
+ // lower bit - zero if orphaned (STATE_UNORPHANED)
+ // other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT)
gpr_atm state;
- bool take_async_break_before_final_list;
+ // number of elements in the list that are covered by a poller: if >0, we can
+ // offload safely
+ gpr_atm elements_covered_by_poller;
+ bool time_to_execute_final_list;
+ bool final_list_covered_by_poller;
grpc_closure_list final_list;
- grpc_closure continue_finishing;
+ grpc_closure offload;
};
+static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
+
+typedef struct {
+ grpc_error *error;
+ bool covered_by_poller;
+} error_data;
+
+static uintptr_t pack_error_data(error_data d) {
+ return ((uintptr_t)d.error) | (d.covered_by_poller ? 1 : 0);
+}
+
+static error_data unpack_error_data(uintptr_t p) {
+ return (error_data){(grpc_error *)(p & ~(uintptr_t)1), p & 1};
+}
+
+static bool is_covered_by_poller(grpc_combiner *lock) {
+ return lock->final_list_covered_by_poller ||
+ gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0;
+}
+
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
grpc_combiner *lock = gpr_malloc(sizeof(*lock));
+ lock->next_combiner_on_this_exec_ctx = NULL;
+ lock->time_to_execute_final_list = false;
lock->optional_workqueue = optional_workqueue;
- gpr_atm_no_barrier_store(&lock->state, 1);
+ lock->final_list_covered_by_poller = false;
+ gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
+ gpr_atm_no_barrier_store(&lock->elements_covered_by_poller, 0);
gpr_mpscq_init(&lock->queue);
- lock->take_async_break_before_final_list = false;
grpc_closure_list_init(&lock->final_list);
+ grpc_closure_init(&lock->offload, offload, lock);
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock));
return lock;
}
@@ -82,7 +114,7 @@ static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
}
void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
- gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -1);
+ gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED);
GRPC_COMBINER_TRACE(gpr_log(
GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state));
if (old_state == 1) {
@@ -90,170 +122,186 @@ void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
}
}
-static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
-static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
+static void push_last_on_exec_ctx(grpc_exec_ctx *exec_ctx,
+ grpc_combiner *lock) {
+ lock->next_combiner_on_this_exec_ctx = NULL;
+ if (exec_ctx->active_combiner == NULL) {
+ exec_ctx->active_combiner = exec_ctx->last_combiner = lock;
+ } else {
+ exec_ctx->last_combiner->next_combiner_on_this_exec_ctx = lock;
+ exec_ctx->last_combiner = lock;
+ }
+}
-static void continue_finishing_mainline(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- GPR_TIMER_BEGIN("combiner.continue_executing_mainline", 0);
- grpc_combiner *lock = arg;
- GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG, "C:%p continue_finishing_mainline", lock));
- GPR_ASSERT(exec_ctx->active_combiner == NULL);
+static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx,
+ grpc_combiner *lock) {
+ lock->next_combiner_on_this_exec_ctx = exec_ctx->active_combiner;
exec_ctx->active_combiner = lock;
- if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock);
- GPR_ASSERT(exec_ctx->active_combiner == lock);
- exec_ctx->active_combiner = NULL;
- GPR_TIMER_END("combiner.continue_executing_mainline", 0);
+ if (lock->next_combiner_on_this_exec_ctx == NULL) {
+ exec_ctx->last_combiner = lock;
+ }
}
-static void execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
- GPR_TIMER_BEGIN("combiner.execute_final", 0);
- grpc_closure *c = lock->final_list.head;
- GPR_ASSERT(c != NULL);
- grpc_closure_list_init(&lock->final_list);
- lock->take_async_break_before_final_list = false;
- int loops = 0;
- while (c != NULL) {
- GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c));
- grpc_closure *next = c->next_data.next;
- grpc_error *error = c->error;
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- c = next;
- loops++;
+void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
+ grpc_closure *cl, grpc_error *error,
+ bool covered_by_poller) {
+ GPR_TIMER_BEGIN("combiner.execute", 0);
+ gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
+ GRPC_COMBINER_TRACE(gpr_log(
+ GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d last=%" PRIdPTR, lock,
+ cl, covered_by_poller, last));
+ GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed
+ cl->error_data.scratch =
+ pack_error_data((error_data){error, covered_by_poller});
+ if (covered_by_poller) {
+ gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, 1);
+ }
+ gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
+ if (last == 1) {
+ // first element on this list: add it to the list of combiner locks
+ // executing within this exec_ctx
+ push_last_on_exec_ctx(exec_ctx, lock);
}
- GPR_TIMER_END("combiner.execute_final", 0);
+ GPR_TIMER_END("combiner.execute", 0);
}
-static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- GPR_TIMER_BEGIN("combiner.continue_executing_final", 0);
- grpc_combiner *lock = arg;
- GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG, "C:%p continue_executing_final", lock));
- GPR_ASSERT(exec_ctx->active_combiner == NULL);
- exec_ctx->active_combiner = lock;
- // quick peek to see if new things have turned up on the queue: if so, go back
- // to executing them before the final list
- if ((gpr_atm_acq_load(&lock->state) >> 1) > 1) {
- if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock);
- } else {
- execute_final(exec_ctx, lock);
- finish(exec_ctx, lock);
+static void move_next(grpc_exec_ctx *exec_ctx) {
+ exec_ctx->active_combiner =
+ exec_ctx->active_combiner->next_combiner_on_this_exec_ctx;
+ if (exec_ctx->active_combiner == NULL) {
+ exec_ctx->last_combiner = NULL;
}
- GPR_ASSERT(exec_ctx->active_combiner == lock);
- exec_ctx->active_combiner = NULL;
- GPR_TIMER_END("combiner.continue_executing_final", 0);
}
-static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
- GPR_TIMER_BEGIN("combiner.start_execute_final", 0);
- GPR_ASSERT(exec_ctx->active_combiner == lock);
- GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG,
- "C:%p start_execute_final take_async_break_before_final_list=%d",
- lock, lock->take_async_break_before_final_list));
- if (lock->take_async_break_before_final_list) {
- grpc_closure_init(&lock->continue_finishing, continue_executing_final,
- lock);
- grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE,
- GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched"));
- GPR_TIMER_END("combiner.start_execute_final", 0);
+static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ grpc_combiner *lock = arg;
+ push_last_on_exec_ctx(exec_ctx, lock);
+}
+
+static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
+ move_next(exec_ctx);
+ GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload --> %p", lock,
+ lock->optional_workqueue));
+ grpc_workqueue_enqueue(exec_ctx, lock->optional_workqueue, &lock->offload,
+ GRPC_ERROR_NONE);
+}
+
+bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
+ GPR_TIMER_BEGIN("combiner.continue_exec_ctx", 0);
+ grpc_combiner *lock = exec_ctx->active_combiner;
+ if (lock == NULL) {
+ GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return false;
- } else {
- execute_final(exec_ctx, lock);
- GPR_TIMER_END("combiner.start_execute_final", 0);
- return true;
}
-}
-static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
- GPR_TIMER_BEGIN("combiner.maybe_finish_one", 0);
- gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue);
GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n));
- GPR_ASSERT(exec_ctx->active_combiner == lock);
- if (n == NULL) {
- // Queue is in a transiently inconsistent state: a new item is being queued
- // but is not visible to this thread yet.
- // Use this as a cue that we should go off and do something else for a while
- // (and come back later)
- grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline,
- lock);
- grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE,
- GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched"));
- GPR_TIMER_END("combiner.maybe_finish_one", 0);
- return false;
+ gpr_log(GPR_DEBUG,
+ "C:%p grpc_combiner_continue_exec_ctx workqueue=%p "
+ "is_covered_by_poller=%d exec_ctx_ready_to_finish=%d "
+ "time_to_execute_final_list=%d",
+ lock, lock->optional_workqueue, is_covered_by_poller(lock),
+ grpc_exec_ctx_ready_to_finish(exec_ctx),
+ lock->time_to_execute_final_list));
+
+ if (lock->optional_workqueue != NULL && is_covered_by_poller(lock) &&
+ grpc_exec_ctx_ready_to_finish(exec_ctx)) {
+ GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
+ // this execution context wants to move on, and we have a workqueue (and
+ // so can help the execution context out): schedule remaining work to be
+ // picked up on the workqueue
+ queue_offload(exec_ctx, lock);
+ GPR_TIMER_END("combiner.continue_exec_ctx", 0);
+ return true;
}
- grpc_closure *cl = (grpc_closure *)n;
- grpc_error *error = cl->error;
- cl->cb(exec_ctx, cl->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- GPR_TIMER_END("combiner.maybe_finish_one", 0);
- return true;
-}
-static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
- bool (*executor)(grpc_exec_ctx * exec_ctx, grpc_combiner * lock);
- GPR_TIMER_BEGIN("combiner.finish", 0);
- int loops = 0;
- do {
- executor = maybe_finish_one;
- gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -2);
- GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
- "C:%p finish[%d] old_state=%" PRIdPTR, lock,
- loops, old_state));
- switch (old_state) {
- default:
- // we have multiple queued work items: just continue executing them
- break;
- case 5: // we're down to one queued item: if it's the final list we
- case 4: // should do that
- if (!grpc_closure_list_empty(lock->final_list)) {
- executor = start_execute_final;
- }
- break;
- case 3: // had one count, one unorphaned --> unlocked unorphaned
- GPR_TIMER_END("combiner.finish", 0);
- return;
- case 2: // and one count, one orphaned --> unlocked and orphaned
- really_destroy(exec_ctx, lock);
- GPR_TIMER_END("combiner.finish", 0);
- return;
- case 1:
- case 0:
- // these values are illegal - representing an already unlocked or
- // deleted lock
- GPR_UNREACHABLE_CODE(return );
+ if (!lock->time_to_execute_final_list ||
+ // peek to see if something new has shown up, and execute that with
+ // priority
+ (gpr_atm_acq_load(&lock->state) >> 1) > 1) {
+ gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue);
+ GRPC_COMBINER_TRACE(
+ gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n));
+ if (n == NULL) {
+ // queue is in an inconsistent state: use this as a cue that we should
+ // go off and do something else for a while (and come back later)
+ GPR_TIMER_MARK("delay_busy", 0);
+ if (lock->optional_workqueue != NULL && is_covered_by_poller(lock)) {
+ queue_offload(exec_ctx, lock);
+ }
+ GPR_TIMER_END("combiner.continue_exec_ctx", 0);
+ return true;
}
- loops++;
- } while (executor(exec_ctx, lock));
- GPR_TIMER_END("combiner.finish", 0);
-}
+ GPR_TIMER_BEGIN("combiner.exec1", 0);
+ grpc_closure *cl = (grpc_closure *)n;
+ error_data err = unpack_error_data(cl->error_data.scratch);
+ cl->cb(exec_ctx, cl->cb_arg, err.error);
+ if (err.covered_by_poller) {
+ gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, -1);
+ }
+ GRPC_ERROR_UNREF(err.error);
+ GPR_TIMER_END("combiner.exec1", 0);
+ } else {
+ grpc_closure *c = lock->final_list.head;
+ GPR_ASSERT(c != NULL);
+ grpc_closure_list_init(&lock->final_list);
+ lock->final_list_covered_by_poller = false;
+ int loops = 0;
+ while (c != NULL) {
+ GPR_TIMER_BEGIN("combiner.exec_1final", 0);
+ GRPC_COMBINER_TRACE(
+ gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c));
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ c = next;
+ GPR_TIMER_END("combiner.exec_1final", 0);
+ }
+ }
-void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *cl, grpc_error *error) {
+ GPR_TIMER_MARK("unref", 0);
+ move_next(exec_ctx);
+ lock->time_to_execute_final_list = false;
+ gpr_atm old_state =
+ gpr_atm_full_fetch_add(&lock->state, -STATE_ELEM_COUNT_LOW_BIT);
GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p", lock, cl));
- GPR_TIMER_BEGIN("combiner.execute", 0);
- gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2);
- GPR_ASSERT(last & 1); // ensure lock has not been destroyed
- if (last == 1) {
- exec_ctx->active_combiner = lock;
- GPR_TIMER_BEGIN("combiner.execute_first_cb", 0);
- cl->cb(exec_ctx, cl->cb_arg, error);
- GPR_TIMER_END("combiner.execute_first_cb", 0);
- GRPC_ERROR_UNREF(error);
- finish(exec_ctx, lock);
- GPR_ASSERT(exec_ctx->active_combiner == lock);
- exec_ctx->active_combiner = NULL;
- } else {
- cl->error = error;
- gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
+ gpr_log(GPR_DEBUG, "C:%p finish old_state=%" PRIdPTR, lock, old_state));
+// Define a macro to ease readability of the following switch statement.
+#define OLD_STATE_WAS(orphaned, elem_count) \
+ (((orphaned) ? 0 : STATE_UNORPHANED) | \
+ ((elem_count)*STATE_ELEM_COUNT_LOW_BIT))
+ // Depending on what the previous state was, we need to perform different
+ // actions.
+ switch (old_state) {
+ default:
+ // we have multiple queued work items: just continue executing them
+ break;
+ case OLD_STATE_WAS(false, 2):
+ case OLD_STATE_WAS(true, 2):
+ // we're down to one queued item: if it's the final list we should do that
+ if (!grpc_closure_list_empty(lock->final_list)) {
+ lock->time_to_execute_final_list = true;
+ }
+ break;
+ case OLD_STATE_WAS(false, 1):
+ // had one count, one unorphaned --> unlocked unorphaned
+ GPR_TIMER_END("combiner.continue_exec_ctx", 0);
+ return true;
+ case OLD_STATE_WAS(true, 1):
+ // and one count, one orphaned --> unlocked and orphaned
+ really_destroy(exec_ctx, lock);
+ GPR_TIMER_END("combiner.continue_exec_ctx", 0);
+ return true;
+ case OLD_STATE_WAS(false, 0):
+ case OLD_STATE_WAS(true, 0):
+ // these values are illegal - representing an already unlocked or
+ // deleted lock
+ GPR_TIMER_END("combiner.continue_exec_ctx", 0);
+ GPR_UNREACHABLE_CODE(return true);
}
- GPR_TIMER_END("combiner.execute", 0);
+ push_first_on_exec_ctx(exec_ctx, lock);
+ GPR_TIMER_END("combiner.continue_exec_ctx", 0);
+ return true;
}
static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
@@ -264,30 +312,26 @@ static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *closure, grpc_error *error,
- bool force_async_break) {
+ bool covered_by_poller) {
GRPC_COMBINER_TRACE(gpr_log(
- GPR_DEBUG,
- "C:%p grpc_combiner_execute_finally c=%p force_async_break=%d; ac=%p",
- lock, closure, force_async_break, exec_ctx->active_combiner));
+ GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p; cov=%d", lock,
+ closure, exec_ctx->active_combiner, covered_by_poller));
GPR_TIMER_BEGIN("combiner.execute_finally", 0);
if (exec_ctx->active_combiner != lock) {
GPR_TIMER_MARK("slowpath", 0);
grpc_combiner_execute(exec_ctx, lock,
- grpc_closure_create(enqueue_finally, closure), error);
+ grpc_closure_create(enqueue_finally, closure), error,
+ false);
GPR_TIMER_END("combiner.execute_finally", 0);
return;
}
- if (force_async_break) {
- lock->take_async_break_before_final_list = true;
- }
if (grpc_closure_list_empty(lock->final_list)) {
- gpr_atm_full_fetch_add(&lock->state, 2);
+ gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
+ }
+ if (covered_by_poller) {
+ lock->final_list_covered_by_poller = true;
}
grpc_closure_list_append(&lock->final_list, closure, error);
GPR_TIMER_END("combiner.execute_finally", 0);
}
-
-void grpc_combiner_force_async_finally(grpc_combiner *lock) {
- lock->take_async_break_before_final_list = true;
-}
diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h
index 1409db24b9..d04eeed83a 100644
--- a/src/core/lib/iomgr/combiner.h
+++ b/src/core/lib/iomgr/combiner.h
@@ -52,19 +52,14 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue);
void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
// Execute \a action within the lock.
void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *closure, grpc_error *error);
+ grpc_closure *closure, grpc_error *error,
+ bool covered_by_poller);
// Execute \a action within the lock just prior to unlocking.
-// if \a hint_async_break is true, the combiner tries to hand execution to
-// another thread before finishing the primary queue of combined closures and
-// executing the finally list.
-// Deprecation warning: \a hint_async_break will be removed in a future version
-// Takes a very slow and round-about path if not called from a
-// grpc_combiner_execute closure.
void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *closure, grpc_error *error,
- bool hint_async_break);
-// Deprecated: force the finally list execution onto another thread
-void grpc_combiner_force_async_finally(grpc_combiner *lock);
+ bool covered_by_poller);
+
+bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx);
extern int grpc_combiner_trace;
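
Taken together, the new API shape is: grpc_combiner_execute only queues work and bumps the element count; nothing runs until the owning thread drains its exec_ctx, which now pulls combiners through grpc_combiner_continue_exec_ctx. A sketch of the intended call pattern, assuming a transport-like caller (state_machine_step and its wiring are hypothetical):

    #include "src/core/lib/iomgr/closure.h"
    #include "src/core/lib/iomgr/combiner.h"

    /* Runs serialized with every other closure offered to this combiner. */
    static void state_machine_step(grpc_exec_ctx *exec_ctx, void *arg,
                                   grpc_error *error) {
      /* ... mutate state guarded by the combiner ... */
    }

    static void kick_state_machine(grpc_exec_ctx *exec_ctx,
                                   grpc_combiner *lock, grpc_closure *step) {
      grpc_closure_init(step, state_machine_step, NULL);
      /* covered_by_poller=false: no poller is guaranteed to reach this
         work, so the combiner must not rely on offloading it */
      grpc_combiner_execute(exec_ctx, lock, step, GRPC_ERROR_NONE, false);
      /* Nothing has executed yet; the closure runs when this thread calls
         grpc_exec_ctx_flush(), which loops grpc_combiner_continue_exec_ctx()
         until the combiner queue drains. */
    }
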
diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c
index 31c80260f8..f6bb3a0477 100644
--- a/src/core/lib/iomgr/error.c
+++ b/src/core/lib/iomgr/error.c
@@ -120,6 +120,8 @@ static const char *error_int_name(grpc_error_ints key) {
return "http_status";
case GRPC_ERROR_INT_LIMIT:
return "limit";
+ case GRPC_ERROR_INT_OCCURRED_DURING_WRITE:
+ return "occurred_during_write";
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -144,6 +146,8 @@ static const char *error_str_name(grpc_error_strs key) {
return "tsi_error";
case GRPC_ERROR_STR_FILENAME:
return "filename";
+ case GRPC_ERROR_STR_QUEUED_BUFFERS:
+ return "queued_buffers";
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -265,7 +269,7 @@ static grpc_error *copy_error_and_unref(grpc_error *in) {
} else {
out = gpr_malloc(sizeof(*out));
#ifdef GRPC_ERROR_REFCOUNT_DEBUG
- gpr_log(GPR_DEBUG, "%p create copying", out);
+ gpr_log(GPR_DEBUG, "%p create copying %p", out, in);
#endif
out->ints = gpr_avl_ref(in->ints);
out->strs = gpr_avl_ref(in->strs);
@@ -523,21 +527,25 @@ static char *fmt_time(void *p) {
return out;
}
-static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap) {
+static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap,
+ bool *first) {
if (n == NULL) return;
- add_errs(n->left, s, sz, cap);
+ add_errs(n->left, s, sz, cap, first);
+ if (!*first) append_chr(',', s, sz, cap);
+ *first = false;
const char *e = grpc_error_string(n->value);
append_str(e, s, sz, cap);
grpc_error_free_string(e);
- add_errs(n->right, s, sz, cap);
+ add_errs(n->right, s, sz, cap, first);
}
static char *errs_string(grpc_error *err) {
char *s = NULL;
size_t sz = 0;
size_t cap = 0;
+ bool first = true;
append_chr('[', &s, &sz, &cap);
- add_errs(err->errs.root, &s, &sz, &cap);
+ add_errs(err->errs.root, &s, &sz, &cap, &first);
append_chr(']', &s, &sz, &cap);
append_chr(0, &s, &sz, &cap);
return s;
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 00ace8a7a9..f3f3b80a09 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -100,6 +100,8 @@ typedef enum {
GRPC_ERROR_INT_HTTP_STATUS,
/// context sensitive limit associated with the error
GRPC_ERROR_INT_LIMIT,
+ /// chttp2: did the error occur while a write was in progress
+ GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
} grpc_error_ints;
typedef enum {
@@ -121,6 +123,8 @@ typedef enum {
GRPC_ERROR_STR_TSI_ERROR,
/// filename that we were trying to read/write when this error occurred
GRPC_ERROR_STR_FILENAME,
+ /// which data was queued for writing when the error occurred
+ GRPC_ERROR_STR_QUEUED_BUFFERS
} grpc_error_strs;
typedef enum {
@@ -128,9 +132,13 @@ typedef enum {
GRPC_ERROR_TIME_CREATED,
} grpc_error_times;
+/// The following "special" errors can be propagated without allocating memory.
+/// They are always even so that other code (particularly combiner locks) can
+/// safely use the lower bit for themselves.
+
#define GRPC_ERROR_NONE ((grpc_error *)NULL)
-#define GRPC_ERROR_OOM ((grpc_error *)1)
-#define GRPC_ERROR_CANCELLED ((grpc_error *)2)
+#define GRPC_ERROR_OOM ((grpc_error *)2)
+#define GRPC_ERROR_CANCELLED ((grpc_error *)4)
const char *grpc_error_string(grpc_error *error);
void grpc_error_free_string(const char *str);
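
Making the special error constants even is load-bearing: combiner.c's pack_error_data ORs a covered_by_poller flag into the low bit of the error pointer, and that bit must round-trip for heap-allocated errors and the sentinels alike. A sketch of the invariant being relied on:

    #include <assert.h>
    #include <stdint.h>

    typedef struct grpc_error grpc_error; /* opaque, as in error.h */

    /* Holds for GRPC_ERROR_NONE (0), GRPC_ERROR_OOM (2),
       GRPC_ERROR_CANCELLED (4) and any sufficiently aligned heap pointer;
       it would fail for the old GRPC_ERROR_OOM value of 1. */
    static void low_bit_roundtrips(grpc_error *e) {
      uintptr_t tagged = (uintptr_t)e | 1; /* combiner sets its flag */
      assert((grpc_error *)(tagged & ~(uintptr_t)1) == e);
    }
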
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index ab77ebc78b..e5909d9380 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -152,20 +152,20 @@ static void fd_global_shutdown(void);
* Polling island Declarations
*/
-//#define GRPC_PI_REF_COUNT_DEBUG
-#ifdef GRPC_PI_REF_COUNT_DEBUG
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
-#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */
+#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
#endif /* !defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
+/* This is also used as grpc_workqueue (by directly casting it) */
typedef struct polling_island {
gpr_mu mu;
/* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
@@ -185,8 +185,17 @@ typedef struct polling_island {
* (except mu and ref_count) are invalid and must be ignored. */
gpr_atm merged_to;
- /* The workqueue associated with this polling island */
- grpc_workqueue *workqueue;
+ /* Number of threads currently polling on this island */
+ gpr_atm poller_count;
+ /* Mutex guarding the read end of the workqueue (must be held to pop from
+ * workqueue_items) */
+ gpr_mu workqueue_read_mu;
+ /* Queue of closures to be executed */
+ gpr_mpscq workqueue_items;
+ /* Count of items in workqueue_items */
+ gpr_atm workqueue_item_count;
+ /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
+ grpc_wakeup_fd workqueue_wakeup_fd;
/* The fd of the underlying epoll set */
int epoll_fd;
@@ -275,6 +284,10 @@ static bool append_error(grpc_error **composite, grpc_error *error,
threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;
+/* The polling island being polled right now.
+ See comments in workqueue_maybe_wakeup for why this is tracked. */
+static __thread polling_island *g_current_thread_polling_island;
+
/* Forward declaration */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
@@ -289,12 +302,12 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
-#ifdef GRPC_PI_REF_COUNT_DEBUG
static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
-static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file,
- int line) {
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+static void pi_add_ref_dbg(polling_island *pi, const char *reason,
+ const char *file, int line) {
long old_cnt = gpr_atm_acq_load(&pi->ref_count);
pi_add_ref(pi);
gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
@@ -302,12 +315,42 @@ static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file,
}
static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
- char *reason, char *file, int line) {
+ const char *reason, const char *file, int line) {
long old_cnt = gpr_atm_acq_load(&pi->ref_count);
pi_unref(exec_ctx, pi);
gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
(void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}
+
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
+ const char *file, int line,
+ const char *reason) {
+ if (workqueue != NULL) {
+ pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
+ }
+ return workqueue;
+}
+
+static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ const char *file, int line, const char *reason) {
+ if (workqueue != NULL) {
+ pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
+ }
+}
+#else
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
+ if (workqueue != NULL) {
+ pi_add_ref((polling_island *)workqueue);
+ }
+ return workqueue;
+}
+
+static void workqueue_unref(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue *workqueue) {
+ if (workqueue != NULL) {
+ pi_unref(exec_ctx, (polling_island *)workqueue);
+ }
+}
#endif
static void pi_add_ref(polling_island *pi) {
@@ -315,10 +358,7 @@ static void pi_add_ref(polling_island *pi) {
}
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
- /* If ref count went to one, we're back to just the workqueue owning a ref.
- Unref the workqueue to break the loop.
-
- If ref count went to zero, delete the polling island.
+ /* If ref count went to zero, delete the polling island.
Note that this deletion need not be done under a lock. Once the ref count goes
to zero, we are guaranteed that no one else holds a reference to the
polling island (and that there is no racing pi_add_ref() call either).
@@ -326,20 +366,12 @@ static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
Also, if we are deleting the polling island and the merged_to field is
non-empty, we should remove a ref to the merged_to polling island
*/
- switch (gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
- case 2: /* last external ref: the only one now owned is by the workqueue */
- GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
- break;
- case 1: {
- polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
- polling_island_delete(exec_ctx, pi);
- if (next != NULL) {
- PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
- }
- break;
+ if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
+ polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
+ polling_island_delete(exec_ctx, pi);
+ if (next != NULL) {
+ PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
}
- case 0:
- GPR_UNREACHABLE_CODE(return );
}
}
@@ -488,11 +520,20 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
pi->fd_capacity = 0;
pi->fds = NULL;
pi->epoll_fd = -1;
- pi->workqueue = NULL;
+
+ gpr_mu_init(&pi->workqueue_read_mu);
+ gpr_mpscq_init(&pi->workqueue_items);
+ gpr_atm_rel_store(&pi->workqueue_item_count, 0);
gpr_atm_rel_store(&pi->ref_count, 0);
+ gpr_atm_rel_store(&pi->poller_count, 0);
gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
+ if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
+ err_desc)) {
+ goto done;
+ }
+
pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (pi->epoll_fd < 0) {
@@ -501,26 +542,14 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
}
polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
+ polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);
if (initial_fd != NULL) {
polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
}
- if (append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue),
- err_desc) &&
- *error == GRPC_ERROR_NONE) {
- polling_island_add_fds_locked(pi, &pi->workqueue->wakeup_read_fd, 1, true,
- error);
- GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL);
- pi->workqueue->wakeup_read_fd->polling_island = pi;
- PI_ADD_REF(pi, "fd");
- }
-
done:
if (*error != GRPC_ERROR_NONE) {
- if (pi->workqueue != NULL) {
- GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
- }
polling_island_delete(exec_ctx, pi);
pi = NULL;
}
@@ -533,7 +562,11 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
if (pi->epoll_fd >= 0) {
close(pi->epoll_fd);
}
+ GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
+ gpr_mu_destroy(&pi->workqueue_read_mu);
+ gpr_mpscq_destroy(&pi->workqueue_items);
gpr_mu_destroy(&pi->mu);
+ grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
gpr_free(pi->fds);
gpr_free(pi);
}
@@ -678,6 +711,45 @@ static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
}
}
+static void workqueue_maybe_wakeup(polling_island *pi) {
+ /* If this thread is the current poller, then it may be that it's about to
+ decrement the current poller count, so we need to look past this thread */
+ bool is_current_poller = (g_current_thread_polling_island == pi);
+ gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
+ gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
+ /* Only issue a wakeup if it's likely that some poller could come in and take
+ it right now. Note that since we do an anticipatory mpscq_pop every poll
+ loop, it's ok if we miss the wakeup here, as we'll get the work item when
+ the next poller enters anyway. */
+ if (current_pollers > min_current_pollers_for_wakeup) {
+ GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
+ grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
+ }
+}
+
+static void workqueue_move_items_to_parent(polling_island *q) {
+ polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
+ if (p == NULL) {
+ return;
+ }
+ gpr_mu_lock(&q->workqueue_read_mu);
+ int num_added = 0;
+ while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
+ gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
+ if (n != NULL) {
+ gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
+ gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
+ gpr_mpscq_push(&p->workqueue_items, n);
+ num_added++;
+ }
+ }
+ gpr_mu_unlock(&q->workqueue_read_mu);
+ if (num_added > 0) {
+ workqueue_maybe_wakeup(p);
+ }
+ workqueue_move_items_to_parent(p);
+}
+
static polling_island *polling_island_merge(polling_island *p,
polling_island *q,
grpc_error **error) {
@@ -702,6 +774,8 @@ static polling_island *polling_island_merge(polling_island *p,
/* Add the 'merged_to' link from p --> q */
gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
+
+ workqueue_move_items_to_parent(q);
}
/* else if p == q, nothing needs to be done */
@@ -712,6 +786,26 @@ static polling_island *polling_island_merge(polling_island *p,
return q;
}
+static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue *workqueue, grpc_closure *closure,
+ grpc_error *error) {
+ GPR_TIMER_BEGIN("workqueue.enqueue", 0);
+ /* take a ref to the workqueue: otherwise it can happen that whatever events
+ * this kicks off ends up destroying the workqueue before this function
+ * completes */
+ GRPC_WORKQUEUE_REF(workqueue, "enqueue");
+ polling_island *pi = (polling_island *)workqueue;
+ gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
+ closure->error_data.error = error;
+ gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
+ if (last == 0) {
+ workqueue_maybe_wakeup(pi);
+ }
+ workqueue_move_items_to_parent(pi);
+ GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
+ GPR_TIMER_END("workqueue.enqueue", 0);
+}
+
static grpc_error *polling_island_global_init() {
grpc_error *error = GRPC_ERROR_NONE;
@@ -1042,11 +1136,8 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
gpr_mu_lock(&fd->mu);
- grpc_workqueue *workqueue = NULL;
- if (fd->polling_island != NULL) {
- workqueue =
- GRPC_WORKQUEUE_REF(fd->polling_island->workqueue, "get_workqueue");
- }
+ grpc_workqueue *workqueue = GRPC_WORKQUEUE_REF(
+ (grpc_workqueue *)fd->polling_island, "fd_get_workqueue");
gpr_mu_unlock(&fd->mu);
return workqueue;
}
@@ -1299,7 +1390,29 @@ static void pollset_reset(grpc_pollset *pollset) {
GPR_ASSERT(pollset->polling_island == NULL);
}
-#define GRPC_EPOLL_MAX_EVENTS 1000
+static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
+ polling_island *pi) {
+ if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
+ gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
+ gpr_mu_unlock(&pi->workqueue_read_mu);
+ if (n != NULL) {
+ if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
+ workqueue_maybe_wakeup(pi);
+ }
+ grpc_closure *c = (grpc_closure *)n;
+ grpc_closure_run(exec_ctx, c, c->error_data.error);
+ return true;
+ } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
+ /* n == NULL might mean there's work but it's not available to be popped
+ * yet - try to ensure another workqueue wakes up to check shortly if so
+ */
+ workqueue_maybe_wakeup(pi);
+ }
+ }
+ return false;
+}
+
+#define GRPC_EPOLL_MAX_EVENTS 100
/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset,
@@ -1354,7 +1467,13 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
PI_ADD_REF(pi, "ps_work");
gpr_mu_unlock(&pollset->mu);
- do {
+  /* If we get some workqueue work to do, it might end up completing an item
on the completion queue, so there's no need to poll: we skip polling and
redo the whole loop to verify */
+ if (!maybe_do_workqueue_work(exec_ctx, pi)) {
+ gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
+ g_current_thread_polling_island = pi;
+
GRPC_SCHEDULING_START_BLOCKING_REGION;
ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
sig_mask);
@@ -1386,6 +1505,11 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
append_error(error,
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
err_desc);
+ } else if (data_ptr == &pi->workqueue_wakeup_fd) {
+ append_error(error,
+ grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
+ err_desc);
+ maybe_do_workqueue_work(exec_ctx, pi);
} else if (data_ptr == &polling_island_wakeup_fd) {
GRPC_POLLING_TRACE(
"pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
@@ -1408,7 +1532,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
}
}
}
- } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
+
+ g_current_thread_polling_island = NULL;
+ gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
+ }
GPR_ASSERT(pi != NULL);
@@ -1868,6 +1995,10 @@ static const grpc_event_engine_vtable vtable = {
.kick_poller = kick_poller,
+ .workqueue_ref = workqueue_ref,
+ .workqueue_unref = workqueue_unref,
+ .workqueue_enqueue = workqueue_enqueue,
+
.shutdown_engine = shutdown_engine,
};
@@ -1892,6 +2023,10 @@ const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
return NULL;
}
+ if (!grpc_has_wakeup_fd()) {
+ return NULL;
+ }
+
if (!is_epoll_available()) {
return NULL;
}
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
index c2107e5e39..1829440a6e 100644
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -1989,6 +1989,32 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
}
/*******************************************************************************
+ * workqueue stubs
+ */
+
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
+ const char *file, int line,
+ const char *reason) {
+ return workqueue;
+}
+static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ const char *file, int line, const char *reason) {}
+#else
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
+ return workqueue;
+}
+static void workqueue_unref(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue *workqueue) {}
+#endif
+
+static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue *workqueue, grpc_closure *closure,
+ grpc_error *error) {
+ grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+}
+
+/*******************************************************************************
* event engine binding
*/
@@ -2029,6 +2055,10 @@ static const grpc_event_engine_vtable vtable = {
.kick_poller = kick_poller,
+ .workqueue_ref = workqueue_ref,
+ .workqueue_unref = workqueue_unref,
+ .workqueue_enqueue = workqueue_enqueue,
+
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 16a5e3083e..27e966c18c 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -47,10 +47,12 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include <grpc/support/thd.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/wakeup_fd_cv.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
@@ -246,6 +248,28 @@ struct grpc_pollset_set {
};
/*******************************************************************************
+ * condition variable polling definitions
+ */
+
+#define CV_POLL_PERIOD_MS 1000
+#define CV_DEFAULT_TABLE_SIZE 16
+
+typedef enum poll_status_t { INPROGRESS, COMPLETED, CANCELLED } poll_status_t;
+
+typedef struct poll_args {
+ gpr_refcount refcount;
+ gpr_cv *cv;
+ struct pollfd *fds;
+ nfds_t nfds;
+ int timeout;
+ int retval;
+ int err;
+ gpr_atm status;
+} poll_args;
+
+cv_fd_table g_cvfds;
+
+/*******************************************************************************
* fd_posix.c
*/
@@ -961,8 +985,15 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (errno != EINTR) {
work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
}
+
for (i = 2; i < pfd_count; i++) {
- fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
+ if (watchers[i].fd == NULL) {
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
+ } else {
+ // Wake up all the file descriptors so that if we have an invalid one,
+ // we can identify it on the next pollset_work()
+ fd_end_poll(exec_ctx, &watchers[i], 1, 1, pollset);
+ }
}
} else if (r == 0) {
for (i = 2; i < pfd_count; i++) {
@@ -1236,10 +1267,237 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
}
/*******************************************************************************
+ * workqueue stubs
+ */
+
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
+ const char *file, int line,
+ const char *reason) {
+ return workqueue;
+}
+static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ const char *file, int line, const char *reason) {}
+#else
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
+ return workqueue;
+}
+static void workqueue_unref(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue *workqueue) {}
+#endif
+
+static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue *workqueue, grpc_closure *closure,
+ grpc_error *error) {
+ grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+}
+
+/*******************************************************************************
+ * Condition Variable polling extensions
+ */
+
+static void decref_poll_args(poll_args *args) {
+ if (gpr_unref(&args->refcount)) {
+ gpr_free(args->fds);
+ gpr_cv_destroy(args->cv);
+ gpr_free(args->cv);
+ gpr_free(args);
+ }
+}
+
+// Poll in a background thread
+static void run_poll(void *arg) {
+ int timeout, retval;
+ poll_args *pargs = (poll_args *)arg;
+ while (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) {
+ if (pargs->timeout < 0) {
+ timeout = CV_POLL_PERIOD_MS;
+ } else {
+ timeout = GPR_MIN(CV_POLL_PERIOD_MS, pargs->timeout);
+ pargs->timeout -= timeout;
+ }
+ retval = g_cvfds.poll(pargs->fds, pargs->nfds, timeout);
+ if (retval != 0 || pargs->timeout == 0) {
+ pargs->retval = retval;
+ pargs->err = errno;
+ break;
+ }
+ }
+ gpr_mu_lock(&g_cvfds.mu);
+ if (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) {
+ // Signal main thread that the poll completed
+ gpr_atm_no_barrier_store(&pargs->status, COMPLETED);
+ gpr_cv_signal(pargs->cv);
+ }
+ decref_poll_args(pargs);
+ g_cvfds.pollcount--;
+ if (g_cvfds.shutdown && g_cvfds.pollcount == 0) {
+ gpr_cv_signal(&g_cvfds.shutdown_complete);
+ }
+ gpr_mu_unlock(&g_cvfds.mu);
+}
+
+// This function overrides poll() to handle condition variable wakeup fds
+static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
+ unsigned int i;
+ int res, idx;
+ gpr_cv *pollcv;
+ cv_node *cvn, *prev;
+ nfds_t nsockfds = 0;
+ gpr_thd_id t_id;
+ gpr_thd_options opt;
+ poll_args *pargs = NULL;
+ gpr_mu_lock(&g_cvfds.mu);
+ pollcv = gpr_malloc(sizeof(gpr_cv));
+ gpr_cv_init(pollcv);
+ for (i = 0; i < nfds; i++) {
+ fds[i].revents = 0;
+ if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
+ idx = FD_TO_IDX(fds[i].fd);
+ cvn = gpr_malloc(sizeof(cv_node));
+ cvn->cv = pollcv;
+ cvn->next = g_cvfds.cvfds[idx].cvs;
+ g_cvfds.cvfds[idx].cvs = cvn;
+ // We should return immediately if there are pending events,
+ // but we still need to call poll() to check for socket events
+ if (g_cvfds.cvfds[idx].is_set) {
+ timeout = 0;
+ }
+ } else if (fds[i].fd >= 0) {
+ nsockfds++;
+ }
+ }
+
+ if (nsockfds > 0) {
+ pargs = gpr_malloc(sizeof(struct poll_args));
+ // Both the main thread and calling thread get a reference
+ gpr_ref_init(&pargs->refcount, 2);
+ pargs->cv = pollcv;
+ pargs->fds = gpr_malloc(sizeof(struct pollfd) * nsockfds);
+ pargs->nfds = nsockfds;
+ pargs->timeout = timeout;
+ pargs->retval = 0;
+ pargs->err = 0;
+ gpr_atm_no_barrier_store(&pargs->status, INPROGRESS);
+ idx = 0;
+ for (i = 0; i < nfds; i++) {
+ if (fds[i].fd >= 0) {
+ pargs->fds[idx].fd = fds[i].fd;
+ pargs->fds[idx].events = fds[i].events;
+ pargs->fds[idx].revents = 0;
+ idx++;
+ }
+ }
+ g_cvfds.pollcount++;
+ opt = gpr_thd_options_default();
+ gpr_thd_options_set_detached(&opt);
+ gpr_thd_new(&t_id, &run_poll, pargs, &opt);
+ // We want the poll() thread to trigger the deadline, so wait forever here
+ gpr_cv_wait(pollcv, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ if (gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
+ res = pargs->retval;
+ errno = pargs->err;
+ } else {
+ res = 0;
+ errno = 0;
+ gpr_atm_no_barrier_store(&pargs->status, CANCELLED);
+ }
+ } else {
+ gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
+ deadline =
+ gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
+ gpr_cv_wait(pollcv, &g_cvfds.mu, deadline);
+ res = 0;
+ }
+
+ idx = 0;
+ for (i = 0; i < nfds; i++) {
+ if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
+ cvn = g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs;
+ prev = NULL;
+ while (cvn->cv != pollcv) {
+ prev = cvn;
+ cvn = cvn->next;
+ GPR_ASSERT(cvn);
+ }
+ if (!prev) {
+ g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs = cvn->next;
+ } else {
+ prev->next = cvn->next;
+ }
+ gpr_free(cvn);
+
+ if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) {
+ fds[i].revents = POLLIN;
+ if (res >= 0) res++;
+ }
+ } else if (fds[i].fd >= 0 &&
+ gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
+ fds[i].revents = pargs->fds[idx].revents;
+ idx++;
+ }
+ }
+
+ if (pargs) {
+ decref_poll_args(pargs);
+ } else {
+ gpr_cv_destroy(pollcv);
+ gpr_free(pollcv);
+ }
+ gpr_mu_unlock(&g_cvfds.mu);
+
+ return res;
+}
+
+static void global_cv_fd_table_init() {
+ gpr_mu_init(&g_cvfds.mu);
+ gpr_mu_lock(&g_cvfds.mu);
+ gpr_cv_init(&g_cvfds.shutdown_complete);
+ g_cvfds.shutdown = 0;
+ g_cvfds.pollcount = 0;
+ g_cvfds.size = CV_DEFAULT_TABLE_SIZE;
+ g_cvfds.cvfds = gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE);
+ g_cvfds.free_fds = NULL;
+ for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) {
+ g_cvfds.cvfds[i].is_set = 0;
+ g_cvfds.cvfds[i].cvs = NULL;
+ g_cvfds.cvfds[i].next_free = g_cvfds.free_fds;
+ g_cvfds.free_fds = &g_cvfds.cvfds[i];
+ }
+ // Override the poll function with one that supports cvfds
+ g_cvfds.poll = grpc_poll_function;
+ grpc_poll_function = &cvfd_poll;
+ gpr_mu_unlock(&g_cvfds.mu);
+}
+
+static void global_cv_fd_table_shutdown() {
+ gpr_mu_lock(&g_cvfds.mu);
+ g_cvfds.shutdown = 1;
+ // Attempt to wait for all abandoned poll() threads to terminate
+ // Not doing so will result in reported memory leaks
+ if (g_cvfds.pollcount > 0) {
+ int res = gpr_cv_wait(&g_cvfds.shutdown_complete, &g_cvfds.mu,
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(3, GPR_TIMESPAN)));
+ GPR_ASSERT(res == 0);
+ }
+ gpr_cv_destroy(&g_cvfds.shutdown_complete);
+ grpc_poll_function = g_cvfds.poll;
+ gpr_free(g_cvfds.cvfds);
+ gpr_mu_unlock(&g_cvfds.mu);
+ gpr_mu_destroy(&g_cvfds.mu);
+}
+
+/*******************************************************************************
* event engine binding
*/
-static void shutdown_engine(void) { pollset_global_shutdown(); }
+static void shutdown_engine(void) {
+ pollset_global_shutdown();
+ if (grpc_cv_wakeup_fds_enabled()) {
+ global_cv_fd_table_shutdown();
+ }
+}
static const grpc_event_engine_vtable vtable = {
.pollset_size = sizeof(grpc_pollset),
@@ -1273,11 +1531,29 @@ static const grpc_event_engine_vtable vtable = {
.kick_poller = kick_poller,
+ .workqueue_ref = workqueue_ref,
+ .workqueue_unref = workqueue_unref,
+ .workqueue_enqueue = workqueue_enqueue,
+
.shutdown_engine = shutdown_engine,
};
const grpc_event_engine_vtable *grpc_init_poll_posix(void) {
+ if (!grpc_has_wakeup_fd()) {
+ return NULL;
+ }
+ if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
+ return NULL;
+ }
+ return &vtable;
+}
+
+const grpc_event_engine_vtable *grpc_init_poll_cv_posix(void) {
+ global_cv_fd_table_init();
+ grpc_enable_cv_wakeup_fds(1);
if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
+ global_cv_fd_table_shutdown();
+ grpc_enable_cv_wakeup_fds(0);
return NULL;
}
return &vtable;
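
The poll-cv engine works by teaching poll() about fake, negative file descriptors: cvfd_poll splits the pollfd set into real sockets (handed to a background poll() thread) and condition-variable wakeup fds (waited on via gpr_cv), then merges the results. The encoding assumed by FD_TO_IDX above is sketched below; the real macros live in wakeup_fd_cv.h (the header renamed in this diff), so the exact form here is illustrative:

    /* Hypothetical restatement of the cv wakeup-fd encoding: indices into
       g_cvfds.cvfds map to strictly negative "fds", which can never
       collide with a real descriptor. */
    #define SKETCH_IDX_TO_FD(idx) (-(idx)-1) /* 0 -> -1, 1 -> -2, ... */
    #define SKETCH_FD_TO_IDX(fd) (-(fd)-1)   /* exact inverse */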
diff --git a/src/core/lib/iomgr/ev_poll_posix.h b/src/core/lib/iomgr/ev_poll_posix.h
index 291736a2db..202ffca14c 100644
--- a/src/core/lib/iomgr/ev_poll_posix.h
+++ b/src/core/lib/iomgr/ev_poll_posix.h
@@ -37,5 +37,6 @@
#include "src/core/lib/iomgr/ev_posix.h"
const grpc_event_engine_vtable *grpc_init_poll_posix(void);
+const grpc_event_engine_vtable *grpc_init_poll_cv_posix(void);
#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 6536672685..9857b0bce9 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -66,6 +66,7 @@ typedef struct {
static const event_engine_factory g_factories[] = {
{"epoll", grpc_init_epoll_linux},
{"poll", grpc_init_poll_posix},
+ {"poll-cv", grpc_init_poll_cv_posix},
{"legacy", grpc_init_poll_and_epoll_posix},
};
@@ -258,4 +259,27 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
grpc_error *grpc_kick_poller(void) { return g_event_engine->kick_poller(); }
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file,
+ int line, const char *reason) {
+ return g_event_engine->workqueue_ref(workqueue, file, line, reason);
+}
+void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ const char *file, int line, const char *reason) {
+ g_event_engine->workqueue_unref(exec_ctx, workqueue, file, line, reason);
+}
+#else
+grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) {
+ return g_event_engine->workqueue_ref(workqueue);
+}
+void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
+ g_event_engine->workqueue_unref(exec_ctx, workqueue);
+}
+#endif
+
+void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ grpc_closure *closure, grpc_error *error) {
+ g_event_engine->workqueue_enqueue(exec_ctx, workqueue, closure, error);
+}
+
#endif // GPR_POSIX_SOCKET
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index c2aa1756ea..2fdef06838 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -40,6 +40,7 @@
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
+#include "src/core/lib/iomgr/workqueue.h"
typedef struct grpc_fd grpc_fd;
@@ -95,6 +96,18 @@ typedef struct grpc_event_engine_vtable {
grpc_error *(*kick_poller)(void);
void (*shutdown_engine)(void);
+
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+ grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue, const char *file,
+ int line, const char *reason);
+ void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ const char *file, int line, const char *reason);
+#else
+ grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue);
+ void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
+#endif
+ void (*workqueue_enqueue)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ grpc_closure *closure, grpc_error *error);
} grpc_event_engine_vtable;
void grpc_event_engine_init(void);
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index ac7785ec13..604713e578 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -37,6 +37,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
@@ -60,18 +61,43 @@ bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) {
bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
bool did_something = 0;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0);
- while (!grpc_closure_list_empty(exec_ctx->closure_list)) {
- grpc_closure *c = exec_ctx->closure_list.head;
- exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
- while (c != NULL) {
- grpc_closure *next = c->next_data.next;
- grpc_error *error = c->error;
- did_something = true;
- GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0);
+ for (;;) {
+ if (!grpc_closure_list_empty(exec_ctx->closure_list)) {
+ grpc_closure *c = exec_ctx->closure_list.head;
+ exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ did_something = true;
+ grpc_closure_run(exec_ctx, c, c->error_data.error);
+ c = next;
+ }
+ } else if (!grpc_combiner_continue_exec_ctx(exec_ctx)) {
+ break;
+ }
+ }
+ GPR_ASSERT(exec_ctx->active_combiner == NULL);
+ if (exec_ctx->stealing_from_workqueue != NULL) {
+ if (grpc_exec_ctx_ready_to_finish(exec_ctx)) {
+ grpc_workqueue_enqueue(exec_ctx, exec_ctx->stealing_from_workqueue,
+ exec_ctx->stolen_closure,
+ exec_ctx->stolen_closure->error_data.error);
+ GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue,
+ "exec_ctx_sched");
+ exec_ctx->stealing_from_workqueue = NULL;
+ exec_ctx->stolen_closure = NULL;
+ } else {
+ grpc_closure *c = exec_ctx->stolen_closure;
+ GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue,
+ "exec_ctx_sched");
+ exec_ctx->stealing_from_workqueue = NULL;
+ exec_ctx->stolen_closure = NULL;
+ grpc_error *error = c->error_data.error;
+ GPR_TIMER_BEGIN("grpc_exec_ctx_flush.stolen_cb", 0);
c->cb(exec_ctx, c->cb_arg, error);
GRPC_ERROR_UNREF(error);
- GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0);
- c = next;
+ GPR_TIMER_END("grpc_exec_ctx_flush.stolen_cb", 0);
+ grpc_exec_ctx_flush(exec_ctx);
+ did_something = true;
}
}
GPR_TIMER_END("grpc_exec_ctx_flush", 0);
@@ -86,12 +112,25 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error,
grpc_workqueue *offload_target_or_null) {
+ GPR_TIMER_BEGIN("grpc_exec_ctx_sched", 0);
if (offload_target_or_null == NULL) {
grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
- } else {
+ } else if (exec_ctx->stealing_from_workqueue == NULL) {
+ exec_ctx->stealing_from_workqueue = offload_target_or_null;
+ closure->error_data.error = error;
+ exec_ctx->stolen_closure = closure;
+ } else if (exec_ctx->stealing_from_workqueue != offload_target_or_null) {
grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error);
GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched");
+ } else { /* stealing_from_workqueue == offload_target_or_null */
+ grpc_workqueue_enqueue(exec_ctx, offload_target_or_null,
+ exec_ctx->stolen_closure,
+ exec_ctx->stolen_closure->error_data.error);
+ closure->error_data.error = error;
+ exec_ctx->stolen_closure = closure;
+ GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched");
}
+ GPR_TIMER_END("grpc_exec_ctx_sched", 0);
}
void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
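The offload path now maintains a 1-deep steal slot per exec_ctx. Roughly, with hypothetical wq, c1 and c2:

    /* Hedged illustration of the 1-deep steal slot. */
    grpc_exec_ctx_sched(&exec_ctx, c1, GRPC_ERROR_NONE, wq); /* c1 fills the
                                                                steal slot */
    grpc_exec_ctx_sched(&exec_ctx, c2, GRPC_ERROR_NONE, wq); /* c1 is enqueued
                                                                on wq, c2 takes
                                                                the slot: order
                                                                is preserved */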
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 4d20ecf922..7e50cb9825 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -66,15 +66,33 @@ typedef struct grpc_combiner grpc_combiner;
#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
struct grpc_exec_ctx {
grpc_closure_list closure_list;
+  /** The workqueue we're stealing work from.
+      As items are queued to the execution context, we try to steal one
+      workqueue item and execute it inline (assuming the exec_ctx is not
+      finished): doing so does not violate the workqueue's ordering contract,
+      and yields a small latency win whenever we get a hit */
+  grpc_workqueue *stealing_from_workqueue;
+  /** The workqueue item that was stolen from the workqueue above. When new
+      items are scheduled to be offloaded to that workqueue, we need to update
+      this like a 1-deep FIFO to maintain the invariant that workqueue items
+      queued by one thread are started in order */
+  grpc_closure *stolen_closure;
/** currently active combiner: updated only via combiner.c */
grpc_combiner *active_combiner;
+  /** last combiner in the active combiner list */
+ grpc_combiner *last_combiner;
bool cached_ready_to_finish;
void *check_ready_to_finish_arg;
bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
};
+/* initializer for grpc_exec_ctx:
+ prefer to use GRPC_EXEC_CTX_INIT whenever possible */
#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
- { GRPC_CLOSURE_LIST_INIT, NULL, false, finish_check_arg, finish_check }
+ { \
+ GRPC_CLOSURE_LIST_INIT, NULL, NULL, NULL, NULL, false, finish_check_arg, \
+ finish_check \
+ }
#else
struct grpc_exec_ctx {
bool cached_ready_to_finish;
@@ -85,8 +103,10 @@ struct grpc_exec_ctx {
{ false, finish_check_arg, finish_check }
#endif
+/* initialize an execution context at the top level of an API call into grpc
+ (this is safe to use elsewhere, though possibly not as efficient) */
#define GRPC_EXEC_CTX_INIT \
- GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_never_ready_to_finish, NULL)
+ GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_always_ready_to_finish, NULL)
/** Flush any work that has been enqueued onto this grpc_exec_ctx.
* Caller must guarantee that no interfering locks are held.
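Note the semantic change in GRPC_EXEC_CTX_INIT: it now uses grpc_always_ready_to_finish, so a top-level API context reports itself ready to finish and hands any stolen workqueue closure back to its workqueue instead of running it inline while unwinding. A hedged sketch of both initializers:

    grpc_exec_ctx call_ctx = GRPC_EXEC_CTX_INIT; /* always ready to finish */
    /* a long-lived background thread can opt into the old behavior: */
    grpc_exec_ctx bg_ctx =
        GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_never_ready_to_finish, NULL);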
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index d67d388b8c..4fd83e0b22 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -112,6 +112,14 @@ void grpc_iomgr_shutdown(void) {
continue;
}
if (g_root_object.next != &g_root_object) {
+ if (grpc_iomgr_abort_on_leaks()) {
+ gpr_log(GPR_DEBUG, "Failed to free %" PRIuPTR
+ " iomgr objects before shutdown deadline: "
+ "memory leaks are likely",
+ count_objects());
+ dump_objects("LEAKED");
+ abort();
+ }
gpr_timespec short_deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN));
if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline)) {
@@ -122,9 +130,6 @@ void grpc_iomgr_shutdown(void) {
"memory leaks are likely",
count_objects());
dump_objects("LEAKED");
- if (grpc_iomgr_abort_on_leaks()) {
- abort();
- }
}
break;
}
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 92767721d5..00fd77679a 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -177,7 +177,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
tcp->read_cb = NULL;
tcp->incoming_buffer = NULL;
- grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ grpc_closure_run(exec_ctx, cb, error);
}
#define MAX_READ_IOVEC 4
@@ -209,11 +209,11 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
msg.msg_controllen = 0;
msg.msg_flags = 0;
- GPR_TIMER_BEGIN("recvmsg", 1);
+ GPR_TIMER_BEGIN("recvmsg", 0);
do {
read_bytes = recvmsg(tcp->fd, &msg, 0);
} while (read_bytes < 0 && errno == EINTR);
- GPR_TIMER_END("recvmsg", 0);
+ GPR_TIMER_END("recvmsg", read_bytes >= 0);
if (read_bytes < 0) {
/* NB: After calling call_read_cb a parallel call of the read handler may
@@ -392,11 +392,8 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error_free_string(str);
}
- GPR_TIMER_BEGIN("tcp_handle_write.cb", 0);
- cb->cb(exec_ctx, cb->cb_arg, error);
- GPR_TIMER_END("tcp_handle_write.cb", 0);
+ grpc_closure_run(exec_ctx, cb, error);
TCP_UNREF(exec_ctx, tcp, "write");
- GRPC_ERROR_UNREF(error);
}
}
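Both call sites now use grpc_closure_run, which invokes the callback inline and consumes the error reference, collapsing the hand-written pair that this hunk removes:

    /* before: run the callback, then drop the error ref by hand */
    cb->cb(exec_ctx, cb->cb_arg, error);
    GRPC_ERROR_UNREF(error);
    /* after: one call does both */
    grpc_closure_run(exec_ctx, cb, error);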
diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h
index a825d2a28b..5a9a177963 100644
--- a/src/core/lib/iomgr/timer.h
+++ b/src/core/lib/iomgr/timer.h
@@ -49,11 +49,11 @@ typedef struct grpc_timer {
} grpc_timer;
/* Initialize *timer. When expired or canceled, timer_cb will be called with
- *timer_cb_arg and status to indicate if it expired (SUCCESS) or was
- canceled (CANCELLED). timer_cb is guaranteed to be called exactly once,
- and application code should check the status to determine how it was
- invoked. The application callback is also responsible for maintaining
- information about when to free up any user-level state. */
+ *timer_cb_arg and error set to indicate if it expired (GRPC_ERROR_NONE) or
+ was canceled (GRPC_ERROR_CANCELLED). timer_cb is guaranteed to be called
+ exactly once, and application code should check the error to determine
+ how it was invoked. The application callback is also responsible for
+ maintaining information about when to free up any user-level state. */
void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_timespec deadline, grpc_iomgr_cb_func timer_cb,
void *timer_cb_arg, gpr_timespec now);
@@ -74,8 +74,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
In all of these cases, the cancellation is still considered successful.
They are essentially distinguished in that the timer_cb will be run
- exactly once from either the cancellation (with status CANCELLED)
- or from the activation (with status SUCCESS)
+ exactly once from either the cancellation (with error GRPC_ERROR_CANCELLED)
+ or from the activation (with error GRPC_ERROR_NONE).
Note carefully that the callback function MAY occur in the same callstack
as grpc_timer_cancel. It's expected that most timers will be cancelled (their
@@ -83,14 +83,13 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
that cancellation costs as little as possible. Making callbacks run inline
matches this aim.
- Requires: cancel() must happen after add() on a given timer */
+ Requires: cancel() must happen after init() on a given timer */
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer);
/* iomgr internal api for dealing with timers */
/* Check for timers to be run, and run them.
Return true if timer callbacks were executed.
- Drops drop_mu if it is non-null before executing callbacks.
If next is non-null, TRY to update *next with the next running timer
IF that timer occurs before *next current value.
*next is never guaranteed to be updated on any given execution; however,
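Under the error-based contract documented above, a timer callback distinguishes expiry from cancellation by inspecting the error. A minimal hedged sketch (on_deadline, timer, deadline and my_state are hypothetical names):

    static void on_deadline(grpc_exec_ctx *exec_ctx, void *arg,
                            grpc_error *error) {
      if (error == GRPC_ERROR_NONE) {
        /* timer expired: enforce the deadline */
      } else {
        /* GRPC_ERROR_CANCELLED: clean up; the callback still runs once */
      }
    }

    grpc_timer_init(exec_ctx, &timer, deadline, on_deadline, my_state,
                    gpr_now(GPR_CLOCK_MONOTONIC));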
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.c b/src/core/lib/iomgr/wakeup_fd_cv.c
new file mode 100644
index 0000000000..b4165208ed
--- /dev/null
+++ b/src/core/lib/iomgr/wakeup_fd_cv.c
@@ -0,0 +1,118 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_POSIX_WAKEUP_FD
+
+#include "src/core/lib/iomgr/wakeup_fd_cv.h"
+
+#include <errno.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+
+#define MAX_TABLE_RESIZE 256
+
+extern cv_fd_table g_cvfds;
+
+static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) {
+ unsigned int i, newsize;
+ int idx;
+ gpr_mu_lock(&g_cvfds.mu);
+ if (!g_cvfds.free_fds) {
+ newsize = GPR_MIN(g_cvfds.size * 2, g_cvfds.size + MAX_TABLE_RESIZE);
+ g_cvfds.cvfds = gpr_realloc(g_cvfds.cvfds, sizeof(fd_node) * newsize);
+ for (i = g_cvfds.size; i < newsize; i++) {
+ g_cvfds.cvfds[i].is_set = 0;
+ g_cvfds.cvfds[i].cvs = NULL;
+ g_cvfds.cvfds[i].next_free = g_cvfds.free_fds;
+ g_cvfds.free_fds = &g_cvfds.cvfds[i];
+ }
+ g_cvfds.size = newsize;
+ }
+
+ idx = (int)(g_cvfds.free_fds - g_cvfds.cvfds);
+ g_cvfds.free_fds = g_cvfds.free_fds->next_free;
+ g_cvfds.cvfds[idx].cvs = NULL;
+ g_cvfds.cvfds[idx].is_set = 0;
+ fd_info->read_fd = IDX_TO_FD(idx);
+ fd_info->write_fd = -1;
+ gpr_mu_unlock(&g_cvfds.mu);
+ return GRPC_ERROR_NONE;
+}
+
+static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) {
+ cv_node* cvn;
+ gpr_mu_lock(&g_cvfds.mu);
+ g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 1;
+ cvn = g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs;
+ while (cvn) {
+ gpr_cv_signal(cvn->cv);
+ cvn = cvn->next;
+ }
+ gpr_mu_unlock(&g_cvfds.mu);
+ return GRPC_ERROR_NONE;
+}
+
+static grpc_error* cv_fd_consume(grpc_wakeup_fd* fd_info) {
+ gpr_mu_lock(&g_cvfds.mu);
+ g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 0;
+ gpr_mu_unlock(&g_cvfds.mu);
+ return GRPC_ERROR_NONE;
+}
+
+static void cv_fd_destroy(grpc_wakeup_fd* fd_info) {
+ if (fd_info->read_fd == 0) {
+ return;
+ }
+ gpr_mu_lock(&g_cvfds.mu);
+ // Assert that there are no active pollers
+ GPR_ASSERT(!g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs);
+ g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].next_free = g_cvfds.free_fds;
+ g_cvfds.free_fds = &g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)];
+ gpr_mu_unlock(&g_cvfds.mu);
+}
+
+static int cv_check_availability(void) { return 1; }
+
+const grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable = {
+ cv_fd_init, cv_fd_consume, cv_fd_wakeup, cv_fd_destroy,
+ cv_check_availability};
+
+#endif /* GPR_POSIX_WAKEUP_FD */
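A note on cv_fd_init's growth policy: the fd table doubles in size, but each step is capped at MAX_TABLE_RESIZE new entries, so for example a 128-entry table grows to min(256, 384) = 256 slots, while a 1024-entry table grows only to min(2048, 1280) = 1280.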
diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/wakeup_fd_cv.h
index 03ee21cef7..ac16be1750 100644
--- a/src/core/lib/iomgr/workqueue_posix.h
+++ b/src/core/lib/iomgr/wakeup_fd_cv.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,31 +31,50 @@
*
*/
-#ifndef GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H
-#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H
+/*
+ * wakeup_fd_cv uses condition variables to implement wakeup fds.
+ *
+ * It is intended for use only in cases where eventfd() and pipe() are not
+ * available. It can only be used with the "poll" engine.
+ *
+ * Implementation:
+ * A global table of cv wakeup fds is maintained. A cv wakeup fd is a negative
+ * file descriptor. poll() is run in a background thread with only the real
+ * socket fds, while we wait on a condition variable triggered by either
+ * poll() completion or a wakeup_fd() call.
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_WAKEUP_FD_CV_H
+#define GRPC_CORE_LIB_IOMGR_WAKEUP_FD_CV_H
-#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/support/mpscq.h"
+#include <grpc/support/sync.h>
-struct grpc_fd;
+#include "src/core/lib/iomgr/ev_posix.h"
-struct grpc_workqueue {
- gpr_refcount refs;
- gpr_mpscq queue;
- // state is:
- // lower bit - zero if orphaned
- // other bits - number of items enqueued
- gpr_atm state;
+#define FD_TO_IDX(fd) (-(fd)-1)
+#define IDX_TO_FD(idx) (-(idx)-1)
- grpc_wakeup_fd wakeup_fd;
- struct grpc_fd *wakeup_read_fd;
+typedef struct cv_node {
+ gpr_cv* cv;
+ struct cv_node* next;
+} cv_node;
- grpc_closure read_closure;
-};
+typedef struct fd_node {
+ int is_set;
+ cv_node* cvs;
+ struct fd_node* next_free;
+} fd_node;
-/** Create a work queue. Returns an error if creation fails. If creation
- succeeds, sets *workqueue to point to it. */
-grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
- grpc_workqueue **workqueue);
+typedef struct cv_fd_table {
+ gpr_mu mu;
+ int pollcount;
+ int shutdown;
+ gpr_cv shutdown_complete;
+ fd_node* cvfds;
+ fd_node* free_fds;
+ unsigned int size;
+ grpc_poll_function_type poll;
+} cv_fd_table;
-#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H */
+#endif /* GRPC_CORE_LIB_IOMGR_WAKEUP_FD_CV_H */
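The negative-fd encoding keeps cv wakeup fds disjoint from real descriptors (which are >= 0): table index 0 maps to fd -1, index 1 to fd -2, and so on, and the macros round-trip exactly:

    /* sanity checks of the encoding */
    GPR_ASSERT(IDX_TO_FD(0) == -1);
    GPR_ASSERT(FD_TO_IDX(-1) == 0);
    GPR_ASSERT(FD_TO_IDX(IDX_TO_FD(42)) == 42);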
diff --git a/src/core/lib/iomgr/wakeup_fd_pipe.c b/src/core/lib/iomgr/wakeup_fd_pipe.c
index 4e5dbdcb73..d0ea216aa0 100644
--- a/src/core/lib/iomgr/wakeup_fd_pipe.c
+++ b/src/core/lib/iomgr/wakeup_fd_pipe.c
@@ -47,11 +47,10 @@
static grpc_error* pipe_init(grpc_wakeup_fd* fd_info) {
int pipefd[2];
- /* TODO(klempner): Make this nonfatal */
int r = pipe(pipefd);
if (0 != r) {
gpr_log(GPR_ERROR, "pipe creation failed (%d): %s", errno, strerror(errno));
- abort();
+ return GRPC_OS_ERROR(errno, "pipe");
}
grpc_error* err;
err = grpc_set_socket_nonblocking(pipefd[0], 1);
@@ -95,8 +94,13 @@ static void pipe_destroy(grpc_wakeup_fd* fd_info) {
}
static int pipe_check_availability(void) {
- /* Assume that pipes are always available. */
- return 1;
+ grpc_wakeup_fd fd;
+ if (pipe_init(&fd) == GRPC_ERROR_NONE) {
+ pipe_destroy(&fd);
+ return 1;
+ } else {
+ return 0;
+ }
}
const grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable = {
diff --git a/src/core/lib/iomgr/wakeup_fd_posix.c b/src/core/lib/iomgr/wakeup_fd_posix.c
index 046208abc8..5c894bef37 100644
--- a/src/core/lib/iomgr/wakeup_fd_posix.c
+++ b/src/core/lib/iomgr/wakeup_fd_posix.c
@@ -36,37 +36,66 @@
#ifdef GPR_POSIX_WAKEUP_FD
#include <stddef.h>
+#include "src/core/lib/iomgr/wakeup_fd_cv.h"
#include "src/core/lib/iomgr/wakeup_fd_pipe.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
+extern const grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable;
static const grpc_wakeup_fd_vtable *wakeup_fd_vtable = NULL;
+
int grpc_allow_specialized_wakeup_fd = 1;
+int grpc_allow_pipe_wakeup_fd = 1;
+
+int has_real_wakeup_fd = 1;
+int cv_wakeup_fds_enabled = 0;
void grpc_wakeup_fd_global_init(void) {
if (grpc_allow_specialized_wakeup_fd &&
grpc_specialized_wakeup_fd_vtable.check_availability()) {
wakeup_fd_vtable = &grpc_specialized_wakeup_fd_vtable;
- } else {
+ } else if (grpc_allow_pipe_wakeup_fd &&
+ grpc_pipe_wakeup_fd_vtable.check_availability()) {
wakeup_fd_vtable = &grpc_pipe_wakeup_fd_vtable;
+ } else {
+ has_real_wakeup_fd = 0;
}
}
void grpc_wakeup_fd_global_destroy(void) { wakeup_fd_vtable = NULL; }
+int grpc_has_wakeup_fd(void) { return has_real_wakeup_fd; }
+
+int grpc_cv_wakeup_fds_enabled(void) { return cv_wakeup_fds_enabled; }
+
+void grpc_enable_cv_wakeup_fds(int enable) { cv_wakeup_fds_enabled = enable; }
+
grpc_error *grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) {
+ if (cv_wakeup_fds_enabled) {
+ return grpc_cv_wakeup_fd_vtable.init(fd_info);
+ }
return wakeup_fd_vtable->init(fd_info);
}
grpc_error *grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) {
+ if (cv_wakeup_fds_enabled) {
+ return grpc_cv_wakeup_fd_vtable.consume(fd_info);
+ }
return wakeup_fd_vtable->consume(fd_info);
}
grpc_error *grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) {
+ if (cv_wakeup_fds_enabled) {
+ return grpc_cv_wakeup_fd_vtable.wakeup(fd_info);
+ }
return wakeup_fd_vtable->wakeup(fd_info);
}
void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info) {
- wakeup_fd_vtable->destroy(fd_info);
+ if (cv_wakeup_fds_enabled) {
+ grpc_cv_wakeup_fd_vtable.destroy(fd_info);
+ } else {
+ wakeup_fd_vtable->destroy(fd_info);
+ }
}
#endif /* GPR_POSIX_WAKEUP_FD */
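Wakeup fd selection now falls back in order: the specialized mechanism (e.g. eventfd), then pipes, then nothing, with cv-based wakeups available as an explicit opt-in. A hedged sketch of how a poll engine might enable them when no real mechanism survives:

    grpc_wakeup_fd_global_init();
    if (!grpc_has_wakeup_fd()) {
      /* route every wakeup fd operation through the cv table */
      grpc_enable_cv_wakeup_fds(1);
    }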
diff --git a/src/core/lib/iomgr/wakeup_fd_posix.h b/src/core/lib/iomgr/wakeup_fd_posix.h
index e269f242d8..71d32d97ba 100644
--- a/src/core/lib/iomgr/wakeup_fd_posix.h
+++ b/src/core/lib/iomgr/wakeup_fd_posix.h
@@ -71,6 +71,10 @@ void grpc_wakeup_fd_global_destroy(void);
* purposes only.*/
void grpc_wakeup_fd_global_init_force_fallback(void);
+int grpc_has_wakeup_fd(void);
+int grpc_cv_wakeup_fds_enabled(void);
+void grpc_enable_cv_wakeup_fds(int enable);
+
typedef struct grpc_wakeup_fd grpc_wakeup_fd;
typedef struct grpc_wakeup_fd_vtable {
@@ -88,6 +92,7 @@ struct grpc_wakeup_fd {
};
extern int grpc_allow_specialized_wakeup_fd;
+extern int grpc_allow_pipe_wakeup_fd;
#define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd)
diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h
index b2805dc66c..5b96d1d851 100644
--- a/src/core/lib/iomgr/workqueue.h
+++ b/src/core/lib/iomgr/workqueue.h
@@ -40,10 +40,6 @@
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
-#ifdef GPR_POSIX_SOCKET
-#include "src/core/lib/iomgr/workqueue_posix.h"
-#endif
-
#ifdef GPR_WINDOWS
#include "src/core/lib/iomgr/workqueue_windows.h"
#endif
@@ -58,20 +54,20 @@
string will be printed alongside the refcount. When it is not defined, the
string will be discarded at compilation time. */
-//#define GRPC_WORKQUEUE_REFCOUNT_DEBUG
+/*#define GRPC_WORKQUEUE_REFCOUNT_DEBUG*/
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
#define GRPC_WORKQUEUE_REF(p, r) \
- (grpc_workqueue_ref((p), __FILE__, __LINE__, (r)), (p))
+ grpc_workqueue_ref((p), __FILE__, __LINE__, (r))
#define GRPC_WORKQUEUE_UNREF(exec_ctx, p, r) \
grpc_workqueue_unref((exec_ctx), (p), __FILE__, __LINE__, (r))
-void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
- const char *reason);
+grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file,
+ int line, const char *reason);
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
const char *file, int line, const char *reason);
#else
-#define GRPC_WORKQUEUE_REF(p, r) (grpc_workqueue_ref((p)), (p))
+#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p))
#define GRPC_WORKQUEUE_UNREF(cl, p, r) grpc_workqueue_unref((cl), (p))
-void grpc_workqueue_ref(grpc_workqueue *workqueue);
+grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue);
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
#endif
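Because grpc_workqueue_ref now returns its argument itself (rather than relying on the comma-expression trick in the old macro), GRPC_WORKQUEUE_REF can be used directly inside expressions; e.g. this hypothetical assignment takes a ref and stores the pointer in one step:

    exec_ctx->stealing_from_workqueue = GRPC_WORKQUEUE_REF(wq, "exec_ctx_sched");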
diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c
deleted file mode 100644
index ecfea68f56..0000000000
--- a/src/core/lib/iomgr/workqueue_posix.c
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#ifdef GPR_POSIX_SOCKET
-
-#include "src/core/lib/iomgr/workqueue.h"
-
-#include <stdio.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/useful.h>
-
-#include "src/core/lib/iomgr/ev_posix.h"
-#include "src/core/lib/profiling/timers.h"
-
-static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
-
-grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
- grpc_workqueue **workqueue) {
- char name[32];
- *workqueue = gpr_malloc(sizeof(grpc_workqueue));
- gpr_ref_init(&(*workqueue)->refs, 1);
- gpr_atm_no_barrier_store(&(*workqueue)->state, 1);
- grpc_error *err = grpc_wakeup_fd_init(&(*workqueue)->wakeup_fd);
- if (err != GRPC_ERROR_NONE) {
- gpr_free(*workqueue);
- return err;
- }
- sprintf(name, "workqueue:%p", (void *)(*workqueue));
- (*workqueue)->wakeup_read_fd = grpc_fd_create(
- GRPC_WAKEUP_FD_GET_READ_FD(&(*workqueue)->wakeup_fd), name);
- gpr_mpscq_init(&(*workqueue)->queue);
- grpc_closure_init(&(*workqueue)->read_closure, on_readable, *workqueue);
- grpc_fd_notify_on_read(exec_ctx, (*workqueue)->wakeup_read_fd,
- &(*workqueue)->read_closure);
- return GRPC_ERROR_NONE;
-}
-
-static void workqueue_destroy(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue) {
- grpc_fd_shutdown(exec_ctx, workqueue->wakeup_read_fd);
-}
-
-static void workqueue_orphan(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue) {
- if (gpr_atm_full_fetch_add(&workqueue->state, -1) == 1) {
- workqueue_destroy(exec_ctx, workqueue);
- }
-}
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
- const char *reason) {
- if (workqueue == NULL) return;
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p ref %d -> %d %s",
- workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count + 1,
- reason);
- gpr_ref(&workqueue->refs);
-}
-#else
-void grpc_workqueue_ref(grpc_workqueue *workqueue) {
- if (workqueue == NULL) return;
- gpr_ref(&workqueue->refs);
-}
-#endif
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {
- if (workqueue == NULL) return;
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p unref %d -> %d %s",
- workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count - 1,
- reason);
- if (gpr_unref(&workqueue->refs)) {
- workqueue_orphan(exec_ctx, workqueue);
- }
-}
-#else
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
- if (workqueue == NULL) return;
- if (gpr_unref(&workqueue->refs)) {
- workqueue_orphan(exec_ctx, workqueue);
- }
-}
-#endif
-
-static void drain(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
- abort();
-}
-
-static void wakeup(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
- GPR_TIMER_MARK("workqueue.wakeup", 0);
- grpc_error *err = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
- if (!GRPC_LOG_IF_ERROR("wakeupfd_wakeup", err)) {
- drain(exec_ctx, workqueue);
- }
-}
-
-static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- GPR_TIMER_BEGIN("workqueue.on_readable", 0);
-
- grpc_workqueue *workqueue = arg;
-
- if (error != GRPC_ERROR_NONE) {
- /* HACK: let wakeup_fd code know that we stole the fd */
- workqueue->wakeup_fd.read_fd = 0;
- grpc_wakeup_fd_destroy(&workqueue->wakeup_fd);
- grpc_fd_orphan(exec_ctx, workqueue->wakeup_read_fd, NULL, NULL, "destroy");
- GPR_ASSERT(gpr_atm_no_barrier_load(&workqueue->state) == 0);
- gpr_free(workqueue);
- } else {
- error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
- gpr_mpscq_node *n = gpr_mpscq_pop(&workqueue->queue);
- if (error == GRPC_ERROR_NONE) {
- grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
- &workqueue->read_closure);
- } else {
- /* recurse to get error handling */
- on_readable(exec_ctx, arg, error);
- }
- if (n == NULL) {
- /* try again - queue in an inconsistant state */
- wakeup(exec_ctx, workqueue);
- } else {
- switch (gpr_atm_full_fetch_add(&workqueue->state, -2)) {
- case 3: // had one count, one unorphaned --> done, unorphaned
- break;
- case 2: // had one count, one orphaned --> done, orphaned
- workqueue_destroy(exec_ctx, workqueue);
- break;
- case 1:
- case 0:
- // these values are illegal - representing an already done or
- // deleted workqueue
- GPR_UNREACHABLE_CODE(break);
- default:
- // schedule a wakeup since there's more to do
- wakeup(exec_ctx, workqueue);
- }
- grpc_closure *cl = (grpc_closure *)n;
- grpc_error *clerr = cl->error;
- cl->cb(exec_ctx, cl->cb_arg, clerr);
- GRPC_ERROR_UNREF(clerr);
- }
- }
-
- GPR_TIMER_END("workqueue.on_readable", 0);
-}
-
-void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- grpc_closure *closure, grpc_error *error) {
- GPR_TIMER_BEGIN("workqueue.enqueue", 0);
- gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, 2);
- GPR_ASSERT(last & 1);
- closure->error = error;
- gpr_mpscq_push(&workqueue->queue, &closure->next_data.atm_next);
- if (last == 1) {
- wakeup(exec_ctx, workqueue);
- }
- GPR_TIMER_END("workqueue.enqueue", 0);
-}
-
-#endif /* GPR_POSIX_SOCKET */
diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c
index ee81dc248e..5c93d3c59e 100644
--- a/src/core/lib/iomgr/workqueue_windows.c
+++ b/src/core/lib/iomgr/workqueue_windows.c
@@ -43,12 +43,16 @@
// workqueues.
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
- const char *reason) {}
+grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file,
+ int line, const char *reason) {
+ return workqueue;
+}
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
const char *file, int line, const char *reason) {}
#else
-void grpc_workqueue_ref(grpc_workqueue *workqueue) {}
+grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) {
+ return workqueue;
+}
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
#endif