path: root/src/core/lib
author    Craig Tiller <ctiller@google.com>  2016-10-13 10:02:08 -0700
committer Craig Tiller <ctiller@google.com>  2016-10-13 10:02:08 -0700
commit    460502e5ce71cb132e3be4e932b63cfd4e7f2349 (patch)
tree      121ff288cc24bed5eb7d6379467b5b45cb523ead /src/core/lib
parent    272c8d2ca931bf08c3292cbcae7874444d03e3c8 (diff)
Expand documentation
Diffstat (limited to 'src/core/lib')
-rw-r--r--  src/core/lib/iomgr/ev_epoll_linux.c  13
-rw-r--r--  src/core/lib/iomgr/exec_ctx.h        10
2 files changed, 23 insertions(+), 0 deletions(-)
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 98fe2defea..8de42bb7a9 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -165,6 +165,7 @@ static void fd_global_shutdown(void);
#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */
+/* This is also used as a grpc_workqueue (by directly casting it) */
typedef struct polling_island {
gpr_mu mu;
/* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
@@ -184,10 +185,16 @@ typedef struct polling_island {
* (except mu and ref_count) are invalid and must be ignored. */
gpr_atm merged_to;
+ /* Number of threads currently polling on this island */
gpr_atm poller_count;
+ /* Mutex guarding the read end of the workqueue (must be held to pop from
+ * workqueue_items) */
gpr_mu workqueue_read_mu;
+ /* Queue of closures to be executed */
gpr_mpscq workqueue_items;
+ /* Count of items in workqueue_items */
gpr_atm workqueue_item_count;
+ /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
grpc_wakeup_fd workqueue_wakeup_fd;
/* The fd of the underlying epoll set */
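The fields above cooperate as a multi-producer, single-consumer hand-off. A minimal sketch of the producer side, assuming only the fields documented in this hunk (the function name, ordering, and error handling here are illustrative, not taken from this commit):

/* Sketch: enqueue a closure onto a polling island's workqueue, waking a
 * poller if the queue was previously empty. */
static void island_enqueue_sketch(polling_island *pi, grpc_closure *closure,
                                  grpc_error *error) {
  /* Bump the count before pushing: a consumer that sees count > 0 but pops
   * NULL can tell a push is still in flight (see the next hunk). */
  gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
  closure->error_data.error = error;
  /* Lock-free multi-producer push; no mutex is needed on this side. */
  gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
  if (last == 0) {
    /* Queue went empty -> non-empty: make workqueue_wakeup_fd readable so
     * an epoll_wait()-ing poller returns and drains the queue. */
    GRPC_LOG_IF_ERROR("workqueue_wakeup",
                      grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
  }
}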
@@ -1396,6 +1403,9 @@ static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
grpc_closure_run(exec_ctx, c, c->error_data.error);
return true;
} else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
+ /* n == NULL might mean there is work, but that it is not yet available
+ * to be popped (a concurrent push may still be in flight) - if so, try
+ * to ensure another poller wakes up shortly to check again */
workqueue_maybe_wakeup(pi);
}
}
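gpr_mpscq is a lock-free multi-producer, single-consumer queue, so a pop can transiently return NULL while a concurrent push is mid-flight; the new comment documents exactly that window. For context, the surrounding consumer looks roughly like this (a reconstruction under the same assumptions as the sketch above, not verbatim from this tree):

/* Sketch: one poller at a time drains the MPSC queue;
 * workqueue_read_mu serializes the single-consumer pop. */
static bool maybe_do_workqueue_work_sketch(grpc_exec_ctx *exec_ctx,
                                           polling_island *pi) {
  if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
    gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
    gpr_mu_unlock(&pi->workqueue_read_mu);
    if (n != NULL) {
      /* If more items remain, wake another poller to keep draining. */
      if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
        workqueue_maybe_wakeup(pi);
      }
      grpc_closure *c = (grpc_closure *)n;
      grpc_closure_run(exec_ctx, c, c->error_data.error);
      return true; /* Did work: the caller skips polling this pass. */
    } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
      /* Count was bumped before the push completed: kick a poller to
       * retry the pop shortly. */
      workqueue_maybe_wakeup(pi);
    }
  }
  return false;
}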
@@ -1457,6 +1467,9 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
PI_ADD_REF(pi, "ps_work");
gpr_mu_unlock(&pollset->mu);
+ /* If we get some workqueue work to do, it might end up completing an item
+ on the completion queue, in which case there's no need to poll... so we
+ skip polling and redo the outer loop to verify */
if (!maybe_do_workqueue_work(exec_ctx, pi)) {
gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
g_current_thread_polling_island = pi;
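When maybe_do_workqueue_work finds nothing to run, the thread proceeds to poll. The workqueue wakeup fd sits in the same epoll set as the I/O fds, so queued closures surface as an ordinary epoll event. A rough sketch of that dispatch (a fragment; the buffer size and error handling are illustrative):

/* Inside the polling branch guarded by the hunk above. */
struct epoll_event ep_ev[100];
int ep_rv = epoll_pwait(epoll_fd, ep_ev, 100, timeout_ms, sig_mask);
for (int i = 0; i < ep_rv; i++) {
  void *data_ptr = ep_ev[i].data.ptr;
  if (data_ptr == &pi->workqueue_wakeup_fd) {
    /* A producer kicked us: re-arm the wakeup fd, then drain closures. */
    GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
                      grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd));
    maybe_do_workqueue_work(exec_ctx, pi);
  } else {
    /* ... regular grpc_fd read/write event dispatch ... */
  }
}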
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 4744e21c5e..7e50cb9825 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -66,10 +66,20 @@ typedef struct grpc_combiner grpc_combiner;
#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
struct grpc_exec_ctx {
grpc_closure_list closure_list;
+ /** The workqueue we're stealing work from.
+ As items are queued to the execution context, we try to steal one
+ workqueue item and execute it inline (assuming the exec_ctx is not
+ finished) - doing so does not invalidate the workqueue's contract, and
+ provides a small latency win in cases where we get a hit */
grpc_workqueue *stealing_from_workqueue;
+ /** The workqueue item that was stolen from the workqueue above. When new
+ items are scheduled to be offloaded to that workqueue, we update this
+ field like a one-deep FIFO to maintain the invariant that workqueue
+ items queued by one thread are started in order */
grpc_closure *stolen_closure;
/** currently active combiner: updated only via combiner.c */
grpc_combiner *active_combiner;
+ /** last active combiner in the active combiner list */
grpc_combiner *last_combiner;
bool cached_ready_to_finish;
void *check_ready_to_finish_arg;
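Taken together, the stealing_from_workqueue and stolen_closure comments describe a small scheduling protocol. A reconstructed sketch of how the pair of fields might be updated when a closure is scheduled with an offload target (the function name is hypothetical and the branches are inferred from the comments, not copied from this commit):

/* Hypothetical sketch of scheduling with an optional offload target. */
static void exec_ctx_sched_sketch(grpc_exec_ctx *exec_ctx,
                                  grpc_closure *closure, grpc_error *error,
                                  grpc_workqueue *offload_target_or_null) {
  if (offload_target_or_null == NULL) {
    /* No offload requested: run from the exec_ctx's own closure list. */
    grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
  } else if (exec_ctx->stealing_from_workqueue == NULL) {
    /* First offload: steal the closure to run inline at flush time. */
    exec_ctx->stealing_from_workqueue = offload_target_or_null;
    closure->error_data.error = error;
    exec_ctx->stolen_closure = closure;
  } else if (exec_ctx->stealing_from_workqueue == offload_target_or_null) {
    /* The one-deep FIFO: push the older stolen closure to the real
     * workqueue first, so items queued by this thread start in order,
     * then steal the newer closure. */
    grpc_workqueue_enqueue(exec_ctx, offload_target_or_null,
                           exec_ctx->stolen_closure,
                           exec_ctx->stolen_closure->error_data.error);
    closure->error_data.error = error;
    exec_ctx->stolen_closure = closure;
  } else {
    /* Offloading to a different workqueue: no steal slot available. */
    grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error);
  }
}

At flush time the stolen closure, if any, is run inline on the current thread, which is where the latency win described above comes from.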