diff options
Diffstat (limited to 'src/core/lib/iomgr/resource_quota.cc')
-rw-r--r-- | src/core/lib/iomgr/resource_quota.cc | 74 |
1 files changed, 64 insertions, 10 deletions
diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc index b6fc7579f7..7e4b3c9b2f 100644 --- a/src/core/lib/iomgr/resource_quota.cc +++ b/src/core/lib/iomgr/resource_quota.cc @@ -90,7 +90,8 @@ struct grpc_resource_user { grpc_closure_list on_allocated; /* True if we are currently trying to allocate from the quota, false if not */ bool allocating; - /* How many bytes of allocations are outstanding */ + /* The amount of memory (in bytes) that has been requested from this user + * asynchronously but hasn't been granted yet. */ int64_t outstanding_allocations; /* True if we are currently trying to add ourselves to the non-free quota list, false otherwise */ @@ -135,6 +136,9 @@ struct grpc_resource_quota { int64_t size; /* Amount of free memory in the resource quota */ int64_t free_pool; + /* Used size of memory in the resource quota. Updated as soon as the resource + * users start to allocate or free the memory. */ + gpr_atm used; gpr_atm last_size; @@ -371,6 +375,7 @@ static bool rq_reclaim_from_per_user_free_pool( while ((resource_user = rulist_pop_head(resource_quota, GRPC_RULIST_NON_EMPTY_FREE_POOL))) { gpr_mu_lock(&resource_user->mu); + resource_user->added_to_free_pool = false; if (resource_user->free_pool > 0) { int64_t amt = resource_user->free_pool; resource_user->free_pool = 0; @@ -386,6 +391,13 @@ static bool rq_reclaim_from_per_user_free_pool( gpr_mu_unlock(&resource_user->mu); return true; } else { + if (grpc_resource_quota_trace.enabled()) { + gpr_log(GPR_INFO, + "RQ %s %s: failed to reclaim_from_per_user_free_pool; " + "free_pool = %" PRId64 "; rq_free_pool = %" PRId64, + resource_quota->name, resource_user->name, + resource_user->free_pool, resource_quota->free_pool); + } gpr_mu_unlock(&resource_user->mu); } } @@ -622,6 +634,7 @@ grpc_resource_quota* grpc_resource_quota_create(const char* name) { resource_quota->combiner = grpc_combiner_create(); resource_quota->free_pool = INT64_MAX; resource_quota->size = INT64_MAX; + resource_quota->used = 0; gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX); gpr_mu_init(&resource_quota->thread_count_mu); resource_quota->max_threads = INT_MAX; @@ -712,7 +725,7 @@ size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) { */ grpc_resource_quota* grpc_resource_quota_from_channel_args( - const grpc_channel_args* channel_args) { + const grpc_channel_args* channel_args, bool create) { for (size_t i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { if (channel_args->args[i].type == GRPC_ARG_POINTER) { @@ -724,7 +737,7 @@ grpc_resource_quota* grpc_resource_quota_from_channel_args( } } } - return grpc_resource_quota_create(nullptr); + return create ? grpc_resource_quota_create(nullptr) : nullptr; } static void* rq_copy(void* rq) { @@ -863,33 +876,68 @@ void grpc_resource_user_free_threads(grpc_resource_user* resource_user, gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu); } -void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size, - grpc_closure* optional_on_done) { - gpr_mu_lock(&resource_user->mu); +static void resource_user_alloc_locked(grpc_resource_user* resource_user, + size_t size, + grpc_closure* optional_on_done) { ru_ref_by(resource_user, static_cast<gpr_atm>(size)); resource_user->free_pool -= static_cast<int64_t>(size); - resource_user->outstanding_allocations += static_cast<int64_t>(size); if (grpc_resource_quota_trace.enabled()) { gpr_log(GPR_INFO, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64, resource_user->resource_quota->name, resource_user->name, size, resource_user->free_pool); } if (resource_user->free_pool < 0) { - grpc_closure_list_append(&resource_user->on_allocated, optional_on_done, - GRPC_ERROR_NONE); + if (optional_on_done != nullptr) { + resource_user->outstanding_allocations += static_cast<int64_t>(size); + grpc_closure_list_append(&resource_user->on_allocated, optional_on_done, + GRPC_ERROR_NONE); + } if (!resource_user->allocating) { resource_user->allocating = true; GRPC_CLOSURE_SCHED(&resource_user->allocate_closure, GRPC_ERROR_NONE); } } else { - resource_user->outstanding_allocations -= static_cast<int64_t>(size); GRPC_CLOSURE_SCHED(optional_on_done, GRPC_ERROR_NONE); } +} + +bool grpc_resource_user_safe_alloc(grpc_resource_user* resource_user, + size_t size) { + if (gpr_atm_no_barrier_load(&resource_user->shutdown)) return false; + gpr_mu_lock(&resource_user->mu); + grpc_resource_quota* resource_quota = resource_user->resource_quota; + bool cas_success; + do { + gpr_atm used = gpr_atm_no_barrier_load(&resource_quota->used); + gpr_atm new_used = used + size; + if (static_cast<size_t>(new_used) > + grpc_resource_quota_peek_size(resource_quota)) { + gpr_mu_unlock(&resource_user->mu); + return false; + } + cas_success = gpr_atm_full_cas(&resource_quota->used, used, new_used); + } while (!cas_success); + resource_user_alloc_locked(resource_user, size, nullptr); + gpr_mu_unlock(&resource_user->mu); + return true; +} + +void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size, + grpc_closure* optional_on_done) { + // TODO(juanlishen): Maybe return immediately if shutting down. Deferring this + // because some tests become flaky after the change. + gpr_mu_lock(&resource_user->mu); + grpc_resource_quota* resource_quota = resource_user->resource_quota; + gpr_atm_no_barrier_fetch_add(&resource_quota->used, size); + resource_user_alloc_locked(resource_user, size, optional_on_done); gpr_mu_unlock(&resource_user->mu); } void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) { gpr_mu_lock(&resource_user->mu); + grpc_resource_quota* resource_quota = resource_user->resource_quota; + gpr_atm prior = gpr_atm_no_barrier_fetch_add(&resource_quota->used, -size); + GPR_ASSERT(prior >= static_cast<long>(size)); bool was_zero_or_negative = resource_user->free_pool <= 0; resource_user->free_pool += static_cast<int64_t>(size); if (grpc_resource_quota_trace.enabled()) { @@ -940,6 +988,12 @@ void grpc_resource_user_slice_allocator_init( void grpc_resource_user_alloc_slices( grpc_resource_user_slice_allocator* slice_allocator, size_t length, size_t count, grpc_slice_buffer* dest) { + if (gpr_atm_no_barrier_load(&slice_allocator->resource_user->shutdown)) { + GRPC_CLOSURE_SCHED( + &slice_allocator->on_allocated, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown")); + return; + } slice_allocator->length = length; slice_allocator->count = count; slice_allocator->dest = dest; |