aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-09-08 14:57:56 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-09-08 14:57:56 -0700
commit84ea341aa3c1435182d8f2e0a687fa45bd4a8d1c (patch)
treeeb958f5510337bf58b80449fdb4bfc49a3aa41bc
parent44b12f9e23cbdb5f5f1be0681b81aeded481debf (diff)
Minor perf improvements
-rw-r--r--src/core/lib/iomgr/combiner.c2
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c2
-rw-r--r--src/core/lib/iomgr/workqueue_posix.c71
3 files changed, 50 insertions, 25 deletions
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index b2d6559751..273505f8b8 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -189,7 +189,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
}
if (lock->optional_workqueue != NULL &&
- grpc_exec_ctx_ready_to_finish(exec_ctx) && is_covered_by_poller(lock)) {
+ is_covered_by_poller(lock) && grpc_exec_ctx_ready_to_finish(exec_ctx)) {
GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
// this execution context wants to move on, and we have a workqueue (and
// so can help the execution context out): schedule remaining work to be
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 740920d760..f473e4765c 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -1299,7 +1299,7 @@ static void pollset_reset(grpc_pollset *pollset) {
GPR_ASSERT(pollset->polling_island == NULL);
}
-#define GRPC_EPOLL_MAX_EVENTS 1000
+#define GRPC_EPOLL_MAX_EVENTS 100
/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset,
diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c
index 836bb8b6e0..6f8a26684a 100644
--- a/src/core/lib/iomgr/workqueue_posix.c
+++ b/src/core/lib/iomgr/workqueue_posix.c
@@ -146,42 +146,67 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
GPR_ASSERT(gpr_atm_no_barrier_load(&workqueue->state) == 0);
gpr_free(workqueue);
} else {
- error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
- if (error != GRPC_ERROR_NONE) {
- /* recurse to get error handling */
- on_readable(exec_ctx, arg, error);
- } else {
- gpr_mpscq_node *n = gpr_mpscq_pop(&workqueue->queue);
- if (n == NULL) {
- /* try again - queue in an ephemerally inconsistent state */
- wakeup(exec_ctx, workqueue);
- grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
- &workqueue->read_closure);
- } else {
+ gpr_mpscq_node *n = NULL;
+ for (int i = 0; i < 100; i++) {
+ n = gpr_mpscq_pop(&workqueue->queue);
+ if (n != NULL) {
+ grpc_closure *c = (grpc_closure *)n;
+ grpc_closure_run(exec_ctx, c, c->error_data.error);
+ grpc_exec_ctx_flush(exec_ctx);
gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, -2);
switch (last) {
default:
- // schedule a wakeup since there's more to do
- wakeup(exec_ctx, workqueue);
- break;
+ // there's more to do, keep going
+ goto keep_going;
case 3: // had one count, one unorphaned --> done, unorphaned
- break;
+ goto switch_to_idle;
case 2: // had one count, one orphaned --> done, orphaned
- workqueue_destroy(exec_ctx, workqueue);
- break;
+ goto destroy;
case 1:
case 0:
// these values are illegal - representing an already done or
// deleted workqueue
GPR_UNREACHABLE_CODE(break);
}
- grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
- &workqueue->read_closure);
- grpc_closure *cl = (grpc_closure *)n;
- grpc_error *clerr = cl->error_data.error;
- grpc_closure_run(exec_ctx, cl, clerr);
}
}
+ /* fall through to wakeup_next -- we tried a bunch of times to pull a node
+ * but failed */
+wakeup_next:
+ error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
+ if (error != GRPC_ERROR_NONE) {
+ /* recurse to get error handling */
+ on_readable(exec_ctx, arg, error);
+ } else {
+ grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
+ &workqueue->read_closure);
+ wakeup(exec_ctx, workqueue);
+ }
+ return;
+
+keep_going:
+ if (grpc_exec_ctx_ready_to_finish(exec_ctx)) {
+ goto wakeup_next;
+ } else {
+ /* recurse to continue */
+ on_readable(exec_ctx, arg, GRPC_ERROR_NONE);
+ }
+ return;
+
+switch_to_idle:
+ error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
+ if (error != GRPC_ERROR_NONE) {
+ /* recurse to get error handling */
+ on_readable(exec_ctx, arg, error);
+ } else {
+ grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
+ &workqueue->read_closure);
+ }
+ return;
+
+destroy:
+ workqueue_destroy(exec_ctx, workqueue);
+ return;
}
GPR_TIMER_END("workqueue.on_readable", 0);