aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/resource_quota.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/resource_quota.cc')
-rw-r--r--src/core/lib/iomgr/resource_quota.cc34
1 files changed, 34 insertions, 0 deletions
diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc
index 60262435b3..ecb5747da8 100644
--- a/src/core/lib/iomgr/resource_quota.cc
+++ b/src/core/lib/iomgr/resource_quota.cc
@@ -89,6 +89,8 @@ struct grpc_resource_user {
grpc_closure_list on_allocated;
/* True if we are currently trying to allocate from the quota, false if not */
bool allocating;
+ /* How many bytes of allocations are outstanding */
+ int64_t outstanding_allocations;
/* True if we are currently trying to add ourselves to the non-free quota
list, false otherwise */
bool added_to_free_pool;
@@ -153,6 +155,9 @@ struct grpc_resource_quota {
char *name;
};
+static void ru_unref_by(grpc_exec_ctx *exec_ctx,
+ grpc_resource_user *resource_user, gpr_atm amount);
+
/*******************************************************************************
* list management
*/
@@ -289,6 +294,25 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
while ((resource_user = rulist_pop_head(resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION))) {
gpr_mu_lock(&resource_user->mu);
+ if (GRPC_TRACER_ON(grpc_resource_quota_trace)) {
+ gpr_log(GPR_DEBUG, "RQ: check allocation for user %p shutdown=%" PRIdPTR
+ " free_pool=%" PRId64,
+ resource_user, gpr_atm_no_barrier_load(&resource_user->shutdown),
+ resource_user->free_pool);
+ }
+ if (gpr_atm_no_barrier_load(&resource_user->shutdown)) {
+ resource_user->allocating = false;
+ grpc_closure_list_fail_all(
+ &resource_user->on_allocated,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
+ int64_t aborted_allocations = resource_user->outstanding_allocations;
+ resource_user->outstanding_allocations = 0;
+ resource_user->free_pool += aborted_allocations;
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &resource_user->on_allocated);
+ gpr_mu_unlock(&resource_user->mu);
+ ru_unref_by(exec_ctx, resource_user, (gpr_atm)aborted_allocations);
+ continue;
+ }
if (resource_user->free_pool < 0 &&
-resource_user->free_pool <= resource_quota->free_pool) {
int64_t amt = -resource_user->free_pool;
@@ -308,6 +332,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
}
if (resource_user->free_pool >= 0) {
resource_user->allocating = false;
+ resource_user->outstanding_allocations = 0;
GRPC_CLOSURE_LIST_SCHED(exec_ctx, &resource_user->on_allocated);
gpr_mu_unlock(&resource_user->mu);
} else {
@@ -488,6 +513,9 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
}
static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
+ if (GRPC_TRACER_ON(grpc_resource_quota_trace)) {
+ gpr_log(GPR_DEBUG, "RU shutdown %p", ru);
+ }
grpc_resource_user *resource_user = (grpc_resource_user *)ru;
GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[0],
GRPC_ERROR_CANCELLED);
@@ -497,6 +525,9 @@ static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
resource_user->reclaimers[1] = NULL;
rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
+ if (resource_user->allocating) {
+ rq_step_sched(exec_ctx, resource_user->resource_quota);
+ }
}
static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
@@ -718,6 +749,7 @@ grpc_resource_user *grpc_resource_user_create(
resource_user->reclaimers[1] = NULL;
resource_user->new_reclaimers[0] = NULL;
resource_user->new_reclaimers[1] = NULL;
+ resource_user->outstanding_allocations = 0;
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
resource_user->links[i].next = resource_user->links[i].prev = NULL;
}
@@ -778,6 +810,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&resource_user->mu);
ru_ref_by(resource_user, (gpr_atm)size);
resource_user->free_pool -= (int64_t)size;
+ resource_user->outstanding_allocations += (int64_t)size;
if (GRPC_TRACER_ON(grpc_resource_quota_trace)) {
gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64,
resource_user->resource_quota->name, resource_user->name, size,
@@ -792,6 +825,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE);
}
} else {
+ resource_user->outstanding_allocations -= (int64_t)size;
GRPC_CLOSURE_SCHED(exec_ctx, optional_on_done, GRPC_ERROR_NONE);
}
gpr_mu_unlock(&resource_user->mu);