/* * * Copyright 2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are * met: * * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following disclaimer * in the documentation and/or other materials provided with the * distribution. * * Neither the name of Google Inc. nor the names of its * contributors may be used to endorse or promote products derived from * this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ #include "src/core/lib/iomgr/combiner.h" #include #include #include #include #include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" grpc_tracer_flag grpc_combiner_trace = GRPC_TRACER_INITIALIZER(false); #define GRPC_COMBINER_TRACE(fn) \ do { \ if (GRPC_TRACER_ON(grpc_combiner_trace)) { \ fn; \ } \ } while (0) #define STATE_UNORPHANED 1 #define STATE_ELEM_COUNT_LOW_BIT 2 struct grpc_combiner { grpc_combiner *next_combiner_on_this_exec_ctx; grpc_workqueue *optional_workqueue; grpc_closure_scheduler uncovered_scheduler; grpc_closure_scheduler covered_scheduler; grpc_closure_scheduler uncovered_finally_scheduler; grpc_closure_scheduler covered_finally_scheduler; gpr_mpscq queue; // state is: // lower bit - zero if orphaned (STATE_UNORPHANED) // other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT) gpr_atm state; // number of elements in the list that are covered by a poller: if >0, we can // offload safely gpr_atm elements_covered_by_poller; bool time_to_execute_final_list; bool final_list_covered_by_poller; grpc_closure_list final_list; grpc_closure offload; gpr_refcount refs; }; static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error); static void combiner_exec_covered(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error); static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error); static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error); static const grpc_closure_scheduler_vtable scheduler_uncovered = { combiner_exec_uncovered, combiner_exec_uncovered, "combiner:immediately:uncovered"}; static const grpc_closure_scheduler_vtable scheduler_covered = { combiner_exec_covered, combiner_exec_covered, "combiner:immediately:covered"}; static const grpc_closure_scheduler_vtable finally_scheduler_uncovered = { combiner_finally_exec_uncovered, combiner_finally_exec_uncovered, "combiner:finally:uncovered"}; static const grpc_closure_scheduler_vtable finally_scheduler_covered = { combiner_finally_exec_covered, combiner_finally_exec_covered, "combiner:finally:covered"}; static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); typedef struct { grpc_error *error; bool covered_by_poller; } error_data; static uintptr_t pack_error_data(error_data d) { return ((uintptr_t)d.error) | (d.covered_by_poller ? 1 : 0); } static error_data unpack_error_data(uintptr_t p) { return (error_data){(grpc_error *)(p & ~(uintptr_t)1), p & 1}; } static bool is_covered_by_poller(grpc_combiner *lock) { return lock->final_list_covered_by_poller || gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0; } #define IS_COVERED_BY_POLLER_FMT "(final=%d elems=%" PRIdPTR ")->%d" #define IS_COVERED_BY_POLLER_ARGS(lock) \ (lock)->final_list_covered_by_poller, \ gpr_atm_acq_load(&(lock)->elements_covered_by_poller), \ is_covered_by_poller((lock)) grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { grpc_combiner *lock = gpr_malloc(sizeof(*lock)); gpr_ref_init(&lock->refs, 1); lock->next_combiner_on_this_exec_ctx = NULL; lock->time_to_execute_final_list = false; lock->optional_workqueue = optional_workqueue; lock->final_list_covered_by_poller = false; lock->uncovered_scheduler.vtable = &scheduler_uncovered; lock->covered_scheduler.vtable = &scheduler_covered; lock->uncovered_finally_scheduler.vtable = &finally_scheduler_uncovered; lock->covered_finally_scheduler.vtable = &finally_scheduler_covered; gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); gpr_atm_no_barrier_store(&lock->elements_covered_by_poller, 0); gpr_mpscq_init(&lock->queue); grpc_closure_list_init(&lock->final_list); grpc_closure_init(&lock->offload, offload, lock, grpc_workqueue_scheduler(lock->optional_workqueue)); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock)); return lock; } static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p really_destroy", lock)); GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0); gpr_mpscq_destroy(&lock->queue); GRPC_WORKQUEUE_UNREF(exec_ctx, lock->optional_workqueue, "combiner"); gpr_free(lock); } static void start_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED); GRPC_COMBINER_TRACE(gpr_log( GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state)); if (old_state == 1) { really_destroy(exec_ctx, lock); } } #ifdef GRPC_COMBINER_REFCOUNT_DEBUG #define GRPC_COMBINER_DEBUG_SPAM(op, delta) \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, \ "combiner[%p] %s %" PRIdPTR " --> %" PRIdPTR " %s", lock, (op), \ gpr_atm_no_barrier_load(&lock->refs.count), \ gpr_atm_no_barrier_load(&lock->refs.count) + (delta), reason); #else #define GRPC_COMBINER_DEBUG_SPAM(op, delta) #endif void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS) { GRPC_COMBINER_DEBUG_SPAM("UNREF", -1); if (gpr_unref(&lock->refs)) { start_destroy(exec_ctx, lock); } } grpc_combiner *grpc_combiner_ref(grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS) { GRPC_COMBINER_DEBUG_SPAM(" REF", 1); gpr_ref(&lock->refs); return lock; } static void push_last_on_exec_ctx(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { lock->next_combiner_on_this_exec_ctx = NULL; if (exec_ctx->active_combiner == NULL) { exec_ctx->active_combiner = exec_ctx->last_combiner = lock; } else { exec_ctx->last_combiner->next_combiner_on_this_exec_ctx = lock; exec_ctx->last_combiner = lock; } } static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { lock->next_combiner_on_this_exec_ctx = exec_ctx->active_combiner; exec_ctx->active_combiner = lock; if (lock->next_combiner_on_this_exec_ctx == NULL) { exec_ctx->last_combiner = lock; } } static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, grpc_closure *cl, grpc_error *error, bool covered_by_poller) { GPR_TIMER_BEGIN("combiner.execute", 0); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); GRPC_COMBINER_TRACE(gpr_log( GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d last=%" PRIdPTR, lock, cl, covered_by_poller, last)); GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed assert(cl->cb); cl->error_data.scratch = pack_error_data((error_data){error, covered_by_poller}); if (covered_by_poller) { gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, 1); } gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); if (last == 1) { // first element on this list: add it to the list of combiner locks // executing within this exec_ctx push_last_on_exec_ctx(exec_ctx, lock); } GPR_TIMER_END("combiner.execute", 0); } #define COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler_name) \ ((grpc_combiner *)(((char *)((closure)->scheduler)) - \ offsetof(grpc_combiner, scheduler_name))) static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx, grpc_closure *cl, grpc_error *error) { combiner_exec(exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER(cl, uncovered_scheduler), cl, error, false); } static void combiner_exec_covered(grpc_exec_ctx *exec_ctx, grpc_closure *cl, grpc_error *error) { combiner_exec(exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_scheduler), cl, error, true); } static void move_next(grpc_exec_ctx *exec_ctx) { exec_ctx->active_combiner = exec_ctx->active_combiner->next_combiner_on_this_exec_ctx; if (exec_ctx->active_combiner == NULL) { exec_ctx->last_combiner = NULL; } } static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_combiner *lock = arg; push_last_on_exec_ctx(exec_ctx, lock); } static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { move_next(exec_ctx); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload --> %p", lock, lock->optional_workqueue)); grpc_closure_sched(exec_ctx, &lock->offload, GRPC_ERROR_NONE); } bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { GPR_TIMER_BEGIN("combiner.continue_exec_ctx", 0); grpc_combiner *lock = exec_ctx->active_combiner; if (lock == NULL) { GPR_TIMER_END("combiner.continue_exec_ctx", 0); return false; } GRPC_COMBINER_TRACE( gpr_log(GPR_DEBUG, "C:%p grpc_combiner_continue_exec_ctx workqueue=%p " "is_covered_by_poller=" IS_COVERED_BY_POLLER_FMT " exec_ctx_ready_to_finish=%d " "time_to_execute_final_list=%d", lock, lock->optional_workqueue, IS_COVERED_BY_POLLER_ARGS(lock), grpc_exec_ctx_ready_to_finish(exec_ctx), lock->time_to_execute_final_list)); if (lock->optional_workqueue != NULL && is_covered_by_poller(lock) && grpc_exec_ctx_ready_to_finish(exec_ctx)) { GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); // this execution context wants to move on, and we have a workqueue (and // so can help the execution context out): schedule remaining work to be // picked up on the workqueue queue_offload(exec_ctx, lock); GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; } if (!lock->time_to_execute_final_list || // peek to see if something new has shown up, and execute that with // priority (gpr_atm_acq_load(&lock->state) >> 1) > 1) { gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue); GRPC_COMBINER_TRACE( gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n)); if (n == NULL) { // queue is in an inconsistent state: use this as a cue that we should // go off and do something else for a while (and come back later) GPR_TIMER_MARK("delay_busy", 0); if (lock->optional_workqueue != NULL && is_covered_by_poller(lock)) { queue_offload(exec_ctx, lock); } GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; } GPR_TIMER_BEGIN("combiner.exec1", 0); grpc_closure *cl = (grpc_closure *)n; error_data err = unpack_error_data(cl->error_data.scratch); #ifndef NDEBUG cl->scheduled = false; #endif cl->cb(exec_ctx, cl->cb_arg, err.error); if (err.covered_by_poller) { gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, -1); } GRPC_ERROR_UNREF(err.error); GPR_TIMER_END("combiner.exec1", 0); } else { grpc_closure *c = lock->final_list.head; GPR_ASSERT(c != NULL); grpc_closure_list_init(&lock->final_list); lock->final_list_covered_by_poller = false; int loops = 0; while (c != NULL) { GPR_TIMER_BEGIN("combiner.exec_1final", 0); GRPC_COMBINER_TRACE( gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c)); grpc_closure *next = c->next_data.next; grpc_error *error = c->error_data.error; #ifndef NDEBUG c->scheduled = false; #endif c->cb(exec_ctx, c->cb_arg, error); GRPC_ERROR_UNREF(error); c = next; GPR_TIMER_END("combiner.exec_1final", 0); } } GPR_TIMER_MARK("unref", 0); move_next(exec_ctx); lock->time_to_execute_final_list = false; gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_ELEM_COUNT_LOW_BIT); GRPC_COMBINER_TRACE( gpr_log(GPR_DEBUG, "C:%p finish old_state=%" PRIdPTR, lock, old_state)); // Define a macro to ease readability of the following switch statement. #define OLD_STATE_WAS(orphaned, elem_count) \ (((orphaned) ? 0 : STATE_UNORPHANED) | \ ((elem_count)*STATE_ELEM_COUNT_LOW_BIT)) // Depending on what the previous state was, we need to perform different // actions. switch (old_state) { default: // we have multiple queued work items: just continue executing them break; case OLD_STATE_WAS(false, 2): case OLD_STATE_WAS(true, 2): // we're down to one queued item: if it's the final list we should do that if (!grpc_closure_list_empty(lock->final_list)) { lock->time_to_execute_final_list = true; } break; case OLD_STATE_WAS(false, 1): // had one count, one unorphaned --> unlocked unorphaned GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; case OLD_STATE_WAS(true, 1): // and one count, one orphaned --> unlocked and orphaned really_destroy(exec_ctx, lock); GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; case OLD_STATE_WAS(false, 0): case OLD_STATE_WAS(true, 0): // these values are illegal - representing an already unlocked or // deleted lock GPR_TIMER_END("combiner.continue_exec_ctx", 0); GPR_UNREACHABLE_CODE(return true); } push_first_on_exec_ctx(exec_ctx, lock); GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; } static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, grpc_error *error); static void combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, grpc_closure *closure, grpc_error *error, bool covered_by_poller) { GRPC_COMBINER_TRACE(gpr_log( GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p; cov=%d", lock, closure, exec_ctx->active_combiner, covered_by_poller)); GPR_TIMER_BEGIN("combiner.execute_finally", 0); if (exec_ctx->active_combiner != lock) { GPR_TIMER_MARK("slowpath", 0); grpc_closure_sched( exec_ctx, grpc_closure_create(enqueue_finally, closure, grpc_combiner_scheduler(lock, false)), error); GPR_TIMER_END("combiner.execute_finally", 0); return; } if (grpc_closure_list_empty(lock->final_list)) { gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); } if (covered_by_poller) { lock->final_list_covered_by_poller = true; } grpc_closure_list_append(&lock->final_list, closure, error); GPR_TIMER_END("combiner.execute_finally", 0); } static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, grpc_error *error) { combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure, GRPC_ERROR_REF(error), false); } static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx, grpc_closure *cl, grpc_error *error) { combiner_execute_finally(exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER( cl, uncovered_finally_scheduler), cl, error, false); } static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx, grpc_closure *cl, grpc_error *error) { combiner_execute_finally( exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_finally_scheduler), cl, error, true); } grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *combiner, bool covered_by_poller) { return covered_by_poller ? &combiner->covered_scheduler : &combiner->uncovered_scheduler; } grpc_closure_scheduler *grpc_combiner_finally_scheduler( grpc_combiner *combiner, bool covered_by_poller) { return covered_by_poller ? &combiner->covered_finally_scheduler : &combiner->uncovered_finally_scheduler; }