diff options
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r-- | src/core/lib/iomgr/closure.c | 18 | ||||
-rw-r--r-- | src/core/lib/iomgr/closure.h | 11 | ||||
-rw-r--r-- | src/core/lib/iomgr/combiner.c | 372 | ||||
-rw-r--r-- | src/core/lib/iomgr/combiner.h | 15 | ||||
-rw-r--r-- | src/core/lib/iomgr/error.c | 18 | ||||
-rw-r--r-- | src/core/lib/iomgr/error.h | 12 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epoll_linux.c | 227 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_poll_and_epoll_posix.c | 30 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_poll_posix.c | 30 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_posix.c | 23 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_posix.h | 13 | ||||
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.c | 61 | ||||
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.h | 24 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr.c | 11 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.c | 11 | ||||
-rw-r--r-- | src/core/lib/iomgr/workqueue.h | 16 | ||||
-rw-r--r-- | src/core/lib/iomgr/workqueue_posix.c | 196 | ||||
-rw-r--r-- | src/core/lib/iomgr/workqueue_posix.h | 61 | ||||
-rw-r--r-- | src/core/lib/iomgr/workqueue_windows.c | 10 |
19 files changed, 633 insertions, 526 deletions
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index 1ba0a5c141..c6ddc76732 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -35,6 +35,8 @@ #include <grpc/support/alloc.h> +#include "src/core/lib/profiling/timers.h" + void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, void *cb_arg) { closure->cb = cb; @@ -51,7 +53,7 @@ void grpc_closure_list_append(grpc_closure_list *closure_list, GRPC_ERROR_UNREF(error); return; } - closure->error = error; + closure->error_data.error = error; closure->next_data.next = NULL; if (closure_list->head == NULL) { closure_list->head = closure; @@ -64,8 +66,8 @@ void grpc_closure_list_append(grpc_closure_list *closure_list, void grpc_closure_list_fail_all(grpc_closure_list *list, grpc_error *forced_failure) { for (grpc_closure *c = list->head; c != NULL; c = c->next_data.next) { - if (c->error == GRPC_ERROR_NONE) { - c->error = GRPC_ERROR_REF(forced_failure); + if (c->error_data.error == GRPC_ERROR_NONE) { + c->error_data.error = GRPC_ERROR_REF(forced_failure); } } GRPC_ERROR_UNREF(forced_failure); @@ -110,3 +112,13 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) { grpc_closure_init(&wc->wrapper, closure_wrapper, wc); return &wc->wrapper; } + +void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *c, + grpc_error *error) { + GPR_TIMER_BEGIN("grpc_closure_run", 0); + if (c != NULL) { + c->cb(exec_ctx, c->cb_arg, error); + } + GRPC_ERROR_UNREF(error); + GPR_TIMER_END("grpc_closure_run", 0); +} diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index c1a22b6021..2b4b271eaa 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -76,7 +76,10 @@ struct grpc_closure { void *cb_arg; /** Once queued, the result of the closure. Before then: scratch space */ - grpc_error *error; + union { + grpc_error *error; + uintptr_t scratch; + } error_data; }; /** Initializes \a closure with \a cb and \a cb_arg. */ @@ -106,4 +109,10 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst); /** return whether \a list is empty. */ bool grpc_closure_list_empty(grpc_closure_list list); +/** Run a closure directly. Caller ensures that no locks are being held above. + * Note that calling this at the end of a closure callback function itself is + * by definition safe. */ +void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error); + #endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */ diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 831bdb4aff..60ee14eb23 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -50,25 +50,57 @@ int grpc_combiner_trace = 0; } \ } while (0) +#define STATE_UNORPHANED 1 +#define STATE_ELEM_COUNT_LOW_BIT 2 + struct grpc_combiner { + grpc_combiner *next_combiner_on_this_exec_ctx; grpc_workqueue *optional_workqueue; gpr_mpscq queue; // state is: - // lower bit - zero if orphaned - // other bits - number of items queued on the lock + // lower bit - zero if orphaned (STATE_UNORPHANED) + // other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT) gpr_atm state; - bool take_async_break_before_final_list; + // number of elements in the list that are covered by a poller: if >0, we can + // offload safely + gpr_atm elements_covered_by_poller; + bool time_to_execute_final_list; + bool final_list_covered_by_poller; grpc_closure_list final_list; - grpc_closure continue_finishing; + grpc_closure offload; }; +static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); + +typedef struct { + grpc_error *error; + bool covered_by_poller; +} error_data; + +static uintptr_t pack_error_data(error_data d) { + return ((uintptr_t)d.error) | (d.covered_by_poller ? 1 : 0); +} + +static error_data unpack_error_data(uintptr_t p) { + return (error_data){(grpc_error *)(p & ~(uintptr_t)1), p & 1}; +} + +static bool is_covered_by_poller(grpc_combiner *lock) { + return lock->final_list_covered_by_poller || + gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0; +} + grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { grpc_combiner *lock = gpr_malloc(sizeof(*lock)); + lock->next_combiner_on_this_exec_ctx = NULL; + lock->time_to_execute_final_list = false; lock->optional_workqueue = optional_workqueue; - gpr_atm_no_barrier_store(&lock->state, 1); + lock->final_list_covered_by_poller = false; + gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); + gpr_atm_no_barrier_store(&lock->elements_covered_by_poller, 0); gpr_mpscq_init(&lock->queue); - lock->take_async_break_before_final_list = false; grpc_closure_list_init(&lock->final_list); + grpc_closure_init(&lock->offload, offload, lock); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock)); return lock; } @@ -82,7 +114,7 @@ static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { } void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { - gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -1); + gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED); GRPC_COMBINER_TRACE(gpr_log( GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state)); if (old_state == 1) { @@ -90,170 +122,186 @@ void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { } } -static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); -static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); +static void push_last_on_exec_ctx(grpc_exec_ctx *exec_ctx, + grpc_combiner *lock) { + lock->next_combiner_on_this_exec_ctx = NULL; + if (exec_ctx->active_combiner == NULL) { + exec_ctx->active_combiner = exec_ctx->last_combiner = lock; + } else { + exec_ctx->last_combiner->next_combiner_on_this_exec_ctx = lock; + exec_ctx->last_combiner = lock; + } +} -static void continue_finishing_mainline(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - GPR_TIMER_BEGIN("combiner.continue_executing_mainline", 0); - grpc_combiner *lock = arg; - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p continue_finishing_mainline", lock)); - GPR_ASSERT(exec_ctx->active_combiner == NULL); +static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx, + grpc_combiner *lock) { + lock->next_combiner_on_this_exec_ctx = exec_ctx->active_combiner; exec_ctx->active_combiner = lock; - if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock); - GPR_ASSERT(exec_ctx->active_combiner == lock); - exec_ctx->active_combiner = NULL; - GPR_TIMER_END("combiner.continue_executing_mainline", 0); + if (lock->next_combiner_on_this_exec_ctx == NULL) { + exec_ctx->last_combiner = lock; + } } -static void execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { - GPR_TIMER_BEGIN("combiner.execute_final", 0); - grpc_closure *c = lock->final_list.head; - GPR_ASSERT(c != NULL); - grpc_closure_list_init(&lock->final_list); - lock->take_async_break_before_final_list = false; - int loops = 0; - while (c != NULL) { - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c)); - grpc_closure *next = c->next_data.next; - grpc_error *error = c->error; - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - c = next; - loops++; +void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, + grpc_closure *cl, grpc_error *error, + bool covered_by_poller) { + GPR_TIMER_BEGIN("combiner.execute", 0); + gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); + GRPC_COMBINER_TRACE(gpr_log( + GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d last=%" PRIdPTR, lock, + cl, covered_by_poller, last)); + GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed + cl->error_data.scratch = + pack_error_data((error_data){error, covered_by_poller}); + if (covered_by_poller) { + gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, 1); + } + gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); + if (last == 1) { + // first element on this list: add it to the list of combiner locks + // executing within this exec_ctx + push_last_on_exec_ctx(exec_ctx, lock); } - GPR_TIMER_END("combiner.execute_final", 0); + GPR_TIMER_END("combiner.execute", 0); } -static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - GPR_TIMER_BEGIN("combiner.continue_executing_final", 0); - grpc_combiner *lock = arg; - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p continue_executing_final", lock)); - GPR_ASSERT(exec_ctx->active_combiner == NULL); - exec_ctx->active_combiner = lock; - // quick peek to see if new things have turned up on the queue: if so, go back - // to executing them before the final list - if ((gpr_atm_acq_load(&lock->state) >> 1) > 1) { - if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock); - } else { - execute_final(exec_ctx, lock); - finish(exec_ctx, lock); +static void move_next(grpc_exec_ctx *exec_ctx) { + exec_ctx->active_combiner = + exec_ctx->active_combiner->next_combiner_on_this_exec_ctx; + if (exec_ctx->active_combiner == NULL) { + exec_ctx->last_combiner = NULL; } - GPR_ASSERT(exec_ctx->active_combiner == lock); - exec_ctx->active_combiner = NULL; - GPR_TIMER_END("combiner.continue_executing_final", 0); } -static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { - GPR_TIMER_BEGIN("combiner.start_execute_final", 0); - GPR_ASSERT(exec_ctx->active_combiner == lock); - GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, - "C:%p start_execute_final take_async_break_before_final_list=%d", - lock, lock->take_async_break_before_final_list)); - if (lock->take_async_break_before_final_list) { - grpc_closure_init(&lock->continue_finishing, continue_executing_final, - lock); - grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE, - GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched")); - GPR_TIMER_END("combiner.start_execute_final", 0); +static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + grpc_combiner *lock = arg; + push_last_on_exec_ctx(exec_ctx, lock); +} + +static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { + move_next(exec_ctx); + GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload --> %p", lock, + lock->optional_workqueue)); + grpc_workqueue_enqueue(exec_ctx, lock->optional_workqueue, &lock->offload, + GRPC_ERROR_NONE); +} + +bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { + GPR_TIMER_BEGIN("combiner.continue_exec_ctx", 0); + grpc_combiner *lock = exec_ctx->active_combiner; + if (lock == NULL) { + GPR_TIMER_END("combiner.continue_exec_ctx", 0); return false; - } else { - execute_final(exec_ctx, lock); - GPR_TIMER_END("combiner.start_execute_final", 0); - return true; } -} -static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { - GPR_TIMER_BEGIN("combiner.maybe_finish_one", 0); - gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue); GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n)); - GPR_ASSERT(exec_ctx->active_combiner == lock); - if (n == NULL) { - // Queue is in an transiently inconsistent state: a new item is being queued - // but is not visible to this thread yet. - // Use this as a cue that we should go off and do something else for a while - // (and come back later) - grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline, - lock); - grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE, - GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched")); - GPR_TIMER_END("combiner.maybe_finish_one", 0); - return false; + gpr_log(GPR_DEBUG, + "C:%p grpc_combiner_continue_exec_ctx workqueue=%p " + "is_covered_by_poller=%d exec_ctx_ready_to_finish=%d " + "time_to_execute_final_list=%d", + lock, lock->optional_workqueue, is_covered_by_poller(lock), + grpc_exec_ctx_ready_to_finish(exec_ctx), + lock->time_to_execute_final_list)); + + if (lock->optional_workqueue != NULL && is_covered_by_poller(lock) && + grpc_exec_ctx_ready_to_finish(exec_ctx)) { + GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); + // this execution context wants to move on, and we have a workqueue (and + // so can help the execution context out): schedule remaining work to be + // picked up on the workqueue + queue_offload(exec_ctx, lock); + GPR_TIMER_END("combiner.continue_exec_ctx", 0); + return true; } - grpc_closure *cl = (grpc_closure *)n; - grpc_error *error = cl->error; - cl->cb(exec_ctx, cl->cb_arg, error); - GRPC_ERROR_UNREF(error); - GPR_TIMER_END("combiner.maybe_finish_one", 0); - return true; -} -static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { - bool (*executor)(grpc_exec_ctx * exec_ctx, grpc_combiner * lock); - GPR_TIMER_BEGIN("combiner.finish", 0); - int loops = 0; - do { - executor = maybe_finish_one; - gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -2); - GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, - "C:%p finish[%d] old_state=%" PRIdPTR, lock, - loops, old_state)); - switch (old_state) { - default: - // we have multiple queued work items: just continue executing them - break; - case 5: // we're down to one queued item: if it's the final list we - case 4: // should do that - if (!grpc_closure_list_empty(lock->final_list)) { - executor = start_execute_final; - } - break; - case 3: // had one count, one unorphaned --> unlocked unorphaned - GPR_TIMER_END("combiner.finish", 0); - return; - case 2: // and one count, one orphaned --> unlocked and orphaned - really_destroy(exec_ctx, lock); - GPR_TIMER_END("combiner.finish", 0); - return; - case 1: - case 0: - // these values are illegal - representing an already unlocked or - // deleted lock - GPR_UNREACHABLE_CODE(return ); + if (!lock->time_to_execute_final_list || + // peek to see if something new has shown up, and execute that with + // priority + (gpr_atm_acq_load(&lock->state) >> 1) > 1) { + gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue); + GRPC_COMBINER_TRACE( + gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n)); + if (n == NULL) { + // queue is in an inconsistent state: use this as a cue that we should + // go off and do something else for a while (and come back later) + GPR_TIMER_MARK("delay_busy", 0); + if (lock->optional_workqueue != NULL && is_covered_by_poller(lock)) { + queue_offload(exec_ctx, lock); + } + GPR_TIMER_END("combiner.continue_exec_ctx", 0); + return true; } - loops++; - } while (executor(exec_ctx, lock)); - GPR_TIMER_END("combiner.finish", 0); -} + GPR_TIMER_BEGIN("combiner.exec1", 0); + grpc_closure *cl = (grpc_closure *)n; + error_data err = unpack_error_data(cl->error_data.scratch); + cl->cb(exec_ctx, cl->cb_arg, err.error); + if (err.covered_by_poller) { + gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, -1); + } + GRPC_ERROR_UNREF(err.error); + GPR_TIMER_END("combiner.exec1", 0); + } else { + grpc_closure *c = lock->final_list.head; + GPR_ASSERT(c != NULL); + grpc_closure_list_init(&lock->final_list); + lock->final_list_covered_by_poller = false; + int loops = 0; + while (c != NULL) { + GPR_TIMER_BEGIN("combiner.exec_1final", 0); + GRPC_COMBINER_TRACE( + gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c)); + grpc_closure *next = c->next_data.next; + grpc_error *error = c->error_data.error; + c->cb(exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); + c = next; + GPR_TIMER_END("combiner.exec_1final", 0); + } + } -void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *cl, grpc_error *error) { + GPR_TIMER_MARK("unref", 0); + move_next(exec_ctx); + lock->time_to_execute_final_list = false; + gpr_atm old_state = + gpr_atm_full_fetch_add(&lock->state, -STATE_ELEM_COUNT_LOW_BIT); GRPC_COMBINER_TRACE( - gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p", lock, cl)); - GPR_TIMER_BEGIN("combiner.execute", 0); - gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2); - GPR_ASSERT(last & 1); // ensure lock has not been destroyed - if (last == 1) { - exec_ctx->active_combiner = lock; - GPR_TIMER_BEGIN("combiner.execute_first_cb", 0); - cl->cb(exec_ctx, cl->cb_arg, error); - GPR_TIMER_END("combiner.execute_first_cb", 0); - GRPC_ERROR_UNREF(error); - finish(exec_ctx, lock); - GPR_ASSERT(exec_ctx->active_combiner == lock); - exec_ctx->active_combiner = NULL; - } else { - cl->error = error; - gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); + gpr_log(GPR_DEBUG, "C:%p finish old_state=%" PRIdPTR, lock, old_state)); +// Define a macro to ease readability of the following switch statement. +#define OLD_STATE_WAS(orphaned, elem_count) \ + (((orphaned) ? 0 : STATE_UNORPHANED) | \ + ((elem_count)*STATE_ELEM_COUNT_LOW_BIT)) + // Depending on what the previous state was, we need to perform different + // actions. + switch (old_state) { + default: + // we have multiple queued work items: just continue executing them + break; + case OLD_STATE_WAS(false, 2): + case OLD_STATE_WAS(true, 2): + // we're down to one queued item: if it's the final list we should do that + if (!grpc_closure_list_empty(lock->final_list)) { + lock->time_to_execute_final_list = true; + } + break; + case OLD_STATE_WAS(false, 1): + // had one count, one unorphaned --> unlocked unorphaned + GPR_TIMER_END("combiner.continue_exec_ctx", 0); + return true; + case OLD_STATE_WAS(true, 1): + // and one count, one orphaned --> unlocked and orphaned + really_destroy(exec_ctx, lock); + GPR_TIMER_END("combiner.continue_exec_ctx", 0); + return true; + case OLD_STATE_WAS(false, 0): + case OLD_STATE_WAS(true, 0): + // these values are illegal - representing an already unlocked or + // deleted lock + GPR_TIMER_END("combiner.continue_exec_ctx", 0); + GPR_UNREACHABLE_CODE(return true); } - GPR_TIMER_END("combiner.execute", 0); + push_first_on_exec_ctx(exec_ctx, lock); + GPR_TIMER_END("combiner.continue_exec_ctx", 0); + return true; } static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, @@ -264,30 +312,26 @@ static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, grpc_closure *closure, grpc_error *error, - bool force_async_break) { + bool covered_by_poller) { GRPC_COMBINER_TRACE(gpr_log( - GPR_DEBUG, - "C:%p grpc_combiner_execute_finally c=%p force_async_break=%d; ac=%p", - lock, closure, force_async_break, exec_ctx->active_combiner)); + GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p; cov=%d", lock, + closure, exec_ctx->active_combiner, covered_by_poller)); GPR_TIMER_BEGIN("combiner.execute_finally", 0); if (exec_ctx->active_combiner != lock) { GPR_TIMER_MARK("slowpath", 0); grpc_combiner_execute(exec_ctx, lock, - grpc_closure_create(enqueue_finally, closure), error); + grpc_closure_create(enqueue_finally, closure), error, + false); GPR_TIMER_END("combiner.execute_finally", 0); return; } - if (force_async_break) { - lock->take_async_break_before_final_list = true; - } if (grpc_closure_list_empty(lock->final_list)) { - gpr_atm_full_fetch_add(&lock->state, 2); + gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); + } + if (covered_by_poller) { + lock->final_list_covered_by_poller = true; } grpc_closure_list_append(&lock->final_list, closure, error); GPR_TIMER_END("combiner.execute_finally", 0); } - -void grpc_combiner_force_async_finally(grpc_combiner *lock) { - lock->take_async_break_before_final_list = true; -} diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index 1409db24b9..d04eeed83a 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -52,19 +52,14 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue); void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); // Execute \a action within the lock. void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, - grpc_closure *closure, grpc_error *error); + grpc_closure *closure, grpc_error *error, + bool covered_by_poller); // Execute \a action within the lock just prior to unlocking. -// if \a hint_async_break is true, the combiner tries to hand execution to -// another thread before finishing the primary queue of combined closures and -// executing the finally list. -// Deprecation warning: \a hint_async_break will be removed in a future version -// Takes a very slow and round-about path if not called from a -// grpc_combiner_execute closure. void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, grpc_closure *closure, grpc_error *error, - bool hint_async_break); -// Deprecated: force the finally list execution onto another thread -void grpc_combiner_force_async_finally(grpc_combiner *lock); + bool covered_by_poller); + +bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx); extern int grpc_combiner_trace; diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index 31c80260f8..f6bb3a0477 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -120,6 +120,8 @@ static const char *error_int_name(grpc_error_ints key) { return "http_status"; case GRPC_ERROR_INT_LIMIT: return "limit"; + case GRPC_ERROR_INT_OCCURRED_DURING_WRITE: + return "occurred_during_write"; } GPR_UNREACHABLE_CODE(return "unknown"); } @@ -144,6 +146,8 @@ static const char *error_str_name(grpc_error_strs key) { return "tsi_error"; case GRPC_ERROR_STR_FILENAME: return "filename"; + case GRPC_ERROR_STR_QUEUED_BUFFERS: + return "queued_buffers"; } GPR_UNREACHABLE_CODE(return "unknown"); } @@ -265,7 +269,7 @@ static grpc_error *copy_error_and_unref(grpc_error *in) { } else { out = gpr_malloc(sizeof(*out)); #ifdef GRPC_ERROR_REFCOUNT_DEBUG - gpr_log(GPR_DEBUG, "%p create copying", out); + gpr_log(GPR_DEBUG, "%p create copying %p", out, in); #endif out->ints = gpr_avl_ref(in->ints); out->strs = gpr_avl_ref(in->strs); @@ -523,21 +527,25 @@ static char *fmt_time(void *p) { return out; } -static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap) { +static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap, + bool *first) { if (n == NULL) return; - add_errs(n->left, s, sz, cap); + add_errs(n->left, s, sz, cap, first); + if (!*first) append_chr(',', s, sz, cap); + *first = false; const char *e = grpc_error_string(n->value); append_str(e, s, sz, cap); grpc_error_free_string(e); - add_errs(n->right, s, sz, cap); + add_errs(n->right, s, sz, cap, first); } static char *errs_string(grpc_error *err) { char *s = NULL; size_t sz = 0; size_t cap = 0; + bool first = true; append_chr('[', &s, &sz, &cap); - add_errs(err->errs.root, &s, &sz, &cap); + add_errs(err->errs.root, &s, &sz, &cap, &first); append_chr(']', &s, &sz, &cap); append_chr(0, &s, &sz, &cap); return s; diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 00ace8a7a9..f3f3b80a09 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -100,6 +100,8 @@ typedef enum { GRPC_ERROR_INT_HTTP_STATUS, /// context sensitive limit associated with the error GRPC_ERROR_INT_LIMIT, + /// chttp2: did the error occur while a write was in progress + GRPC_ERROR_INT_OCCURRED_DURING_WRITE, } grpc_error_ints; typedef enum { @@ -121,6 +123,8 @@ typedef enum { GRPC_ERROR_STR_TSI_ERROR, /// filename that we were trying to read/write when this error occurred GRPC_ERROR_STR_FILENAME, + /// which data was queued for writing when the error occurred + GRPC_ERROR_STR_QUEUED_BUFFERS } grpc_error_strs; typedef enum { @@ -128,9 +132,13 @@ typedef enum { GRPC_ERROR_TIME_CREATED, } grpc_error_times; +/// The following "special" errors can be propagated without allocating memory. +/// They are always even so that other code (particularly combiner locks) can +/// safely use the lower bit for themselves. + #define GRPC_ERROR_NONE ((grpc_error *)NULL) -#define GRPC_ERROR_OOM ((grpc_error *)1) -#define GRPC_ERROR_CANCELLED ((grpc_error *)2) +#define GRPC_ERROR_OOM ((grpc_error *)2) +#define GRPC_ERROR_CANCELLED ((grpc_error *)4) const char *grpc_error_string(grpc_error *error); void grpc_error_free_string(const char *str); diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index ad1cfc2031..e5909d9380 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -152,20 +152,20 @@ static void fd_global_shutdown(void); * Polling island Declarations */ -//#define GRPC_PI_REF_COUNT_DEBUG -#ifdef GRPC_PI_REF_COUNT_DEBUG +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG #define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__) #define PI_UNREF(exec_ctx, p, r) \ pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__) -#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */ +#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */ #define PI_ADD_REF(p, r) pi_add_ref((p)) #define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p)) #endif /* !defined(GPRC_PI_REF_COUNT_DEBUG) */ +/* This is also used as grpc_workqueue (by directly casing it) */ typedef struct polling_island { gpr_mu mu; /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement @@ -185,8 +185,17 @@ typedef struct polling_island { * (except mu and ref_count) are invalid and must be ignored. */ gpr_atm merged_to; - /* The workqueue associated with this polling island */ - grpc_workqueue *workqueue; + /* Number of threads currently polling on this island */ + gpr_atm poller_count; + /* Mutex guarding the read end of the workqueue (must be held to pop from + * workqueue_items) */ + gpr_mu workqueue_read_mu; + /* Queue of closures to be executed */ + gpr_mpscq workqueue_items; + /* Count of items in workqueue_items */ + gpr_atm workqueue_item_count; + /* Wakeup fd used to wake pollers to check the contents of workqueue_items */ + grpc_wakeup_fd workqueue_wakeup_fd; /* The fd of the underlying epoll set */ int epoll_fd; @@ -275,6 +284,10 @@ static bool append_error(grpc_error **composite, grpc_error *error, threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */ static grpc_wakeup_fd polling_island_wakeup_fd; +/* The polling island being polled right now. + See comments in workqueue_maybe_wakeup for why this is tracked. */ +static __thread polling_island *g_current_thread_polling_island; + /* Forward declaration */ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi); @@ -289,12 +302,12 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi); gpr_atm g_epoll_sync; #endif /* defined(GRPC_TSAN) */ -#ifdef GRPC_PI_REF_COUNT_DEBUG static void pi_add_ref(polling_island *pi); static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi); -static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file, - int line) { +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +static void pi_add_ref_dbg(polling_island *pi, const char *reason, + const char *file, int line) { long old_cnt = gpr_atm_acq_load(&pi->ref_count); pi_add_ref(pi); gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)", @@ -302,12 +315,42 @@ static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file, } static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi, - char *reason, char *file, int line) { + const char *reason, const char *file, int line) { long old_cnt = gpr_atm_acq_load(&pi->ref_count); pi_unref(exec_ctx, pi); gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)", (void *)pi, old_cnt, (old_cnt - 1), reason, file, line); } + +static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, + const char *file, int line, + const char *reason) { + if (workqueue != NULL) { + pi_add_ref_dbg((polling_island *)workqueue, reason, file, line); + } + return workqueue; +} + +static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + const char *file, int line, const char *reason) { + if (workqueue != NULL) { + pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line); + } +} +#else +static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) { + if (workqueue != NULL) { + pi_add_ref((polling_island *)workqueue); + } + return workqueue; +} + +static void workqueue_unref(grpc_exec_ctx *exec_ctx, + grpc_workqueue *workqueue) { + if (workqueue != NULL) { + pi_unref(exec_ctx, (polling_island *)workqueue); + } +} #endif static void pi_add_ref(polling_island *pi) { @@ -315,10 +358,7 @@ static void pi_add_ref(polling_island *pi) { } static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) { - /* If ref count went to one, we're back to just the workqueue owning a ref. - Unref the workqueue to break the loop. - - If ref count went to zero, delete the polling island. + /* If ref count went to zero, delete the polling island. Note that this deletion not be done under a lock. Once the ref count goes to zero, we are guaranteed that no one else holds a reference to the polling island (and that there is no racing pi_add_ref() call either). @@ -326,20 +366,12 @@ static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) { Also, if we are deleting the polling island and the merged_to field is non-empty, we should remove a ref to the merged_to polling island */ - switch (gpr_atm_full_fetch_add(&pi->ref_count, -1)) { - case 2: /* last external ref: the only one now owned is by the workqueue */ - GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island"); - break; - case 1: { - polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to); - polling_island_delete(exec_ctx, pi); - if (next != NULL) { - PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */ - } - break; + if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) { + polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to); + polling_island_delete(exec_ctx, pi); + if (next != NULL) { + PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */ } - case 0: - GPR_UNREACHABLE_CODE(return ); } } @@ -488,11 +520,20 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, pi->fd_capacity = 0; pi->fds = NULL; pi->epoll_fd = -1; - pi->workqueue = NULL; + + gpr_mu_init(&pi->workqueue_read_mu); + gpr_mpscq_init(&pi->workqueue_items); + gpr_atm_rel_store(&pi->workqueue_item_count, 0); gpr_atm_rel_store(&pi->ref_count, 0); + gpr_atm_rel_store(&pi->poller_count, 0); gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL); + if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd), + err_desc)) { + goto done; + } + pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (pi->epoll_fd < 0) { @@ -501,26 +542,14 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, } polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error); + polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error); if (initial_fd != NULL) { polling_island_add_fds_locked(pi, &initial_fd, 1, true, error); } - if (append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue), - err_desc) && - *error == GRPC_ERROR_NONE) { - polling_island_add_fds_locked(pi, &pi->workqueue->wakeup_read_fd, 1, true, - error); - GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL); - pi->workqueue->wakeup_read_fd->polling_island = pi; - PI_ADD_REF(pi, "fd"); - } - done: if (*error != GRPC_ERROR_NONE) { - if (pi->workqueue != NULL) { - GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island"); - } polling_island_delete(exec_ctx, pi); pi = NULL; } @@ -533,7 +562,11 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) { if (pi->epoll_fd >= 0) { close(pi->epoll_fd); } + GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0); + gpr_mu_destroy(&pi->workqueue_read_mu); + gpr_mpscq_destroy(&pi->workqueue_items); gpr_mu_destroy(&pi->mu); + grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd); gpr_free(pi->fds); gpr_free(pi); } @@ -678,6 +711,45 @@ static void polling_island_unlock_pair(polling_island *p, polling_island *q) { } } +static void workqueue_maybe_wakeup(polling_island *pi) { + /* If this thread is the current poller, then it may be that it's about to + decrement the current poller count, so we need to look past this thread */ + bool is_current_poller = (g_current_thread_polling_island == pi); + gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0; + gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count); + /* Only issue a wakeup if it's likely that some poller could come in and take + it right now. Note that since we do an anticipatory mpscq_pop every poll + loop, it's ok if we miss the wakeup here, as we'll get the work item when + the next poller enters anyway. */ + if (current_pollers > min_current_pollers_for_wakeup) { + GRPC_LOG_IF_ERROR("workqueue_wakeup_fd", + grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd)); + } +} + +static void workqueue_move_items_to_parent(polling_island *q) { + polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to); + if (p == NULL) { + return; + } + gpr_mu_lock(&q->workqueue_read_mu); + int num_added = 0; + while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) { + gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items); + if (n != NULL) { + gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1); + gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1); + gpr_mpscq_push(&p->workqueue_items, n); + num_added++; + } + } + gpr_mu_unlock(&q->workqueue_read_mu); + if (num_added > 0) { + workqueue_maybe_wakeup(p); + } + workqueue_move_items_to_parent(p); +} + static polling_island *polling_island_merge(polling_island *p, polling_island *q, grpc_error **error) { @@ -702,6 +774,8 @@ static polling_island *polling_island_merge(polling_island *p, /* Add the 'merged_to' link from p --> q */ gpr_atm_rel_store(&p->merged_to, (gpr_atm)q); PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */ + + workqueue_move_items_to_parent(q); } /* else if p == q, nothing needs to be done */ @@ -712,6 +786,26 @@ static polling_island *polling_island_merge(polling_island *p, return q; } +static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, + grpc_workqueue *workqueue, grpc_closure *closure, + grpc_error *error) { + GPR_TIMER_BEGIN("workqueue.enqueue", 0); + /* take a ref to the workqueue: otherwise it can happen that whatever events + * this kicks off ends up destroying the workqueue before this function + * completes */ + GRPC_WORKQUEUE_REF(workqueue, "enqueue"); + polling_island *pi = (polling_island *)workqueue; + gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1); + closure->error_data.error = error; + gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next); + if (last == 0) { + workqueue_maybe_wakeup(pi); + } + workqueue_move_items_to_parent(pi); + GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue"); + GPR_TIMER_END("workqueue.enqueue", 0); +} + static grpc_error *polling_island_global_init() { grpc_error *error = GRPC_ERROR_NONE; @@ -1042,11 +1136,8 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { gpr_mu_lock(&fd->mu); - grpc_workqueue *workqueue = NULL; - if (fd->polling_island != NULL) { - workqueue = - GRPC_WORKQUEUE_REF(fd->polling_island->workqueue, "get_workqueue"); - } + grpc_workqueue *workqueue = GRPC_WORKQUEUE_REF( + (grpc_workqueue *)fd->polling_island, "fd_get_workqueue"); gpr_mu_unlock(&fd->mu); return workqueue; } @@ -1299,7 +1390,29 @@ static void pollset_reset(grpc_pollset *pollset) { GPR_ASSERT(pollset->polling_island == NULL); } -#define GRPC_EPOLL_MAX_EVENTS 1000 +static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, + polling_island *pi) { + if (gpr_mu_trylock(&pi->workqueue_read_mu)) { + gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items); + gpr_mu_unlock(&pi->workqueue_read_mu); + if (n != NULL) { + if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) { + workqueue_maybe_wakeup(pi); + } + grpc_closure *c = (grpc_closure *)n; + grpc_closure_run(exec_ctx, c, c->error_data.error); + return true; + } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) { + /* n == NULL might mean there's work but it's not available to be popped + * yet - try to ensure another workqueue wakes up to check shortly if so + */ + workqueue_maybe_wakeup(pi); + } + } + return false; +} + +#define GRPC_EPOLL_MAX_EVENTS 100 /* Note: sig_mask contains the signal mask to use *during* epoll_wait() */ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -1354,7 +1467,13 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, PI_ADD_REF(pi, "ps_work"); gpr_mu_unlock(&pollset->mu); - do { + /* If we get some workqueue work to do, it might end up completing an item on + the completion queue, so there's no need to poll... so we skip that and + redo the complete loop to verify */ + if (!maybe_do_workqueue_work(exec_ctx, pi)) { + gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1); + g_current_thread_polling_island = pi; + GRPC_SCHEDULING_START_BLOCKING_REGION; ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask); @@ -1386,6 +1505,11 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, append_error(error, grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd), err_desc); + } else if (data_ptr == &pi->workqueue_wakeup_fd) { + append_error(error, + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd), + err_desc); + maybe_do_workqueue_work(exec_ctx, pi); } else if (data_ptr == &polling_island_wakeup_fd) { GRPC_POLLING_TRACE( "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: " @@ -1408,7 +1532,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, } } } - } while (ep_rv == GRPC_EPOLL_MAX_EVENTS); + + g_current_thread_polling_island = NULL; + gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1); + } GPR_ASSERT(pi != NULL); @@ -1868,6 +1995,10 @@ static const grpc_event_engine_vtable vtable = { .kick_poller = kick_poller, + .workqueue_ref = workqueue_ref, + .workqueue_unref = workqueue_unref, + .workqueue_enqueue = workqueue_enqueue, + .shutdown_engine = shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index c2107e5e39..1829440a6e 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -1989,6 +1989,32 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, } /******************************************************************************* + * workqueue stubs + */ + +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, + const char *file, int line, + const char *reason) { + return workqueue; +} +static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + const char *file, int line, const char *reason) {} +#else +static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) { + return workqueue; +} +static void workqueue_unref(grpc_exec_ctx *exec_ctx, + grpc_workqueue *workqueue) {} +#endif + +static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, + grpc_workqueue *workqueue, grpc_closure *closure, + grpc_error *error) { + grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); +} + +/******************************************************************************* * event engine binding */ @@ -2029,6 +2055,10 @@ static const grpc_event_engine_vtable vtable = { .kick_poller = kick_poller, + .workqueue_ref = workqueue_ref, + .workqueue_unref = workqueue_unref, + .workqueue_enqueue = workqueue_enqueue, + .shutdown_engine = shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index f137e4dacc..351b069613 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -1260,6 +1260,32 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, } /******************************************************************************* + * workqueue stubs + */ + +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, + const char *file, int line, + const char *reason) { + return workqueue; +} +static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + const char *file, int line, const char *reason) {} +#else +static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) { + return workqueue; +} +static void workqueue_unref(grpc_exec_ctx *exec_ctx, + grpc_workqueue *workqueue) {} +#endif + +static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, + grpc_workqueue *workqueue, grpc_closure *closure, + grpc_error *error) { + grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); +} + +/******************************************************************************* * Condition Variable polling extensions */ @@ -1498,6 +1524,10 @@ static const grpc_event_engine_vtable vtable = { .kick_poller = kick_poller, + .workqueue_ref = workqueue_ref, + .workqueue_unref = workqueue_unref, + .workqueue_enqueue = workqueue_enqueue, + .shutdown_engine = shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 0637f80421..9857b0bce9 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -259,4 +259,27 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_error *grpc_kick_poller(void) { return g_event_engine->kick_poller(); } +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, + int line, const char *reason) { + return g_event_engine->workqueue_ref(workqueue, file, line, reason); +} +void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + const char *file, int line, const char *reason) { + g_event_engine->workqueue_unref(exec_ctx, workqueue, file, line, reason); +} +#else +grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) { + return g_event_engine->workqueue_ref(workqueue); +} +void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { + g_event_engine->workqueue_unref(exec_ctx, workqueue); +} +#endif + +void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error) { + g_event_engine->workqueue_enqueue(exec_ctx, workqueue, closure, error); +} + #endif // GPR_POSIX_SOCKET diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index c2aa1756ea..2fdef06838 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -40,6 +40,7 @@ #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" +#include "src/core/lib/iomgr/workqueue.h" typedef struct grpc_fd grpc_fd; @@ -95,6 +96,18 @@ typedef struct grpc_event_engine_vtable { grpc_error *(*kick_poller)(void); void (*shutdown_engine)(void); + +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG + grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue, const char *file, + int line, const char *reason); + void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + const char *file, int line, const char *reason); +#else + grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue); + void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); +#endif + void (*workqueue_enqueue)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error); } grpc_event_engine_vtable; void grpc_event_engine_init(void); diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index ac7785ec13..604713e578 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -37,6 +37,7 @@ #include <grpc/support/sync.h> #include <grpc/support/thd.h> +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" @@ -60,18 +61,43 @@ bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) { bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { bool did_something = 0; GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0); - while (!grpc_closure_list_empty(exec_ctx->closure_list)) { - grpc_closure *c = exec_ctx->closure_list.head; - exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL; - while (c != NULL) { - grpc_closure *next = c->next_data.next; - grpc_error *error = c->error; - did_something = true; - GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0); + for (;;) { + if (!grpc_closure_list_empty(exec_ctx->closure_list)) { + grpc_closure *c = exec_ctx->closure_list.head; + exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL; + while (c != NULL) { + grpc_closure *next = c->next_data.next; + did_something = true; + grpc_closure_run(exec_ctx, c, c->error_data.error); + c = next; + } + } else if (!grpc_combiner_continue_exec_ctx(exec_ctx)) { + break; + } + } + GPR_ASSERT(exec_ctx->active_combiner == NULL); + if (exec_ctx->stealing_from_workqueue != NULL) { + if (grpc_exec_ctx_ready_to_finish(exec_ctx)) { + grpc_workqueue_enqueue(exec_ctx, exec_ctx->stealing_from_workqueue, + exec_ctx->stolen_closure, + exec_ctx->stolen_closure->error_data.error); + GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue, + "exec_ctx_sched"); + exec_ctx->stealing_from_workqueue = NULL; + exec_ctx->stolen_closure = NULL; + } else { + grpc_closure *c = exec_ctx->stolen_closure; + GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue, + "exec_ctx_sched"); + exec_ctx->stealing_from_workqueue = NULL; + exec_ctx->stolen_closure = NULL; + grpc_error *error = c->error_data.error; + GPR_TIMER_BEGIN("grpc_exec_ctx_flush.stolen_cb", 0); c->cb(exec_ctx, c->cb_arg, error); GRPC_ERROR_UNREF(error); - GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0); - c = next; + GPR_TIMER_END("grpc_exec_ctx_flush.stolen_cb", 0); + grpc_exec_ctx_flush(exec_ctx); + did_something = true; } } GPR_TIMER_END("grpc_exec_ctx_flush", 0); @@ -86,12 +112,25 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error, grpc_workqueue *offload_target_or_null) { + GPR_TIMER_BEGIN("grpc_exec_ctx_sched", 0); if (offload_target_or_null == NULL) { grpc_closure_list_append(&exec_ctx->closure_list, closure, error); - } else { + } else if (exec_ctx->stealing_from_workqueue == NULL) { + exec_ctx->stealing_from_workqueue = offload_target_or_null; + closure->error_data.error = error; + exec_ctx->stolen_closure = closure; + } else if (exec_ctx->stealing_from_workqueue != offload_target_or_null) { grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error); GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched"); + } else { /* stealing_from_workqueue == offload_target_or_null */ + grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, + exec_ctx->stolen_closure, + exec_ctx->stolen_closure->error_data.error); + closure->error_data.error = error; + exec_ctx->stolen_closure = closure; + GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched"); } + GPR_TIMER_END("grpc_exec_ctx_sched", 0); } void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 4d20ecf922..7e50cb9825 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -66,15 +66,33 @@ typedef struct grpc_combiner grpc_combiner; #ifndef GRPC_EXECUTION_CONTEXT_SANITIZER struct grpc_exec_ctx { grpc_closure_list closure_list; + /** The workqueue we're stealing work from. + As items are queued to the execution context, we try to steal one + workqueue item and execute it inline (assuming the exec_ctx is not + finished) - doing so does not invalidate the workqueue's contract, and + provides a small latency win in cases where we get a hit */ + grpc_workqueue *stealing_from_workqueue; + /** The workqueue item that was stolen from the workqueue above. When new + items are scheduled to be offloaded to that workqueue, we need to update + this like a 1-deep fifo to maintain the invariant that workqueue items + queued by one thread are started in order */ + grpc_closure *stolen_closure; /** currently active combiner: updated only via combiner.c */ grpc_combiner *active_combiner; + /** last active combiner in the active combiner list */ + grpc_combiner *last_combiner; bool cached_ready_to_finish; void *check_ready_to_finish_arg; bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg); }; +/* initializer for grpc_exec_ctx: + prefer to use GRPC_EXEC_CTX_INIT whenever possible */ #define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \ - { GRPC_CLOSURE_LIST_INIT, NULL, false, finish_check_arg, finish_check } + { \ + GRPC_CLOSURE_LIST_INIT, NULL, NULL, NULL, NULL, false, finish_check_arg, \ + finish_check \ + } #else struct grpc_exec_ctx { bool cached_ready_to_finish; @@ -85,8 +103,10 @@ struct grpc_exec_ctx { { false, finish_check_arg, finish_check } #endif +/* initialize an execution context at the top level of an API call into grpc + (this is safe to use elsewhere, though possibly not as efficient) */ #define GRPC_EXEC_CTX_INIT \ - GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_never_ready_to_finish, NULL) + GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_always_ready_to_finish, NULL) /** Flush any work that has been enqueued onto this grpc_exec_ctx. * Caller must guarantee that no interfering locks are held. diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index d67d388b8c..4fd83e0b22 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -112,6 +112,14 @@ void grpc_iomgr_shutdown(void) { continue; } if (g_root_object.next != &g_root_object) { + if (grpc_iomgr_abort_on_leaks()) { + gpr_log(GPR_DEBUG, "Failed to free %" PRIuPTR + " iomgr objects before shutdown deadline: " + "memory leaks are likely", + count_objects()); + dump_objects("LEAKED"); + abort(); + } gpr_timespec short_deadline = gpr_time_add( gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN)); if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline)) { @@ -122,9 +130,6 @@ void grpc_iomgr_shutdown(void) { "memory leaks are likely", count_objects()); dump_objects("LEAKED"); - if (grpc_iomgr_abort_on_leaks()) { - abort(); - } } break; } diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 92767721d5..00fd77679a 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -177,7 +177,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, tcp->read_cb = NULL; tcp->incoming_buffer = NULL; - grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); + grpc_closure_run(exec_ctx, cb, error); } #define MAX_READ_IOVEC 4 @@ -209,11 +209,11 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { msg.msg_controllen = 0; msg.msg_flags = 0; - GPR_TIMER_BEGIN("recvmsg", 1); + GPR_TIMER_BEGIN("recvmsg", 0); do { read_bytes = recvmsg(tcp->fd, &msg, 0); } while (read_bytes < 0 && errno == EINTR); - GPR_TIMER_END("recvmsg", 0); + GPR_TIMER_END("recvmsg", read_bytes >= 0); if (read_bytes < 0) { /* NB: After calling call_read_cb a parallel call of the read handler may @@ -392,11 +392,8 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error_free_string(str); } - GPR_TIMER_BEGIN("tcp_handle_write.cb", 0); - cb->cb(exec_ctx, cb->cb_arg, error); - GPR_TIMER_END("tcp_handle_write.cb", 0); + grpc_closure_run(exec_ctx, cb, error); TCP_UNREF(exec_ctx, tcp, "write"); - GRPC_ERROR_UNREF(error); } } diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index b2805dc66c..5b96d1d851 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -40,10 +40,6 @@ #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_set.h" -#ifdef GPR_POSIX_SOCKET -#include "src/core/lib/iomgr/workqueue_posix.h" -#endif - #ifdef GPR_WINDOWS #include "src/core/lib/iomgr/workqueue_windows.h" #endif @@ -58,20 +54,20 @@ string will be printed alongside the refcount. When it is not defined, the string will be discarded at compilation time. */ -//#define GRPC_WORKQUEUE_REFCOUNT_DEBUG +/*#define GRPC_WORKQUEUE_REFCOUNT_DEBUG*/ #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG #define GRPC_WORKQUEUE_REF(p, r) \ - (grpc_workqueue_ref((p), __FILE__, __LINE__, (r)), (p)) + grpc_workqueue_ref((p), __FILE__, __LINE__, (r)) #define GRPC_WORKQUEUE_UNREF(exec_ctx, p, r) \ grpc_workqueue_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) -void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, - const char *reason); +grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, + int line, const char *reason); void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, const char *file, int line, const char *reason); #else -#define GRPC_WORKQUEUE_REF(p, r) (grpc_workqueue_ref((p)), (p)) +#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p)) #define GRPC_WORKQUEUE_UNREF(cl, p, r) grpc_workqueue_unref((cl), (p)) -void grpc_workqueue_ref(grpc_workqueue *workqueue); +grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue); void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); #endif diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c deleted file mode 100644 index ecfea68f56..0000000000 --- a/src/core/lib/iomgr/workqueue_posix.c +++ /dev/null @@ -1,196 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <grpc/support/port_platform.h> - -#ifdef GPR_POSIX_SOCKET - -#include "src/core/lib/iomgr/workqueue.h" - -#include <stdio.h> - -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/useful.h> - -#include "src/core/lib/iomgr/ev_posix.h" -#include "src/core/lib/profiling/timers.h" - -static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); - -grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, - grpc_workqueue **workqueue) { - char name[32]; - *workqueue = gpr_malloc(sizeof(grpc_workqueue)); - gpr_ref_init(&(*workqueue)->refs, 1); - gpr_atm_no_barrier_store(&(*workqueue)->state, 1); - grpc_error *err = grpc_wakeup_fd_init(&(*workqueue)->wakeup_fd); - if (err != GRPC_ERROR_NONE) { - gpr_free(*workqueue); - return err; - } - sprintf(name, "workqueue:%p", (void *)(*workqueue)); - (*workqueue)->wakeup_read_fd = grpc_fd_create( - GRPC_WAKEUP_FD_GET_READ_FD(&(*workqueue)->wakeup_fd), name); - gpr_mpscq_init(&(*workqueue)->queue); - grpc_closure_init(&(*workqueue)->read_closure, on_readable, *workqueue); - grpc_fd_notify_on_read(exec_ctx, (*workqueue)->wakeup_read_fd, - &(*workqueue)->read_closure); - return GRPC_ERROR_NONE; -} - -static void workqueue_destroy(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) { - grpc_fd_shutdown(exec_ctx, workqueue->wakeup_read_fd); -} - -static void workqueue_orphan(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) { - if (gpr_atm_full_fetch_add(&workqueue->state, -1) == 1) { - workqueue_destroy(exec_ctx, workqueue); - } -} - -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, - const char *reason) { - if (workqueue == NULL) return; - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p ref %d -> %d %s", - workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count + 1, - reason); - gpr_ref(&workqueue->refs); -} -#else -void grpc_workqueue_ref(grpc_workqueue *workqueue) { - if (workqueue == NULL) return; - gpr_ref(&workqueue->refs); -} -#endif - -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) { - if (workqueue == NULL) return; - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p unref %d -> %d %s", - workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count - 1, - reason); - if (gpr_unref(&workqueue->refs)) { - workqueue_orphan(exec_ctx, workqueue); - } -} -#else -void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { - if (workqueue == NULL) return; - if (gpr_unref(&workqueue->refs)) { - workqueue_orphan(exec_ctx, workqueue); - } -} -#endif - -static void drain(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { - abort(); -} - -static void wakeup(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { - GPR_TIMER_MARK("workqueue.wakeup", 0); - grpc_error *err = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); - if (!GRPC_LOG_IF_ERROR("wakeupfd_wakeup", err)) { - drain(exec_ctx, workqueue); - } -} - -static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - GPR_TIMER_BEGIN("workqueue.on_readable", 0); - - grpc_workqueue *workqueue = arg; - - if (error != GRPC_ERROR_NONE) { - /* HACK: let wakeup_fd code know that we stole the fd */ - workqueue->wakeup_fd.read_fd = 0; - grpc_wakeup_fd_destroy(&workqueue->wakeup_fd); - grpc_fd_orphan(exec_ctx, workqueue->wakeup_read_fd, NULL, NULL, "destroy"); - GPR_ASSERT(gpr_atm_no_barrier_load(&workqueue->state) == 0); - gpr_free(workqueue); - } else { - error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); - gpr_mpscq_node *n = gpr_mpscq_pop(&workqueue->queue); - if (error == GRPC_ERROR_NONE) { - grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, - &workqueue->read_closure); - } else { - /* recurse to get error handling */ - on_readable(exec_ctx, arg, error); - } - if (n == NULL) { - /* try again - queue in an inconsistant state */ - wakeup(exec_ctx, workqueue); - } else { - switch (gpr_atm_full_fetch_add(&workqueue->state, -2)) { - case 3: // had one count, one unorphaned --> done, unorphaned - break; - case 2: // had one count, one orphaned --> done, orphaned - workqueue_destroy(exec_ctx, workqueue); - break; - case 1: - case 0: - // these values are illegal - representing an already done or - // deleted workqueue - GPR_UNREACHABLE_CODE(break); - default: - // schedule a wakeup since there's more to do - wakeup(exec_ctx, workqueue); - } - grpc_closure *cl = (grpc_closure *)n; - grpc_error *clerr = cl->error; - cl->cb(exec_ctx, cl->cb_arg, clerr); - GRPC_ERROR_UNREF(clerr); - } - } - - GPR_TIMER_END("workqueue.on_readable", 0); -} - -void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - grpc_closure *closure, grpc_error *error) { - GPR_TIMER_BEGIN("workqueue.enqueue", 0); - gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, 2); - GPR_ASSERT(last & 1); - closure->error = error; - gpr_mpscq_push(&workqueue->queue, &closure->next_data.atm_next); - if (last == 1) { - wakeup(exec_ctx, workqueue); - } - GPR_TIMER_END("workqueue.enqueue", 0); -} - -#endif /* GPR_POSIX_SOCKET */ diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/workqueue_posix.h deleted file mode 100644 index 03ee21cef7..0000000000 --- a/src/core/lib/iomgr/workqueue_posix.h +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H -#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H - -#include "src/core/lib/iomgr/wakeup_fd_posix.h" -#include "src/core/lib/support/mpscq.h" - -struct grpc_fd; - -struct grpc_workqueue { - gpr_refcount refs; - gpr_mpscq queue; - // state is: - // lower bit - zero if orphaned - // other bits - number of items enqueued - gpr_atm state; - - grpc_wakeup_fd wakeup_fd; - struct grpc_fd *wakeup_read_fd; - - grpc_closure read_closure; -}; - -/** Create a work queue. Returns an error if creation fails. If creation - succeeds, sets *workqueue to point to it. */ -grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, - grpc_workqueue **workqueue); - -#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H */ diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c index ee81dc248e..5c93d3c59e 100644 --- a/src/core/lib/iomgr/workqueue_windows.c +++ b/src/core/lib/iomgr/workqueue_windows.c @@ -43,12 +43,16 @@ // workqueues. #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, - const char *reason) {} +grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, + int line, const char *reason) { + return workqueue; +} void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, const char *file, int line, const char *reason) {} #else -void grpc_workqueue_ref(grpc_workqueue *workqueue) {} +grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) { + return workqueue; +} void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} #endif |