diff options
Diffstat (limited to 'src/core/lib/iomgr/resource_quota.cc')
-rw-r--r-- | src/core/lib/iomgr/resource_quota.cc | 182 |
1 files changed, 75 insertions, 107 deletions
diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc index ccd8d9f379..55d559c466 100644 --- a/src/core/lib/iomgr/resource_quota.cc +++ b/src/core/lib/iomgr/resource_quota.cc @@ -154,8 +154,7 @@ struct grpc_resource_quota { char* name; }; -static void ru_unref_by(grpc_exec_ctx* exec_ctx, - grpc_resource_user* resource_user, gpr_atm amount); +static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount); /******************************************************************************* * list management @@ -239,35 +238,31 @@ static void rulist_remove(grpc_resource_user* resource_user, grpc_rulist list) { * resource quota state machine */ -static bool rq_alloc(grpc_exec_ctx* exec_ctx, - grpc_resource_quota* resource_quota); +static bool rq_alloc(grpc_resource_quota* resource_quota); static bool rq_reclaim_from_per_user_free_pool( - grpc_exec_ctx* exec_ctx, grpc_resource_quota* resource_quota); -static bool rq_reclaim(grpc_exec_ctx* exec_ctx, - grpc_resource_quota* resource_quota, bool destructive); + grpc_resource_quota* resource_quota); +static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive); -static void rq_step(grpc_exec_ctx* exec_ctx, void* rq, grpc_error* error) { +static void rq_step(void* rq, grpc_error* error) { grpc_resource_quota* resource_quota = (grpc_resource_quota*)rq; resource_quota->step_scheduled = false; do { - if (rq_alloc(exec_ctx, resource_quota)) goto done; - } while (rq_reclaim_from_per_user_free_pool(exec_ctx, resource_quota)); + if (rq_alloc(resource_quota)) goto done; + } while (rq_reclaim_from_per_user_free_pool(resource_quota)); - if (!rq_reclaim(exec_ctx, resource_quota, false)) { - rq_reclaim(exec_ctx, resource_quota, true); + if (!rq_reclaim(resource_quota, false)) { + rq_reclaim(resource_quota, true); } done: - grpc_resource_quota_unref_internal(exec_ctx, resource_quota); + grpc_resource_quota_unref_internal(resource_quota); } -static void rq_step_sched(grpc_exec_ctx* exec_ctx, - grpc_resource_quota* resource_quota) { +static void rq_step_sched(grpc_resource_quota* resource_quota) { if (resource_quota->step_scheduled) return; resource_quota->step_scheduled = true; grpc_resource_quota_ref_internal(resource_quota); - GRPC_CLOSURE_SCHED(exec_ctx, &resource_quota->rq_step_closure, - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&resource_quota->rq_step_closure, GRPC_ERROR_NONE); } /* update the atomically available resource estimate - use no barriers since @@ -286,8 +281,7 @@ static void rq_update_estimate(grpc_resource_quota* resource_quota) { } /* returns true if all allocations are completed */ -static bool rq_alloc(grpc_exec_ctx* exec_ctx, - grpc_resource_quota* resource_quota) { +static bool rq_alloc(grpc_resource_quota* resource_quota) { grpc_resource_user* resource_user; while ((resource_user = rulist_pop_head(resource_quota, GRPC_RULIST_AWAITING_ALLOCATION))) { @@ -307,9 +301,9 @@ static bool rq_alloc(grpc_exec_ctx* exec_ctx, int64_t aborted_allocations = resource_user->outstanding_allocations; resource_user->outstanding_allocations = 0; resource_user->free_pool += aborted_allocations; - GRPC_CLOSURE_LIST_SCHED(exec_ctx, &resource_user->on_allocated); + GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated); gpr_mu_unlock(&resource_user->mu); - ru_unref_by(exec_ctx, resource_user, (gpr_atm)aborted_allocations); + ru_unref_by(resource_user, (gpr_atm)aborted_allocations); continue; } if (resource_user->free_pool < 0 && @@ -333,7 +327,7 @@ static bool rq_alloc(grpc_exec_ctx* exec_ctx, if (resource_user->free_pool >= 0) { resource_user->allocating = false; resource_user->outstanding_allocations = 0; - GRPC_CLOSURE_LIST_SCHED(exec_ctx, &resource_user->on_allocated); + GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated); gpr_mu_unlock(&resource_user->mu); } else { rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION); @@ -346,7 +340,7 @@ static bool rq_alloc(grpc_exec_ctx* exec_ctx, /* returns true if any memory could be reclaimed from buffers */ static bool rq_reclaim_from_per_user_free_pool( - grpc_exec_ctx* exec_ctx, grpc_resource_quota* resource_quota) { + grpc_resource_quota* resource_quota) { grpc_resource_user* resource_user; while ((resource_user = rulist_pop_head(resource_quota, GRPC_RULIST_NON_EMPTY_FREE_POOL))) { @@ -373,8 +367,7 @@ static bool rq_reclaim_from_per_user_free_pool( } /* returns true if reclamation is proceeding */ -static bool rq_reclaim(grpc_exec_ctx* exec_ctx, - grpc_resource_quota* resource_quota, bool destructive) { +static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) { if (resource_quota->reclaiming) return true; grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE : GRPC_RULIST_RECLAIMER_BENIGN; @@ -392,7 +385,7 @@ static bool rq_reclaim(grpc_exec_ctx* exec_ctx, resource_quota->debug_only_last_reclaimer_resource_user = resource_user; resource_quota->debug_only_last_initiated_reclaimer = c; resource_user->reclaimers[destructive] = nullptr; - GRPC_CLOSURE_RUN(exec_ctx, c, GRPC_ERROR_NONE); + GRPC_CLOSURE_RUN(c, GRPC_ERROR_NONE); return true; } @@ -412,10 +405,10 @@ static void ru_slice_ref(void* p) { gpr_ref(&rc->refs); } -static void ru_slice_unref(grpc_exec_ctx* exec_ctx, void* p) { +static void ru_slice_unref(void* p) { ru_slice_refcount* rc = (ru_slice_refcount*)p; if (gpr_unref(&rc->refs)) { - grpc_resource_user_free(exec_ctx, rc->resource_user, rc->size); + grpc_resource_user_free(rc->resource_user, rc->size); gpr_free(rc); } } @@ -445,61 +438,57 @@ static grpc_slice ru_slice_create(grpc_resource_user* resource_user, * the combiner */ -static void ru_allocate(grpc_exec_ctx* exec_ctx, void* ru, grpc_error* error) { +static void ru_allocate(void* ru, grpc_error* error) { grpc_resource_user* resource_user = (grpc_resource_user*)ru; if (rulist_empty(resource_user->resource_quota, GRPC_RULIST_AWAITING_ALLOCATION)) { - rq_step_sched(exec_ctx, resource_user->resource_quota); + rq_step_sched(resource_user->resource_quota); } rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION); } -static void ru_add_to_free_pool(grpc_exec_ctx* exec_ctx, void* ru, - grpc_error* error) { +static void ru_add_to_free_pool(void* ru, grpc_error* error) { grpc_resource_user* resource_user = (grpc_resource_user*)ru; if (!rulist_empty(resource_user->resource_quota, GRPC_RULIST_AWAITING_ALLOCATION) && rulist_empty(resource_user->resource_quota, GRPC_RULIST_NON_EMPTY_FREE_POOL)) { - rq_step_sched(exec_ctx, resource_user->resource_quota); + rq_step_sched(resource_user->resource_quota); } rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL); } -static bool ru_post_reclaimer(grpc_exec_ctx* exec_ctx, - grpc_resource_user* resource_user, +static bool ru_post_reclaimer(grpc_resource_user* resource_user, bool destructive) { grpc_closure* closure = resource_user->new_reclaimers[destructive]; GPR_ASSERT(closure != nullptr); resource_user->new_reclaimers[destructive] = nullptr; GPR_ASSERT(resource_user->reclaimers[destructive] == nullptr); if (gpr_atm_acq_load(&resource_user->shutdown) > 0) { - GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CANCELLED); + GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED); return false; } resource_user->reclaimers[destructive] = closure; return true; } -static void ru_post_benign_reclaimer(grpc_exec_ctx* exec_ctx, void* ru, - grpc_error* error) { +static void ru_post_benign_reclaimer(void* ru, grpc_error* error) { grpc_resource_user* resource_user = (grpc_resource_user*)ru; - if (!ru_post_reclaimer(exec_ctx, resource_user, false)) return; + if (!ru_post_reclaimer(resource_user, false)) return; if (!rulist_empty(resource_user->resource_quota, GRPC_RULIST_AWAITING_ALLOCATION) && rulist_empty(resource_user->resource_quota, GRPC_RULIST_NON_EMPTY_FREE_POOL) && rulist_empty(resource_user->resource_quota, GRPC_RULIST_RECLAIMER_BENIGN)) { - rq_step_sched(exec_ctx, resource_user->resource_quota); + rq_step_sched(resource_user->resource_quota); } rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN); } -static void ru_post_destructive_reclaimer(grpc_exec_ctx* exec_ctx, void* ru, - grpc_error* error) { +static void ru_post_destructive_reclaimer(void* ru, grpc_error* error) { grpc_resource_user* resource_user = (grpc_resource_user*)ru; - if (!ru_post_reclaimer(exec_ctx, resource_user, true)) return; + if (!ru_post_reclaimer(resource_user, true)) return; if (!rulist_empty(resource_user->resource_quota, GRPC_RULIST_AWAITING_ALLOCATION) && rulist_empty(resource_user->resource_quota, @@ -508,51 +497,46 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx* exec_ctx, void* ru, GRPC_RULIST_RECLAIMER_BENIGN) && rulist_empty(resource_user->resource_quota, GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) { - rq_step_sched(exec_ctx, resource_user->resource_quota); + rq_step_sched(resource_user->resource_quota); } rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE); } -static void ru_shutdown(grpc_exec_ctx* exec_ctx, void* ru, grpc_error* error) { +static void ru_shutdown(void* ru, grpc_error* error) { if (grpc_resource_quota_trace.enabled()) { gpr_log(GPR_DEBUG, "RU shutdown %p", ru); } grpc_resource_user* resource_user = (grpc_resource_user*)ru; - GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[0], - GRPC_ERROR_CANCELLED); - GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[1], - GRPC_ERROR_CANCELLED); + GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED); + GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED); resource_user->reclaimers[0] = nullptr; resource_user->reclaimers[1] = nullptr; rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN); rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE); if (resource_user->allocating) { - rq_step_sched(exec_ctx, resource_user->resource_quota); + rq_step_sched(resource_user->resource_quota); } } -static void ru_destroy(grpc_exec_ctx* exec_ctx, void* ru, grpc_error* error) { +static void ru_destroy(void* ru, grpc_error* error) { grpc_resource_user* resource_user = (grpc_resource_user*)ru; GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0); for (int i = 0; i < GRPC_RULIST_COUNT; i++) { rulist_remove(resource_user, (grpc_rulist)i); } - GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[0], - GRPC_ERROR_CANCELLED); - GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[1], - GRPC_ERROR_CANCELLED); + GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED); + GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED); if (resource_user->free_pool != 0) { resource_user->resource_quota->free_pool += resource_user->free_pool; - rq_step_sched(exec_ctx, resource_user->resource_quota); + rq_step_sched(resource_user->resource_quota); } - grpc_resource_quota_unref_internal(exec_ctx, resource_user->resource_quota); + grpc_resource_quota_unref_internal(resource_user->resource_quota); gpr_mu_destroy(&resource_user->mu); gpr_free(resource_user->name); gpr_free(resource_user); } -static void ru_allocated_slices(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void ru_allocated_slices(void* arg, grpc_error* error) { grpc_resource_user_slice_allocator* slice_allocator = (grpc_resource_user_slice_allocator*)arg; if (error == GRPC_ERROR_NONE) { @@ -562,7 +546,7 @@ static void ru_allocated_slices(grpc_exec_ctx* exec_ctx, void* arg, slice_allocator->length)); } } - GRPC_CLOSURE_RUN(exec_ctx, &slice_allocator->on_done, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(&slice_allocator->on_done, GRPC_ERROR_REF(error)); } /******************************************************************************* @@ -576,23 +560,22 @@ typedef struct { grpc_closure closure; } rq_resize_args; -static void rq_resize(grpc_exec_ctx* exec_ctx, void* args, grpc_error* error) { +static void rq_resize(void* args, grpc_error* error) { rq_resize_args* a = (rq_resize_args*)args; int64_t delta = a->size - a->resource_quota->size; a->resource_quota->size += delta; a->resource_quota->free_pool += delta; rq_update_estimate(a->resource_quota); - rq_step_sched(exec_ctx, a->resource_quota); - grpc_resource_quota_unref_internal(exec_ctx, a->resource_quota); + rq_step_sched(a->resource_quota); + grpc_resource_quota_unref_internal(a->resource_quota); gpr_free(a); } -static void rq_reclamation_done(grpc_exec_ctx* exec_ctx, void* rq, - grpc_error* error) { +static void rq_reclamation_done(void* rq, grpc_error* error) { grpc_resource_quota* resource_quota = (grpc_resource_quota*)rq; resource_quota->reclaiming = false; - rq_step_sched(exec_ctx, resource_quota); - grpc_resource_quota_unref_internal(exec_ctx, resource_quota); + rq_step_sched(resource_quota); + grpc_resource_quota_unref_internal(resource_quota); } /******************************************************************************* @@ -628,10 +611,9 @@ grpc_resource_quota* grpc_resource_quota_create(const char* name) { return resource_quota; } -void grpc_resource_quota_unref_internal(grpc_exec_ctx* exec_ctx, - grpc_resource_quota* resource_quota) { +void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) { if (gpr_unref(&resource_quota->refs)) { - GRPC_COMBINER_UNREF(exec_ctx, resource_quota->combiner, "resource_quota"); + GRPC_COMBINER_UNREF(resource_quota->combiner, "resource_quota"); gpr_free(resource_quota->name); gpr_free(resource_quota); } @@ -639,9 +621,8 @@ void grpc_resource_quota_unref_internal(grpc_exec_ctx* exec_ctx, /* Public API */ void grpc_resource_quota_unref(grpc_resource_quota* resource_quota) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_resource_quota_unref_internal(&exec_ctx, resource_quota); - grpc_exec_ctx_finish(&exec_ctx); + grpc_core::ExecCtx _local_exec_ctx; + grpc_resource_quota_unref_internal(resource_quota); } grpc_resource_quota* grpc_resource_quota_ref_internal( @@ -665,15 +646,14 @@ double grpc_resource_quota_get_memory_pressure( /* Public API */ void grpc_resource_quota_resize(grpc_resource_quota* resource_quota, size_t size) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_core::ExecCtx _local_exec_ctx; rq_resize_args* a = (rq_resize_args*)gpr_malloc(sizeof(*a)); a->resource_quota = grpc_resource_quota_ref_internal(resource_quota); a->size = (int64_t)size; gpr_atm_no_barrier_store(&resource_quota->last_size, (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size)); GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&exec_ctx, &a->closure, GRPC_ERROR_NONE); - grpc_exec_ctx_finish(&exec_ctx); + GRPC_CLOSURE_SCHED(&a->closure, GRPC_ERROR_NONE); } size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) { @@ -704,8 +684,8 @@ static void* rq_copy(void* rq) { return rq; } -static void rq_destroy(grpc_exec_ctx* exec_ctx, void* rq) { - grpc_resource_quota_unref_internal(exec_ctx, (grpc_resource_quota*)rq); +static void rq_destroy(void* rq) { + grpc_resource_quota_unref_internal((grpc_resource_quota*)rq); } static int rq_cmp(void* a, void* b) { return GPR_ICMP(a, b); } @@ -773,14 +753,12 @@ static void ru_ref_by(grpc_resource_user* resource_user, gpr_atm amount) { GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&resource_user->refs, amount) != 0); } -static void ru_unref_by(grpc_exec_ctx* exec_ctx, - grpc_resource_user* resource_user, gpr_atm amount) { +static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount) { GPR_ASSERT(amount > 0); gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount); GPR_ASSERT(old >= amount); if (old == amount) { - GRPC_CLOSURE_SCHED(exec_ctx, &resource_user->destroy_closure, - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&resource_user->destroy_closure, GRPC_ERROR_NONE); } } @@ -788,16 +766,13 @@ void grpc_resource_user_ref(grpc_resource_user* resource_user) { ru_ref_by(resource_user, 1); } -void grpc_resource_user_unref(grpc_exec_ctx* exec_ctx, - grpc_resource_user* resource_user) { - ru_unref_by(exec_ctx, resource_user, 1); +void grpc_resource_user_unref(grpc_resource_user* resource_user) { + ru_unref_by(resource_user, 1); } -void grpc_resource_user_shutdown(grpc_exec_ctx* exec_ctx, - grpc_resource_user* resource_user) { +void grpc_resource_user_shutdown(grpc_resource_user* resource_user) { if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) { GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_CREATE( ru_shutdown, resource_user, grpc_combiner_scheduler(resource_user->resource_quota->combiner)), @@ -805,8 +780,7 @@ void grpc_resource_user_shutdown(grpc_exec_ctx* exec_ctx, } } -void grpc_resource_user_alloc(grpc_exec_ctx* exec_ctx, - grpc_resource_user* resource_user, size_t size, +void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size, grpc_closure* optional_on_done) { gpr_mu_lock(&resource_user->mu); ru_ref_by(resource_user, (gpr_atm)size); @@ -822,18 +796,16 @@ void grpc_resource_user_alloc(grpc_exec_ctx* exec_ctx, GRPC_ERROR_NONE); if (!resource_user->allocating) { resource_user->allocating = true; - GRPC_CLOSURE_SCHED(exec_ctx, &resource_user->allocate_closure, - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&resource_user->allocate_closure, GRPC_ERROR_NONE); } } else { resource_user->outstanding_allocations -= (int64_t)size; - GRPC_CLOSURE_SCHED(exec_ctx, optional_on_done, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(optional_on_done, GRPC_ERROR_NONE); } gpr_mu_unlock(&resource_user->mu); } -void grpc_resource_user_free(grpc_exec_ctx* exec_ctx, - grpc_resource_user* resource_user, size_t size) { +void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) { gpr_mu_lock(&resource_user->mu); bool was_zero_or_negative = resource_user->free_pool <= 0; resource_user->free_pool += (int64_t)size; @@ -846,32 +818,29 @@ void grpc_resource_user_free(grpc_exec_ctx* exec_ctx, if (is_bigger_than_zero && was_zero_or_negative && !resource_user->added_to_free_pool) { resource_user->added_to_free_pool = true; - GRPC_CLOSURE_SCHED(exec_ctx, &resource_user->add_to_free_pool_closure, + GRPC_CLOSURE_SCHED(&resource_user->add_to_free_pool_closure, GRPC_ERROR_NONE); } gpr_mu_unlock(&resource_user->mu); - ru_unref_by(exec_ctx, resource_user, (gpr_atm)size); + ru_unref_by(resource_user, (gpr_atm)size); } -void grpc_resource_user_post_reclaimer(grpc_exec_ctx* exec_ctx, - grpc_resource_user* resource_user, +void grpc_resource_user_post_reclaimer(grpc_resource_user* resource_user, bool destructive, grpc_closure* closure) { GPR_ASSERT(resource_user->new_reclaimers[destructive] == nullptr); resource_user->new_reclaimers[destructive] = closure; - GRPC_CLOSURE_SCHED(exec_ctx, - &resource_user->post_reclaimer_closure[destructive], + GRPC_CLOSURE_SCHED(&resource_user->post_reclaimer_closure[destructive], GRPC_ERROR_NONE); } -void grpc_resource_user_finish_reclamation(grpc_exec_ctx* exec_ctx, - grpc_resource_user* resource_user) { +void grpc_resource_user_finish_reclamation(grpc_resource_user* resource_user) { if (grpc_resource_quota_trace.enabled()) { gpr_log(GPR_DEBUG, "RQ %s %s: reclamation complete", resource_user->resource_quota->name, resource_user->name); } GRPC_CLOSURE_SCHED( - exec_ctx, &resource_user->resource_quota->rq_reclamation_done_closure, + &resource_user->resource_quota->rq_reclamation_done_closure, GRPC_ERROR_NONE); } @@ -886,12 +855,11 @@ void grpc_resource_user_slice_allocator_init( } void grpc_resource_user_alloc_slices( - grpc_exec_ctx* exec_ctx, grpc_resource_user_slice_allocator* slice_allocator, size_t length, size_t count, grpc_slice_buffer* dest) { slice_allocator->length = length; slice_allocator->count = count; slice_allocator->dest = dest; - grpc_resource_user_alloc(exec_ctx, slice_allocator->resource_user, - count * length, &slice_allocator->on_allocated); + grpc_resource_user_alloc(slice_allocator->resource_user, count * length, + &slice_allocator->on_allocated); } |