diff options
author | Craig Tiller <ctiller@google.com> | 2016-09-08 14:57:56 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-09-08 14:57:56 -0700 |
commit | 84ea341aa3c1435182d8f2e0a687fa45bd4a8d1c (patch) | |
tree | eb958f5510337bf58b80449fdb4bfc49a3aa41bc | |
parent | 44b12f9e23cbdb5f5f1be0681b81aeded481debf (diff) |
Minor perf improvements
-rw-r--r-- | src/core/lib/iomgr/combiner.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epoll_linux.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/workqueue_posix.c | 71 |
3 files changed, 50 insertions, 25 deletions
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index b2d6559751..273505f8b8 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -189,7 +189,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { } if (lock->optional_workqueue != NULL && - grpc_exec_ctx_ready_to_finish(exec_ctx) && is_covered_by_poller(lock)) { + is_covered_by_poller(lock) && grpc_exec_ctx_ready_to_finish(exec_ctx)) { GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); // this execution context wants to move on, and we have a workqueue (and // so can help the execution context out): schedule remaining work to be diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 740920d760..f473e4765c 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -1299,7 +1299,7 @@ static void pollset_reset(grpc_pollset *pollset) { GPR_ASSERT(pollset->polling_island == NULL); } -#define GRPC_EPOLL_MAX_EVENTS 1000 +#define GRPC_EPOLL_MAX_EVENTS 100 /* Note: sig_mask contains the signal mask to use *during* epoll_wait() */ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index 836bb8b6e0..6f8a26684a 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -146,42 +146,67 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { GPR_ASSERT(gpr_atm_no_barrier_load(&workqueue->state) == 0); gpr_free(workqueue); } else { - error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); - if (error != GRPC_ERROR_NONE) { - /* recurse to get error handling */ - on_readable(exec_ctx, arg, error); - } else { - gpr_mpscq_node *n = gpr_mpscq_pop(&workqueue->queue); - if (n == NULL) { - /* try again - queue in an ephemerally inconsistent state */ - wakeup(exec_ctx, workqueue); - grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, - &workqueue->read_closure); - } else { + gpr_mpscq_node *n = NULL; + for (int i = 0; i < 100; i++) { + n = gpr_mpscq_pop(&workqueue->queue); + if (n != NULL) { + grpc_closure *c = (grpc_closure *)n; + grpc_closure_run(exec_ctx, c, c->error_data.error); + grpc_exec_ctx_flush(exec_ctx); gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, -2); switch (last) { default: - // schedule a wakeup since there's more to do - wakeup(exec_ctx, workqueue); - break; + // there's more to do, keep going + goto keep_going; case 3: // had one count, one unorphaned --> done, unorphaned - break; + goto switch_to_idle; case 2: // had one count, one orphaned --> done, orphaned - workqueue_destroy(exec_ctx, workqueue); - break; + goto destroy; case 1: case 0: // these values are illegal - representing an already done or // deleted workqueue GPR_UNREACHABLE_CODE(break); } - grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, - &workqueue->read_closure); - grpc_closure *cl = (grpc_closure *)n; - grpc_error *clerr = cl->error_data.error; - grpc_closure_run(exec_ctx, cl, clerr); } } + /* fall through to wakeup_next -- we tried a bunch of times to pull a node + * but failed */ +wakeup_next: + error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); + if (error != GRPC_ERROR_NONE) { + /* recurse to get error handling */ + on_readable(exec_ctx, arg, error); + } else { + grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, + &workqueue->read_closure); + wakeup(exec_ctx, workqueue); + } + return; + +keep_going: + if (grpc_exec_ctx_ready_to_finish(exec_ctx)) { + goto wakeup_next; + } else { + /* recurse to continue */ + on_readable(exec_ctx, arg, GRPC_ERROR_NONE); + } + return; + +switch_to_idle: + error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); + if (error != GRPC_ERROR_NONE) { + /* recurse to get error handling */ + on_readable(exec_ctx, arg, error); + } else { + grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, + &workqueue->read_closure); + } + return; + +destroy: + workqueue_destroy(exec_ctx, workqueue); + return; } GPR_TIMER_END("workqueue.on_readable", 0); |