aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/closure.c18
-rw-r--r--src/core/lib/iomgr/closure.h11
-rw-r--r--src/core/lib/iomgr/combiner.c372
-rw-r--r--src/core/lib/iomgr/combiner.h15
-rw-r--r--src/core/lib/iomgr/error.c18
-rw-r--r--src/core/lib/iomgr/error.h12
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c227
-rw-r--r--src/core/lib/iomgr/ev_poll_and_epoll_posix.c30
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c30
-rw-r--r--src/core/lib/iomgr/ev_posix.c23
-rw-r--r--src/core/lib/iomgr/ev_posix.h13
-rw-r--r--src/core/lib/iomgr/exec_ctx.c61
-rw-r--r--src/core/lib/iomgr/exec_ctx.h24
-rw-r--r--src/core/lib/iomgr/iomgr.c11
-rw-r--r--src/core/lib/iomgr/tcp_posix.c11
-rw-r--r--src/core/lib/iomgr/workqueue.h16
-rw-r--r--src/core/lib/iomgr/workqueue_posix.c196
-rw-r--r--src/core/lib/iomgr/workqueue_posix.h61
-rw-r--r--src/core/lib/iomgr/workqueue_windows.c10
19 files changed, 633 insertions, 526 deletions
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c
index 1ba0a5c141..c6ddc76732 100644
--- a/src/core/lib/iomgr/closure.c
+++ b/src/core/lib/iomgr/closure.c
@@ -35,6 +35,8 @@
#include <grpc/support/alloc.h>
+#include "src/core/lib/profiling/timers.h"
+
void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg) {
closure->cb = cb;
@@ -51,7 +53,7 @@ void grpc_closure_list_append(grpc_closure_list *closure_list,
GRPC_ERROR_UNREF(error);
return;
}
- closure->error = error;
+ closure->error_data.error = error;
closure->next_data.next = NULL;
if (closure_list->head == NULL) {
closure_list->head = closure;
@@ -64,8 +66,8 @@ void grpc_closure_list_append(grpc_closure_list *closure_list,
void grpc_closure_list_fail_all(grpc_closure_list *list,
grpc_error *forced_failure) {
for (grpc_closure *c = list->head; c != NULL; c = c->next_data.next) {
- if (c->error == GRPC_ERROR_NONE) {
- c->error = GRPC_ERROR_REF(forced_failure);
+ if (c->error_data.error == GRPC_ERROR_NONE) {
+ c->error_data.error = GRPC_ERROR_REF(forced_failure);
}
}
GRPC_ERROR_UNREF(forced_failure);
@@ -110,3 +112,13 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) {
grpc_closure_init(&wc->wrapper, closure_wrapper, wc);
return &wc->wrapper;
}
+
+void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *c,
+ grpc_error *error) {
+ GPR_TIMER_BEGIN("grpc_closure_run", 0);
+ if (c != NULL) {
+ c->cb(exec_ctx, c->cb_arg, error);
+ }
+ GRPC_ERROR_UNREF(error);
+ GPR_TIMER_END("grpc_closure_run", 0);
+}
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index c1a22b6021..2b4b271eaa 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -76,7 +76,10 @@ struct grpc_closure {
void *cb_arg;
/** Once queued, the result of the closure. Before then: scratch space */
- grpc_error *error;
+ union {
+ grpc_error *error;
+ uintptr_t scratch;
+ } error_data;
};
/** Initializes \a closure with \a cb and \a cb_arg. */
@@ -106,4 +109,10 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst);
/** return whether \a list is empty. */
bool grpc_closure_list_empty(grpc_closure_list list);
+/** Run a closure directly. Caller ensures that no locks are being held above.
+ * Note that calling this at the end of a closure callback function itself is
+ * by definition safe. */
+void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error);
+
#endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 831bdb4aff..60ee14eb23 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -50,25 +50,57 @@ int grpc_combiner_trace = 0;
} \
} while (0)
+#define STATE_UNORPHANED 1
+#define STATE_ELEM_COUNT_LOW_BIT 2
+
struct grpc_combiner {
+ grpc_combiner *next_combiner_on_this_exec_ctx;
grpc_workqueue *optional_workqueue;
gpr_mpscq queue;
// state is:
- // lower bit - zero if orphaned
- // other bits - number of items queued on the lock
+ // lower bit - zero if orphaned (STATE_UNORPHANED)
+ // other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT)
gpr_atm state;
- bool take_async_break_before_final_list;
+ // number of elements in the list that are covered by a poller: if >0, we can
+ // offload safely
+ gpr_atm elements_covered_by_poller;
+ bool time_to_execute_final_list;
+ bool final_list_covered_by_poller;
grpc_closure_list final_list;
- grpc_closure continue_finishing;
+ grpc_closure offload;
};
+static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
+
+typedef struct {
+ grpc_error *error;
+ bool covered_by_poller;
+} error_data;
+
+static uintptr_t pack_error_data(error_data d) {
+ return ((uintptr_t)d.error) | (d.covered_by_poller ? 1 : 0);
+}
+
+static error_data unpack_error_data(uintptr_t p) {
+ return (error_data){(grpc_error *)(p & ~(uintptr_t)1), p & 1};
+}
+
+static bool is_covered_by_poller(grpc_combiner *lock) {
+ return lock->final_list_covered_by_poller ||
+ gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0;
+}
+
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
grpc_combiner *lock = gpr_malloc(sizeof(*lock));
+ lock->next_combiner_on_this_exec_ctx = NULL;
+ lock->time_to_execute_final_list = false;
lock->optional_workqueue = optional_workqueue;
- gpr_atm_no_barrier_store(&lock->state, 1);
+ lock->final_list_covered_by_poller = false;
+ gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
+ gpr_atm_no_barrier_store(&lock->elements_covered_by_poller, 0);
gpr_mpscq_init(&lock->queue);
- lock->take_async_break_before_final_list = false;
grpc_closure_list_init(&lock->final_list);
+ grpc_closure_init(&lock->offload, offload, lock);
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock));
return lock;
}
@@ -82,7 +114,7 @@ static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
}
void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
- gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -1);
+ gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED);
GRPC_COMBINER_TRACE(gpr_log(
GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state));
if (old_state == 1) {
@@ -90,170 +122,186 @@ void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
}
}
-static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
-static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
+static void push_last_on_exec_ctx(grpc_exec_ctx *exec_ctx,
+ grpc_combiner *lock) {
+ lock->next_combiner_on_this_exec_ctx = NULL;
+ if (exec_ctx->active_combiner == NULL) {
+ exec_ctx->active_combiner = exec_ctx->last_combiner = lock;
+ } else {
+ exec_ctx->last_combiner->next_combiner_on_this_exec_ctx = lock;
+ exec_ctx->last_combiner = lock;
+ }
+}
-static void continue_finishing_mainline(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- GPR_TIMER_BEGIN("combiner.continue_executing_mainline", 0);
- grpc_combiner *lock = arg;
- GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG, "C:%p continue_finishing_mainline", lock));
- GPR_ASSERT(exec_ctx->active_combiner == NULL);
+static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx,
+ grpc_combiner *lock) {
+ lock->next_combiner_on_this_exec_ctx = exec_ctx->active_combiner;
exec_ctx->active_combiner = lock;
- if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock);
- GPR_ASSERT(exec_ctx->active_combiner == lock);
- exec_ctx->active_combiner = NULL;
- GPR_TIMER_END("combiner.continue_executing_mainline", 0);
+ if (lock->next_combiner_on_this_exec_ctx == NULL) {
+ exec_ctx->last_combiner = lock;
+ }
}
-static void execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
- GPR_TIMER_BEGIN("combiner.execute_final", 0);
- grpc_closure *c = lock->final_list.head;
- GPR_ASSERT(c != NULL);
- grpc_closure_list_init(&lock->final_list);
- lock->take_async_break_before_final_list = false;
- int loops = 0;
- while (c != NULL) {
- GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c));
- grpc_closure *next = c->next_data.next;
- grpc_error *error = c->error;
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- c = next;
- loops++;
+void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
+ grpc_closure *cl, grpc_error *error,
+ bool covered_by_poller) {
+ GPR_TIMER_BEGIN("combiner.execute", 0);
+ gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
+ GRPC_COMBINER_TRACE(gpr_log(
+ GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d last=%" PRIdPTR, lock,
+ cl, covered_by_poller, last));
+ GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed
+ cl->error_data.scratch =
+ pack_error_data((error_data){error, covered_by_poller});
+ if (covered_by_poller) {
+ gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, 1);
+ }
+ gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
+ if (last == 1) {
+ // first element on this list: add it to the list of combiner locks
+ // executing within this exec_ctx
+ push_last_on_exec_ctx(exec_ctx, lock);
}
- GPR_TIMER_END("combiner.execute_final", 0);
+ GPR_TIMER_END("combiner.execute", 0);
}
-static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- GPR_TIMER_BEGIN("combiner.continue_executing_final", 0);
- grpc_combiner *lock = arg;
- GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG, "C:%p continue_executing_final", lock));
- GPR_ASSERT(exec_ctx->active_combiner == NULL);
- exec_ctx->active_combiner = lock;
- // quick peek to see if new things have turned up on the queue: if so, go back
- // to executing them before the final list
- if ((gpr_atm_acq_load(&lock->state) >> 1) > 1) {
- if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock);
- } else {
- execute_final(exec_ctx, lock);
- finish(exec_ctx, lock);
+static void move_next(grpc_exec_ctx *exec_ctx) {
+ exec_ctx->active_combiner =
+ exec_ctx->active_combiner->next_combiner_on_this_exec_ctx;
+ if (exec_ctx->active_combiner == NULL) {
+ exec_ctx->last_combiner = NULL;
}
- GPR_ASSERT(exec_ctx->active_combiner == lock);
- exec_ctx->active_combiner = NULL;
- GPR_TIMER_END("combiner.continue_executing_final", 0);
}
-static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
- GPR_TIMER_BEGIN("combiner.start_execute_final", 0);
- GPR_ASSERT(exec_ctx->active_combiner == lock);
- GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG,
- "C:%p start_execute_final take_async_break_before_final_list=%d",
- lock, lock->take_async_break_before_final_list));
- if (lock->take_async_break_before_final_list) {
- grpc_closure_init(&lock->continue_finishing, continue_executing_final,
- lock);
- grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE,
- GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched"));
- GPR_TIMER_END("combiner.start_execute_final", 0);
+static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ grpc_combiner *lock = arg;
+ push_last_on_exec_ctx(exec_ctx, lock);
+}
+
+static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
+ move_next(exec_ctx);
+ GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload --> %p", lock,
+ lock->optional_workqueue));
+ grpc_workqueue_enqueue(exec_ctx, lock->optional_workqueue, &lock->offload,
+ GRPC_ERROR_NONE);
+}
+
+bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
+ GPR_TIMER_BEGIN("combiner.continue_exec_ctx", 0);
+ grpc_combiner *lock = exec_ctx->active_combiner;
+ if (lock == NULL) {
+ GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return false;
- } else {
- execute_final(exec_ctx, lock);
- GPR_TIMER_END("combiner.start_execute_final", 0);
- return true;
}
-}
-static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
- GPR_TIMER_BEGIN("combiner.maybe_finish_one", 0);
- gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue);
GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n));
- GPR_ASSERT(exec_ctx->active_combiner == lock);
- if (n == NULL) {
- // Queue is in an transiently inconsistent state: a new item is being queued
- // but is not visible to this thread yet.
- // Use this as a cue that we should go off and do something else for a while
- // (and come back later)
- grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline,
- lock);
- grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE,
- GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched"));
- GPR_TIMER_END("combiner.maybe_finish_one", 0);
- return false;
+ gpr_log(GPR_DEBUG,
+ "C:%p grpc_combiner_continue_exec_ctx workqueue=%p "
+ "is_covered_by_poller=%d exec_ctx_ready_to_finish=%d "
+ "time_to_execute_final_list=%d",
+ lock, lock->optional_workqueue, is_covered_by_poller(lock),
+ grpc_exec_ctx_ready_to_finish(exec_ctx),
+ lock->time_to_execute_final_list));
+
+ if (lock->optional_workqueue != NULL && is_covered_by_poller(lock) &&
+ grpc_exec_ctx_ready_to_finish(exec_ctx)) {
+ GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
+ // this execution context wants to move on, and we have a workqueue (and
+ // so can help the execution context out): schedule remaining work to be
+ // picked up on the workqueue
+ queue_offload(exec_ctx, lock);
+ GPR_TIMER_END("combiner.continue_exec_ctx", 0);
+ return true;
}
- grpc_closure *cl = (grpc_closure *)n;
- grpc_error *error = cl->error;
- cl->cb(exec_ctx, cl->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- GPR_TIMER_END("combiner.maybe_finish_one", 0);
- return true;
-}
-static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
- bool (*executor)(grpc_exec_ctx * exec_ctx, grpc_combiner * lock);
- GPR_TIMER_BEGIN("combiner.finish", 0);
- int loops = 0;
- do {
- executor = maybe_finish_one;
- gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -2);
- GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
- "C:%p finish[%d] old_state=%" PRIdPTR, lock,
- loops, old_state));
- switch (old_state) {
- default:
- // we have multiple queued work items: just continue executing them
- break;
- case 5: // we're down to one queued item: if it's the final list we
- case 4: // should do that
- if (!grpc_closure_list_empty(lock->final_list)) {
- executor = start_execute_final;
- }
- break;
- case 3: // had one count, one unorphaned --> unlocked unorphaned
- GPR_TIMER_END("combiner.finish", 0);
- return;
- case 2: // and one count, one orphaned --> unlocked and orphaned
- really_destroy(exec_ctx, lock);
- GPR_TIMER_END("combiner.finish", 0);
- return;
- case 1:
- case 0:
- // these values are illegal - representing an already unlocked or
- // deleted lock
- GPR_UNREACHABLE_CODE(return );
+ if (!lock->time_to_execute_final_list ||
+ // peek to see if something new has shown up, and execute that with
+ // priority
+ (gpr_atm_acq_load(&lock->state) >> 1) > 1) {
+ gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue);
+ GRPC_COMBINER_TRACE(
+ gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n));
+ if (n == NULL) {
+ // queue is in an inconsistent state: use this as a cue that we should
+ // go off and do something else for a while (and come back later)
+ GPR_TIMER_MARK("delay_busy", 0);
+ if (lock->optional_workqueue != NULL && is_covered_by_poller(lock)) {
+ queue_offload(exec_ctx, lock);
+ }
+ GPR_TIMER_END("combiner.continue_exec_ctx", 0);
+ return true;
}
- loops++;
- } while (executor(exec_ctx, lock));
- GPR_TIMER_END("combiner.finish", 0);
-}
+ GPR_TIMER_BEGIN("combiner.exec1", 0);
+ grpc_closure *cl = (grpc_closure *)n;
+ error_data err = unpack_error_data(cl->error_data.scratch);
+ cl->cb(exec_ctx, cl->cb_arg, err.error);
+ if (err.covered_by_poller) {
+ gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, -1);
+ }
+ GRPC_ERROR_UNREF(err.error);
+ GPR_TIMER_END("combiner.exec1", 0);
+ } else {
+ grpc_closure *c = lock->final_list.head;
+ GPR_ASSERT(c != NULL);
+ grpc_closure_list_init(&lock->final_list);
+ lock->final_list_covered_by_poller = false;
+ int loops = 0;
+ while (c != NULL) {
+ GPR_TIMER_BEGIN("combiner.exec_1final", 0);
+ GRPC_COMBINER_TRACE(
+ gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c));
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ c = next;
+ GPR_TIMER_END("combiner.exec_1final", 0);
+ }
+ }
-void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *cl, grpc_error *error) {
+ GPR_TIMER_MARK("unref", 0);
+ move_next(exec_ctx);
+ lock->time_to_execute_final_list = false;
+ gpr_atm old_state =
+ gpr_atm_full_fetch_add(&lock->state, -STATE_ELEM_COUNT_LOW_BIT);
GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p", lock, cl));
- GPR_TIMER_BEGIN("combiner.execute", 0);
- gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2);
- GPR_ASSERT(last & 1); // ensure lock has not been destroyed
- if (last == 1) {
- exec_ctx->active_combiner = lock;
- GPR_TIMER_BEGIN("combiner.execute_first_cb", 0);
- cl->cb(exec_ctx, cl->cb_arg, error);
- GPR_TIMER_END("combiner.execute_first_cb", 0);
- GRPC_ERROR_UNREF(error);
- finish(exec_ctx, lock);
- GPR_ASSERT(exec_ctx->active_combiner == lock);
- exec_ctx->active_combiner = NULL;
- } else {
- cl->error = error;
- gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
+ gpr_log(GPR_DEBUG, "C:%p finish old_state=%" PRIdPTR, lock, old_state));
+// Define a macro to ease readability of the following switch statement.
+#define OLD_STATE_WAS(orphaned, elem_count) \
+ (((orphaned) ? 0 : STATE_UNORPHANED) | \
+ ((elem_count)*STATE_ELEM_COUNT_LOW_BIT))
+ // Depending on what the previous state was, we need to perform different
+ // actions.
+ switch (old_state) {
+ default:
+ // we have multiple queued work items: just continue executing them
+ break;
+ case OLD_STATE_WAS(false, 2):
+ case OLD_STATE_WAS(true, 2):
+ // we're down to one queued item: if it's the final list we should do that
+ if (!grpc_closure_list_empty(lock->final_list)) {
+ lock->time_to_execute_final_list = true;
+ }
+ break;
+ case OLD_STATE_WAS(false, 1):
+ // had one count, one unorphaned --> unlocked unorphaned
+ GPR_TIMER_END("combiner.continue_exec_ctx", 0);
+ return true;
+ case OLD_STATE_WAS(true, 1):
+ // and one count, one orphaned --> unlocked and orphaned
+ really_destroy(exec_ctx, lock);
+ GPR_TIMER_END("combiner.continue_exec_ctx", 0);
+ return true;
+ case OLD_STATE_WAS(false, 0):
+ case OLD_STATE_WAS(true, 0):
+ // these values are illegal - representing an already unlocked or
+ // deleted lock
+ GPR_TIMER_END("combiner.continue_exec_ctx", 0);
+ GPR_UNREACHABLE_CODE(return true);
}
- GPR_TIMER_END("combiner.execute", 0);
+ push_first_on_exec_ctx(exec_ctx, lock);
+ GPR_TIMER_END("combiner.continue_exec_ctx", 0);
+ return true;
}
static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
@@ -264,30 +312,26 @@ static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *closure, grpc_error *error,
- bool force_async_break) {
+ bool covered_by_poller) {
GRPC_COMBINER_TRACE(gpr_log(
- GPR_DEBUG,
- "C:%p grpc_combiner_execute_finally c=%p force_async_break=%d; ac=%p",
- lock, closure, force_async_break, exec_ctx->active_combiner));
+ GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p; cov=%d", lock,
+ closure, exec_ctx->active_combiner, covered_by_poller));
GPR_TIMER_BEGIN("combiner.execute_finally", 0);
if (exec_ctx->active_combiner != lock) {
GPR_TIMER_MARK("slowpath", 0);
grpc_combiner_execute(exec_ctx, lock,
- grpc_closure_create(enqueue_finally, closure), error);
+ grpc_closure_create(enqueue_finally, closure), error,
+ false);
GPR_TIMER_END("combiner.execute_finally", 0);
return;
}
- if (force_async_break) {
- lock->take_async_break_before_final_list = true;
- }
if (grpc_closure_list_empty(lock->final_list)) {
- gpr_atm_full_fetch_add(&lock->state, 2);
+ gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
+ }
+ if (covered_by_poller) {
+ lock->final_list_covered_by_poller = true;
}
grpc_closure_list_append(&lock->final_list, closure, error);
GPR_TIMER_END("combiner.execute_finally", 0);
}
-
-void grpc_combiner_force_async_finally(grpc_combiner *lock) {
- lock->take_async_break_before_final_list = true;
-}
diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h
index 1409db24b9..d04eeed83a 100644
--- a/src/core/lib/iomgr/combiner.h
+++ b/src/core/lib/iomgr/combiner.h
@@ -52,19 +52,14 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue);
void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
// Execute \a action within the lock.
void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *closure, grpc_error *error);
+ grpc_closure *closure, grpc_error *error,
+ bool covered_by_poller);
// Execute \a action within the lock just prior to unlocking.
-// if \a hint_async_break is true, the combiner tries to hand execution to
-// another thread before finishing the primary queue of combined closures and
-// executing the finally list.
-// Deprecation warning: \a hint_async_break will be removed in a future version
-// Takes a very slow and round-about path if not called from a
-// grpc_combiner_execute closure.
void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *closure, grpc_error *error,
- bool hint_async_break);
-// Deprecated: force the finally list execution onto another thread
-void grpc_combiner_force_async_finally(grpc_combiner *lock);
+ bool covered_by_poller);
+
+bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx);
extern int grpc_combiner_trace;
diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c
index 31c80260f8..f6bb3a0477 100644
--- a/src/core/lib/iomgr/error.c
+++ b/src/core/lib/iomgr/error.c
@@ -120,6 +120,8 @@ static const char *error_int_name(grpc_error_ints key) {
return "http_status";
case GRPC_ERROR_INT_LIMIT:
return "limit";
+ case GRPC_ERROR_INT_OCCURRED_DURING_WRITE:
+ return "occurred_during_write";
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -144,6 +146,8 @@ static const char *error_str_name(grpc_error_strs key) {
return "tsi_error";
case GRPC_ERROR_STR_FILENAME:
return "filename";
+ case GRPC_ERROR_STR_QUEUED_BUFFERS:
+ return "queued_buffers";
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -265,7 +269,7 @@ static grpc_error *copy_error_and_unref(grpc_error *in) {
} else {
out = gpr_malloc(sizeof(*out));
#ifdef GRPC_ERROR_REFCOUNT_DEBUG
- gpr_log(GPR_DEBUG, "%p create copying", out);
+ gpr_log(GPR_DEBUG, "%p create copying %p", out, in);
#endif
out->ints = gpr_avl_ref(in->ints);
out->strs = gpr_avl_ref(in->strs);
@@ -523,21 +527,25 @@ static char *fmt_time(void *p) {
return out;
}
-static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap) {
+static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap,
+ bool *first) {
if (n == NULL) return;
- add_errs(n->left, s, sz, cap);
+ add_errs(n->left, s, sz, cap, first);
+ if (!*first) append_chr(',', s, sz, cap);
+ *first = false;
const char *e = grpc_error_string(n->value);
append_str(e, s, sz, cap);
grpc_error_free_string(e);
- add_errs(n->right, s, sz, cap);
+ add_errs(n->right, s, sz, cap, first);
}
static char *errs_string(grpc_error *err) {
char *s = NULL;
size_t sz = 0;
size_t cap = 0;
+ bool first = true;
append_chr('[', &s, &sz, &cap);
- add_errs(err->errs.root, &s, &sz, &cap);
+ add_errs(err->errs.root, &s, &sz, &cap, &first);
append_chr(']', &s, &sz, &cap);
append_chr(0, &s, &sz, &cap);
return s;
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 00ace8a7a9..f3f3b80a09 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -100,6 +100,8 @@ typedef enum {
GRPC_ERROR_INT_HTTP_STATUS,
/// context sensitive limit associated with the error
GRPC_ERROR_INT_LIMIT,
+ /// chttp2: did the error occur while a write was in progress
+ GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
} grpc_error_ints;
typedef enum {
@@ -121,6 +123,8 @@ typedef enum {
GRPC_ERROR_STR_TSI_ERROR,
/// filename that we were trying to read/write when this error occurred
GRPC_ERROR_STR_FILENAME,
+ /// which data was queued for writing when the error occurred
+ GRPC_ERROR_STR_QUEUED_BUFFERS
} grpc_error_strs;
typedef enum {
@@ -128,9 +132,13 @@ typedef enum {
GRPC_ERROR_TIME_CREATED,
} grpc_error_times;
+/// The following "special" errors can be propagated without allocating memory.
+/// They are always even so that other code (particularly combiner locks) can
+/// safely use the lower bit for themselves.
+
#define GRPC_ERROR_NONE ((grpc_error *)NULL)
-#define GRPC_ERROR_OOM ((grpc_error *)1)
-#define GRPC_ERROR_CANCELLED ((grpc_error *)2)
+#define GRPC_ERROR_OOM ((grpc_error *)2)
+#define GRPC_ERROR_CANCELLED ((grpc_error *)4)
const char *grpc_error_string(grpc_error *error);
void grpc_error_free_string(const char *str);
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index ad1cfc2031..e5909d9380 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -152,20 +152,20 @@ static void fd_global_shutdown(void);
* Polling island Declarations
*/
-//#define GRPC_PI_REF_COUNT_DEBUG
-#ifdef GRPC_PI_REF_COUNT_DEBUG
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
-#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */
+#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
#endif /* !defined(GPRC_PI_REF_COUNT_DEBUG) */
+/* This is also used as grpc_workqueue (by directly casing it) */
typedef struct polling_island {
gpr_mu mu;
/* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
@@ -185,8 +185,17 @@ typedef struct polling_island {
* (except mu and ref_count) are invalid and must be ignored. */
gpr_atm merged_to;
- /* The workqueue associated with this polling island */
- grpc_workqueue *workqueue;
+ /* Number of threads currently polling on this island */
+ gpr_atm poller_count;
+ /* Mutex guarding the read end of the workqueue (must be held to pop from
+ * workqueue_items) */
+ gpr_mu workqueue_read_mu;
+ /* Queue of closures to be executed */
+ gpr_mpscq workqueue_items;
+ /* Count of items in workqueue_items */
+ gpr_atm workqueue_item_count;
+ /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
+ grpc_wakeup_fd workqueue_wakeup_fd;
/* The fd of the underlying epoll set */
int epoll_fd;
@@ -275,6 +284,10 @@ static bool append_error(grpc_error **composite, grpc_error *error,
threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;
+/* The polling island being polled right now.
+ See comments in workqueue_maybe_wakeup for why this is tracked. */
+static __thread polling_island *g_current_thread_polling_island;
+
/* Forward declaration */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
@@ -289,12 +302,12 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
-#ifdef GRPC_PI_REF_COUNT_DEBUG
static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
-static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file,
- int line) {
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+static void pi_add_ref_dbg(polling_island *pi, const char *reason,
+ const char *file, int line) {
long old_cnt = gpr_atm_acq_load(&pi->ref_count);
pi_add_ref(pi);
gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
@@ -302,12 +315,42 @@ static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file,
}
static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
- char *reason, char *file, int line) {
+ const char *reason, const char *file, int line) {
long old_cnt = gpr_atm_acq_load(&pi->ref_count);
pi_unref(exec_ctx, pi);
gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
(void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}
+
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
+ const char *file, int line,
+ const char *reason) {
+ if (workqueue != NULL) {
+ pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
+ }
+ return workqueue;
+}
+
+static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ const char *file, int line, const char *reason) {
+ if (workqueue != NULL) {
+ pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
+ }
+}
+#else
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
+ if (workqueue != NULL) {
+ pi_add_ref((polling_island *)workqueue);
+ }
+ return workqueue;
+}
+
+static void workqueue_unref(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue *workqueue) {
+ if (workqueue != NULL) {
+ pi_unref(exec_ctx, (polling_island *)workqueue);
+ }
+}
#endif
static void pi_add_ref(polling_island *pi) {
@@ -315,10 +358,7 @@ static void pi_add_ref(polling_island *pi) {
}
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
- /* If ref count went to one, we're back to just the workqueue owning a ref.
- Unref the workqueue to break the loop.
-
- If ref count went to zero, delete the polling island.
+ /* If ref count went to zero, delete the polling island.
Note that this deletion not be done under a lock. Once the ref count goes
to zero, we are guaranteed that no one else holds a reference to the
polling island (and that there is no racing pi_add_ref() call either).
@@ -326,20 +366,12 @@ static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
Also, if we are deleting the polling island and the merged_to field is
non-empty, we should remove a ref to the merged_to polling island
*/
- switch (gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
- case 2: /* last external ref: the only one now owned is by the workqueue */
- GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
- break;
- case 1: {
- polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
- polling_island_delete(exec_ctx, pi);
- if (next != NULL) {
- PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
- }
- break;
+ if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
+ polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
+ polling_island_delete(exec_ctx, pi);
+ if (next != NULL) {
+ PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
}
- case 0:
- GPR_UNREACHABLE_CODE(return );
}
}
@@ -488,11 +520,20 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
pi->fd_capacity = 0;
pi->fds = NULL;
pi->epoll_fd = -1;
- pi->workqueue = NULL;
+
+ gpr_mu_init(&pi->workqueue_read_mu);
+ gpr_mpscq_init(&pi->workqueue_items);
+ gpr_atm_rel_store(&pi->workqueue_item_count, 0);
gpr_atm_rel_store(&pi->ref_count, 0);
+ gpr_atm_rel_store(&pi->poller_count, 0);
gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
+ if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
+ err_desc)) {
+ goto done;
+ }
+
pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (pi->epoll_fd < 0) {
@@ -501,26 +542,14 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
}
polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
+ polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);
if (initial_fd != NULL) {
polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
}
- if (append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue),
- err_desc) &&
- *error == GRPC_ERROR_NONE) {
- polling_island_add_fds_locked(pi, &pi->workqueue->wakeup_read_fd, 1, true,
- error);
- GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL);
- pi->workqueue->wakeup_read_fd->polling_island = pi;
- PI_ADD_REF(pi, "fd");
- }
-
done:
if (*error != GRPC_ERROR_NONE) {
- if (pi->workqueue != NULL) {
- GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
- }
polling_island_delete(exec_ctx, pi);
pi = NULL;
}
@@ -533,7 +562,11 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
if (pi->epoll_fd >= 0) {
close(pi->epoll_fd);
}
+ GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
+ gpr_mu_destroy(&pi->workqueue_read_mu);
+ gpr_mpscq_destroy(&pi->workqueue_items);
gpr_mu_destroy(&pi->mu);
+ grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
gpr_free(pi->fds);
gpr_free(pi);
}
@@ -678,6 +711,45 @@ static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
}
}
+static void workqueue_maybe_wakeup(polling_island *pi) {
+ /* If this thread is the current poller, then it may be that it's about to
+ decrement the current poller count, so we need to look past this thread */
+ bool is_current_poller = (g_current_thread_polling_island == pi);
+ gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
+ gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
+ /* Only issue a wakeup if it's likely that some poller could come in and take
+ it right now. Note that since we do an anticipatory mpscq_pop every poll
+ loop, it's ok if we miss the wakeup here, as we'll get the work item when
+ the next poller enters anyway. */
+ if (current_pollers > min_current_pollers_for_wakeup) {
+ GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
+ grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
+ }
+}
+
+static void workqueue_move_items_to_parent(polling_island *q) {
+ polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
+ if (p == NULL) {
+ return;
+ }
+ gpr_mu_lock(&q->workqueue_read_mu);
+ int num_added = 0;
+ while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
+ gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
+ if (n != NULL) {
+ gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
+ gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
+ gpr_mpscq_push(&p->workqueue_items, n);
+ num_added++;
+ }
+ }
+ gpr_mu_unlock(&q->workqueue_read_mu);
+ if (num_added > 0) {
+ workqueue_maybe_wakeup(p);
+ }
+ workqueue_move_items_to_parent(p);
+}
+
static polling_island *polling_island_merge(polling_island *p,
polling_island *q,
grpc_error **error) {
@@ -702,6 +774,8 @@ static polling_island *polling_island_merge(polling_island *p,
/* Add the 'merged_to' link from p --> q */
gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
+
+ workqueue_move_items_to_parent(q);
}
/* else if p == q, nothing needs to be done */
@@ -712,6 +786,26 @@ static polling_island *polling_island_merge(polling_island *p,
return q;
}
+static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue *workqueue, grpc_closure *closure,
+ grpc_error *error) {
+ GPR_TIMER_BEGIN("workqueue.enqueue", 0);
+ /* take a ref to the workqueue: otherwise it can happen that whatever events
+ * this kicks off ends up destroying the workqueue before this function
+ * completes */
+ GRPC_WORKQUEUE_REF(workqueue, "enqueue");
+ polling_island *pi = (polling_island *)workqueue;
+ gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
+ closure->error_data.error = error;
+ gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
+ if (last == 0) {
+ workqueue_maybe_wakeup(pi);
+ }
+ workqueue_move_items_to_parent(pi);
+ GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
+ GPR_TIMER_END("workqueue.enqueue", 0);
+}
+
static grpc_error *polling_island_global_init() {
grpc_error *error = GRPC_ERROR_NONE;
@@ -1042,11 +1136,8 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
gpr_mu_lock(&fd->mu);
- grpc_workqueue *workqueue = NULL;
- if (fd->polling_island != NULL) {
- workqueue =
- GRPC_WORKQUEUE_REF(fd->polling_island->workqueue, "get_workqueue");
- }
+ grpc_workqueue *workqueue = GRPC_WORKQUEUE_REF(
+ (grpc_workqueue *)fd->polling_island, "fd_get_workqueue");
gpr_mu_unlock(&fd->mu);
return workqueue;
}
@@ -1299,7 +1390,29 @@ static void pollset_reset(grpc_pollset *pollset) {
GPR_ASSERT(pollset->polling_island == NULL);
}
-#define GRPC_EPOLL_MAX_EVENTS 1000
+static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
+ polling_island *pi) {
+ if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
+ gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
+ gpr_mu_unlock(&pi->workqueue_read_mu);
+ if (n != NULL) {
+ if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
+ workqueue_maybe_wakeup(pi);
+ }
+ grpc_closure *c = (grpc_closure *)n;
+ grpc_closure_run(exec_ctx, c, c->error_data.error);
+ return true;
+ } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
+ /* n == NULL might mean there's work but it's not available to be popped
+ * yet - try to ensure another workqueue wakes up to check shortly if so
+ */
+ workqueue_maybe_wakeup(pi);
+ }
+ }
+ return false;
+}
+
+#define GRPC_EPOLL_MAX_EVENTS 100
/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset,
@@ -1354,7 +1467,13 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
PI_ADD_REF(pi, "ps_work");
gpr_mu_unlock(&pollset->mu);
- do {
+ /* If we get some workqueue work to do, it might end up completing an item on
+ the completion queue, so there's no need to poll... so we skip that and
+ redo the complete loop to verify */
+ if (!maybe_do_workqueue_work(exec_ctx, pi)) {
+ gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
+ g_current_thread_polling_island = pi;
+
GRPC_SCHEDULING_START_BLOCKING_REGION;
ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
sig_mask);
@@ -1386,6 +1505,11 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
append_error(error,
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
err_desc);
+ } else if (data_ptr == &pi->workqueue_wakeup_fd) {
+ append_error(error,
+ grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
+ err_desc);
+ maybe_do_workqueue_work(exec_ctx, pi);
} else if (data_ptr == &polling_island_wakeup_fd) {
GRPC_POLLING_TRACE(
"pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
@@ -1408,7 +1532,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
}
}
}
- } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
+
+ g_current_thread_polling_island = NULL;
+ gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
+ }
GPR_ASSERT(pi != NULL);
@@ -1868,6 +1995,10 @@ static const grpc_event_engine_vtable vtable = {
.kick_poller = kick_poller,
+ .workqueue_ref = workqueue_ref,
+ .workqueue_unref = workqueue_unref,
+ .workqueue_enqueue = workqueue_enqueue,
+
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
index c2107e5e39..1829440a6e 100644
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -1989,6 +1989,32 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
}
/*******************************************************************************
+ * workqueue stubs
+ */
+
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
+ const char *file, int line,
+ const char *reason) {
+ return workqueue;
+}
+static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ const char *file, int line, const char *reason) {}
+#else
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
+ return workqueue;
+}
+static void workqueue_unref(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue *workqueue) {}
+#endif
+
+static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue *workqueue, grpc_closure *closure,
+ grpc_error *error) {
+ grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+}
+
+/*******************************************************************************
* event engine binding
*/
@@ -2029,6 +2055,10 @@ static const grpc_event_engine_vtable vtable = {
.kick_poller = kick_poller,
+ .workqueue_ref = workqueue_ref,
+ .workqueue_unref = workqueue_unref,
+ .workqueue_enqueue = workqueue_enqueue,
+
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index f137e4dacc..351b069613 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -1260,6 +1260,32 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
}
/*******************************************************************************
+ * workqueue stubs
+ */
+
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
+ const char *file, int line,
+ const char *reason) {
+ return workqueue;
+}
+static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ const char *file, int line, const char *reason) {}
+#else
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
+ return workqueue;
+}
+static void workqueue_unref(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue *workqueue) {}
+#endif
+
+static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue *workqueue, grpc_closure *closure,
+ grpc_error *error) {
+ grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+}
+
+/*******************************************************************************
* Condition Variable polling extensions
*/
@@ -1498,6 +1524,10 @@ static const grpc_event_engine_vtable vtable = {
.kick_poller = kick_poller,
+ .workqueue_ref = workqueue_ref,
+ .workqueue_unref = workqueue_unref,
+ .workqueue_enqueue = workqueue_enqueue,
+
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 0637f80421..9857b0bce9 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -259,4 +259,27 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
grpc_error *grpc_kick_poller(void) { return g_event_engine->kick_poller(); }
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file,
+ int line, const char *reason) {
+ return g_event_engine->workqueue_ref(workqueue, file, line, reason);
+}
+void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ const char *file, int line, const char *reason) {
+ g_event_engine->workqueue_unref(exec_ctx, workqueue, file, line, reason);
+}
+#else
+grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) {
+ return g_event_engine->workqueue_ref(workqueue);
+}
+void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
+ g_event_engine->workqueue_unref(exec_ctx, workqueue);
+}
+#endif
+
+void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ grpc_closure *closure, grpc_error *error) {
+ g_event_engine->workqueue_enqueue(exec_ctx, workqueue, closure, error);
+}
+
#endif // GPR_POSIX_SOCKET
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index c2aa1756ea..2fdef06838 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -40,6 +40,7 @@
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
+#include "src/core/lib/iomgr/workqueue.h"
typedef struct grpc_fd grpc_fd;
@@ -95,6 +96,18 @@ typedef struct grpc_event_engine_vtable {
grpc_error *(*kick_poller)(void);
void (*shutdown_engine)(void);
+
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+ grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue, const char *file,
+ int line, const char *reason);
+ void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ const char *file, int line, const char *reason);
+#else
+ grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue);
+ void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
+#endif
+ void (*workqueue_enqueue)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ grpc_closure *closure, grpc_error *error);
} grpc_event_engine_vtable;
void grpc_event_engine_init(void);
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index ac7785ec13..604713e578 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -37,6 +37,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
@@ -60,18 +61,43 @@ bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) {
bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
bool did_something = 0;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0);
- while (!grpc_closure_list_empty(exec_ctx->closure_list)) {
- grpc_closure *c = exec_ctx->closure_list.head;
- exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
- while (c != NULL) {
- grpc_closure *next = c->next_data.next;
- grpc_error *error = c->error;
- did_something = true;
- GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0);
+ for (;;) {
+ if (!grpc_closure_list_empty(exec_ctx->closure_list)) {
+ grpc_closure *c = exec_ctx->closure_list.head;
+ exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ did_something = true;
+ grpc_closure_run(exec_ctx, c, c->error_data.error);
+ c = next;
+ }
+ } else if (!grpc_combiner_continue_exec_ctx(exec_ctx)) {
+ break;
+ }
+ }
+ GPR_ASSERT(exec_ctx->active_combiner == NULL);
+ if (exec_ctx->stealing_from_workqueue != NULL) {
+ if (grpc_exec_ctx_ready_to_finish(exec_ctx)) {
+ grpc_workqueue_enqueue(exec_ctx, exec_ctx->stealing_from_workqueue,
+ exec_ctx->stolen_closure,
+ exec_ctx->stolen_closure->error_data.error);
+ GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue,
+ "exec_ctx_sched");
+ exec_ctx->stealing_from_workqueue = NULL;
+ exec_ctx->stolen_closure = NULL;
+ } else {
+ grpc_closure *c = exec_ctx->stolen_closure;
+ GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue,
+ "exec_ctx_sched");
+ exec_ctx->stealing_from_workqueue = NULL;
+ exec_ctx->stolen_closure = NULL;
+ grpc_error *error = c->error_data.error;
+ GPR_TIMER_BEGIN("grpc_exec_ctx_flush.stolen_cb", 0);
c->cb(exec_ctx, c->cb_arg, error);
GRPC_ERROR_UNREF(error);
- GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0);
- c = next;
+ GPR_TIMER_END("grpc_exec_ctx_flush.stolen_cb", 0);
+ grpc_exec_ctx_flush(exec_ctx);
+ did_something = true;
}
}
GPR_TIMER_END("grpc_exec_ctx_flush", 0);
@@ -86,12 +112,25 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error,
grpc_workqueue *offload_target_or_null) {
+ GPR_TIMER_BEGIN("grpc_exec_ctx_sched", 0);
if (offload_target_or_null == NULL) {
grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
- } else {
+ } else if (exec_ctx->stealing_from_workqueue == NULL) {
+ exec_ctx->stealing_from_workqueue = offload_target_or_null;
+ closure->error_data.error = error;
+ exec_ctx->stolen_closure = closure;
+ } else if (exec_ctx->stealing_from_workqueue != offload_target_or_null) {
grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error);
GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched");
+ } else { /* stealing_from_workqueue == offload_target_or_null */
+ grpc_workqueue_enqueue(exec_ctx, offload_target_or_null,
+ exec_ctx->stolen_closure,
+ exec_ctx->stolen_closure->error_data.error);
+ closure->error_data.error = error;
+ exec_ctx->stolen_closure = closure;
+ GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched");
}
+ GPR_TIMER_END("grpc_exec_ctx_sched", 0);
}
void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 4d20ecf922..7e50cb9825 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -66,15 +66,33 @@ typedef struct grpc_combiner grpc_combiner;
#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
struct grpc_exec_ctx {
grpc_closure_list closure_list;
+ /** The workqueue we're stealing work from.
+ As items are queued to the execution context, we try to steal one
+ workqueue item and execute it inline (assuming the exec_ctx is not
+ finished) - doing so does not invalidate the workqueue's contract, and
+ provides a small latency win in cases where we get a hit */
+ grpc_workqueue *stealing_from_workqueue;
+ /** The workqueue item that was stolen from the workqueue above. When new
+ items are scheduled to be offloaded to that workqueue, we need to update
+ this like a 1-deep fifo to maintain the invariant that workqueue items
+ queued by one thread are started in order */
+ grpc_closure *stolen_closure;
/** currently active combiner: updated only via combiner.c */
grpc_combiner *active_combiner;
+ /** last active combiner in the active combiner list */
+ grpc_combiner *last_combiner;
bool cached_ready_to_finish;
void *check_ready_to_finish_arg;
bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
};
+/* initializer for grpc_exec_ctx:
+ prefer to use GRPC_EXEC_CTX_INIT whenever possible */
#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
- { GRPC_CLOSURE_LIST_INIT, NULL, false, finish_check_arg, finish_check }
+ { \
+ GRPC_CLOSURE_LIST_INIT, NULL, NULL, NULL, NULL, false, finish_check_arg, \
+ finish_check \
+ }
#else
struct grpc_exec_ctx {
bool cached_ready_to_finish;
@@ -85,8 +103,10 @@ struct grpc_exec_ctx {
{ false, finish_check_arg, finish_check }
#endif
+/* initialize an execution context at the top level of an API call into grpc
+ (this is safe to use elsewhere, though possibly not as efficient) */
#define GRPC_EXEC_CTX_INIT \
- GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_never_ready_to_finish, NULL)
+ GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_always_ready_to_finish, NULL)
/** Flush any work that has been enqueued onto this grpc_exec_ctx.
* Caller must guarantee that no interfering locks are held.
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index d67d388b8c..4fd83e0b22 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -112,6 +112,14 @@ void grpc_iomgr_shutdown(void) {
continue;
}
if (g_root_object.next != &g_root_object) {
+ if (grpc_iomgr_abort_on_leaks()) {
+ gpr_log(GPR_DEBUG, "Failed to free %" PRIuPTR
+ " iomgr objects before shutdown deadline: "
+ "memory leaks are likely",
+ count_objects());
+ dump_objects("LEAKED");
+ abort();
+ }
gpr_timespec short_deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN));
if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline)) {
@@ -122,9 +130,6 @@ void grpc_iomgr_shutdown(void) {
"memory leaks are likely",
count_objects());
dump_objects("LEAKED");
- if (grpc_iomgr_abort_on_leaks()) {
- abort();
- }
}
break;
}
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 92767721d5..00fd77679a 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -177,7 +177,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
tcp->read_cb = NULL;
tcp->incoming_buffer = NULL;
- grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ grpc_closure_run(exec_ctx, cb, error);
}
#define MAX_READ_IOVEC 4
@@ -209,11 +209,11 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
msg.msg_controllen = 0;
msg.msg_flags = 0;
- GPR_TIMER_BEGIN("recvmsg", 1);
+ GPR_TIMER_BEGIN("recvmsg", 0);
do {
read_bytes = recvmsg(tcp->fd, &msg, 0);
} while (read_bytes < 0 && errno == EINTR);
- GPR_TIMER_END("recvmsg", 0);
+ GPR_TIMER_END("recvmsg", read_bytes >= 0);
if (read_bytes < 0) {
/* NB: After calling call_read_cb a parallel call of the read handler may
@@ -392,11 +392,8 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error_free_string(str);
}
- GPR_TIMER_BEGIN("tcp_handle_write.cb", 0);
- cb->cb(exec_ctx, cb->cb_arg, error);
- GPR_TIMER_END("tcp_handle_write.cb", 0);
+ grpc_closure_run(exec_ctx, cb, error);
TCP_UNREF(exec_ctx, tcp, "write");
- GRPC_ERROR_UNREF(error);
}
}
diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h
index b2805dc66c..5b96d1d851 100644
--- a/src/core/lib/iomgr/workqueue.h
+++ b/src/core/lib/iomgr/workqueue.h
@@ -40,10 +40,6 @@
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
-#ifdef GPR_POSIX_SOCKET
-#include "src/core/lib/iomgr/workqueue_posix.h"
-#endif
-
#ifdef GPR_WINDOWS
#include "src/core/lib/iomgr/workqueue_windows.h"
#endif
@@ -58,20 +54,20 @@
string will be printed alongside the refcount. When it is not defined, the
string will be discarded at compilation time. */
-//#define GRPC_WORKQUEUE_REFCOUNT_DEBUG
+/*#define GRPC_WORKQUEUE_REFCOUNT_DEBUG*/
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
#define GRPC_WORKQUEUE_REF(p, r) \
- (grpc_workqueue_ref((p), __FILE__, __LINE__, (r)), (p))
+ grpc_workqueue_ref((p), __FILE__, __LINE__, (r))
#define GRPC_WORKQUEUE_UNREF(exec_ctx, p, r) \
grpc_workqueue_unref((exec_ctx), (p), __FILE__, __LINE__, (r))
-void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
- const char *reason);
+grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file,
+ int line, const char *reason);
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
const char *file, int line, const char *reason);
#else
-#define GRPC_WORKQUEUE_REF(p, r) (grpc_workqueue_ref((p)), (p))
+#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p))
#define GRPC_WORKQUEUE_UNREF(cl, p, r) grpc_workqueue_unref((cl), (p))
-void grpc_workqueue_ref(grpc_workqueue *workqueue);
+grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue);
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
#endif
diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c
deleted file mode 100644
index ecfea68f56..0000000000
--- a/src/core/lib/iomgr/workqueue_posix.c
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#ifdef GPR_POSIX_SOCKET
-
-#include "src/core/lib/iomgr/workqueue.h"
-
-#include <stdio.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/useful.h>
-
-#include "src/core/lib/iomgr/ev_posix.h"
-#include "src/core/lib/profiling/timers.h"
-
-static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
-
-grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
- grpc_workqueue **workqueue) {
- char name[32];
- *workqueue = gpr_malloc(sizeof(grpc_workqueue));
- gpr_ref_init(&(*workqueue)->refs, 1);
- gpr_atm_no_barrier_store(&(*workqueue)->state, 1);
- grpc_error *err = grpc_wakeup_fd_init(&(*workqueue)->wakeup_fd);
- if (err != GRPC_ERROR_NONE) {
- gpr_free(*workqueue);
- return err;
- }
- sprintf(name, "workqueue:%p", (void *)(*workqueue));
- (*workqueue)->wakeup_read_fd = grpc_fd_create(
- GRPC_WAKEUP_FD_GET_READ_FD(&(*workqueue)->wakeup_fd), name);
- gpr_mpscq_init(&(*workqueue)->queue);
- grpc_closure_init(&(*workqueue)->read_closure, on_readable, *workqueue);
- grpc_fd_notify_on_read(exec_ctx, (*workqueue)->wakeup_read_fd,
- &(*workqueue)->read_closure);
- return GRPC_ERROR_NONE;
-}
-
-static void workqueue_destroy(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue) {
- grpc_fd_shutdown(exec_ctx, workqueue->wakeup_read_fd);
-}
-
-static void workqueue_orphan(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue) {
- if (gpr_atm_full_fetch_add(&workqueue->state, -1) == 1) {
- workqueue_destroy(exec_ctx, workqueue);
- }
-}
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
- const char *reason) {
- if (workqueue == NULL) return;
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p ref %d -> %d %s",
- workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count + 1,
- reason);
- gpr_ref(&workqueue->refs);
-}
-#else
-void grpc_workqueue_ref(grpc_workqueue *workqueue) {
- if (workqueue == NULL) return;
- gpr_ref(&workqueue->refs);
-}
-#endif
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {
- if (workqueue == NULL) return;
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p unref %d -> %d %s",
- workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count - 1,
- reason);
- if (gpr_unref(&workqueue->refs)) {
- workqueue_orphan(exec_ctx, workqueue);
- }
-}
-#else
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
- if (workqueue == NULL) return;
- if (gpr_unref(&workqueue->refs)) {
- workqueue_orphan(exec_ctx, workqueue);
- }
-}
-#endif
-
-static void drain(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
- abort();
-}
-
-static void wakeup(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
- GPR_TIMER_MARK("workqueue.wakeup", 0);
- grpc_error *err = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
- if (!GRPC_LOG_IF_ERROR("wakeupfd_wakeup", err)) {
- drain(exec_ctx, workqueue);
- }
-}
-
-static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- GPR_TIMER_BEGIN("workqueue.on_readable", 0);
-
- grpc_workqueue *workqueue = arg;
-
- if (error != GRPC_ERROR_NONE) {
- /* HACK: let wakeup_fd code know that we stole the fd */
- workqueue->wakeup_fd.read_fd = 0;
- grpc_wakeup_fd_destroy(&workqueue->wakeup_fd);
- grpc_fd_orphan(exec_ctx, workqueue->wakeup_read_fd, NULL, NULL, "destroy");
- GPR_ASSERT(gpr_atm_no_barrier_load(&workqueue->state) == 0);
- gpr_free(workqueue);
- } else {
- error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
- gpr_mpscq_node *n = gpr_mpscq_pop(&workqueue->queue);
- if (error == GRPC_ERROR_NONE) {
- grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
- &workqueue->read_closure);
- } else {
- /* recurse to get error handling */
- on_readable(exec_ctx, arg, error);
- }
- if (n == NULL) {
- /* try again - queue in an inconsistant state */
- wakeup(exec_ctx, workqueue);
- } else {
- switch (gpr_atm_full_fetch_add(&workqueue->state, -2)) {
- case 3: // had one count, one unorphaned --> done, unorphaned
- break;
- case 2: // had one count, one orphaned --> done, orphaned
- workqueue_destroy(exec_ctx, workqueue);
- break;
- case 1:
- case 0:
- // these values are illegal - representing an already done or
- // deleted workqueue
- GPR_UNREACHABLE_CODE(break);
- default:
- // schedule a wakeup since there's more to do
- wakeup(exec_ctx, workqueue);
- }
- grpc_closure *cl = (grpc_closure *)n;
- grpc_error *clerr = cl->error;
- cl->cb(exec_ctx, cl->cb_arg, clerr);
- GRPC_ERROR_UNREF(clerr);
- }
- }
-
- GPR_TIMER_END("workqueue.on_readable", 0);
-}
-
-void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- grpc_closure *closure, grpc_error *error) {
- GPR_TIMER_BEGIN("workqueue.enqueue", 0);
- gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, 2);
- GPR_ASSERT(last & 1);
- closure->error = error;
- gpr_mpscq_push(&workqueue->queue, &closure->next_data.atm_next);
- if (last == 1) {
- wakeup(exec_ctx, workqueue);
- }
- GPR_TIMER_END("workqueue.enqueue", 0);
-}
-
-#endif /* GPR_POSIX_SOCKET */
diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/workqueue_posix.h
deleted file mode 100644
index 03ee21cef7..0000000000
--- a/src/core/lib/iomgr/workqueue_posix.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H
-#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H
-
-#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/support/mpscq.h"
-
-struct grpc_fd;
-
-struct grpc_workqueue {
- gpr_refcount refs;
- gpr_mpscq queue;
- // state is:
- // lower bit - zero if orphaned
- // other bits - number of items enqueued
- gpr_atm state;
-
- grpc_wakeup_fd wakeup_fd;
- struct grpc_fd *wakeup_read_fd;
-
- grpc_closure read_closure;
-};
-
-/** Create a work queue. Returns an error if creation fails. If creation
- succeeds, sets *workqueue to point to it. */
-grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
- grpc_workqueue **workqueue);
-
-#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H */
diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c
index ee81dc248e..5c93d3c59e 100644
--- a/src/core/lib/iomgr/workqueue_windows.c
+++ b/src/core/lib/iomgr/workqueue_windows.c
@@ -43,12 +43,16 @@
// workqueues.
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
- const char *reason) {}
+grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file,
+ int line, const char *reason) {
+ return workqueue;
+}
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
const char *file, int line, const char *reason) {}
#else
-void grpc_workqueue_ref(grpc_workqueue *workqueue) {}
+grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) {
+ return workqueue;
+}
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
#endif