aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-10-17 14:52:14 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-10-17 14:52:14 -0700
commit20afa3d7c933207c548ed11928c47b552b5b2f80 (patch)
treeea93d7458fb454dde9aca2083bf1fa89e69c189a /src/core/lib/iomgr
parent654b242ce70afcf9fdab674cab9b71d8d3f02502 (diff)
BufferPool --> ResourceQuota
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/buffer_pool.c684
-rw-r--r--src/core/lib/iomgr/endpoint.c4
-rw-r--r--src/core/lib/iomgr/endpoint.h6
-rw-r--r--src/core/lib/iomgr/endpoint_pair.h2
-rw-r--r--src/core/lib/iomgr/endpoint_pair_posix.c6
-rw-r--r--src/core/lib/iomgr/resource_quota.c684
-rw-r--r--src/core/lib/iomgr/resource_quota.h (renamed from src/core/lib/iomgr/buffer_pool.h)83
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c10
-rw-r--r--src/core/lib/iomgr/tcp_posix.c42
-rw-r--r--src/core/lib/iomgr/tcp_posix.h2
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c18
11 files changed, 773 insertions, 768 deletions
diff --git a/src/core/lib/iomgr/buffer_pool.c b/src/core/lib/iomgr/buffer_pool.c
deleted file mode 100644
index 8fbf75cbe4..0000000000
--- a/src/core/lib/iomgr/buffer_pool.c
+++ /dev/null
@@ -1,684 +0,0 @@
-/*
- *
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/lib/iomgr/buffer_pool.h"
-
-#include <string.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <grpc/support/useful.h>
-
-#include "src/core/lib/iomgr/combiner.h"
-
-int grpc_buffer_pool_trace = 0;
-
-typedef bool (*bpstate_func)(grpc_exec_ctx *exec_ctx,
- grpc_buffer_pool *buffer_pool);
-
-typedef struct {
- grpc_buffer_user *head;
- grpc_buffer_user *tail;
-} grpc_buffer_user_list;
-
-struct grpc_buffer_pool {
- gpr_refcount refs;
-
- grpc_combiner *combiner;
- int64_t size;
- int64_t free_pool;
-
- bool step_scheduled;
- bool reclaiming;
- grpc_closure bpstep_closure;
- grpc_closure bpreclaimation_done_closure;
-
- grpc_buffer_user *roots[GRPC_BULIST_COUNT];
-
- char *name;
-};
-
-/*******************************************************************************
- * list management
- */
-
-static void bulist_add_tail(grpc_buffer_user *buffer_user, grpc_bulist list) {
- grpc_buffer_pool *buffer_pool = buffer_user->buffer_pool;
- grpc_buffer_user **root = &buffer_pool->roots[list];
- if (*root == NULL) {
- *root = buffer_user;
- buffer_user->links[list].next = buffer_user->links[list].prev = buffer_user;
- } else {
- buffer_user->links[list].next = *root;
- buffer_user->links[list].prev = (*root)->links[list].prev;
- buffer_user->links[list].next->links[list].prev =
- buffer_user->links[list].prev->links[list].next = buffer_user;
- }
-}
-
-static void bulist_add_head(grpc_buffer_user *buffer_user, grpc_bulist list) {
- grpc_buffer_pool *buffer_pool = buffer_user->buffer_pool;
- grpc_buffer_user **root = &buffer_pool->roots[list];
- if (*root == NULL) {
- *root = buffer_user;
- buffer_user->links[list].next = buffer_user->links[list].prev = buffer_user;
- } else {
- buffer_user->links[list].next = (*root)->links[list].next;
- buffer_user->links[list].prev = *root;
- buffer_user->links[list].next->links[list].prev =
- buffer_user->links[list].prev->links[list].next = buffer_user;
- *root = buffer_user;
- }
-}
-
-static bool bulist_empty(grpc_buffer_pool *buffer_pool, grpc_bulist list) {
- return buffer_pool->roots[list] == NULL;
-}
-
-static grpc_buffer_user *bulist_pop(grpc_buffer_pool *buffer_pool,
- grpc_bulist list) {
- grpc_buffer_user **root = &buffer_pool->roots[list];
- grpc_buffer_user *buffer_user = *root;
- if (buffer_user == NULL) {
- return NULL;
- }
- if (buffer_user->links[list].next == buffer_user) {
- *root = NULL;
- } else {
- buffer_user->links[list].next->links[list].prev =
- buffer_user->links[list].prev;
- buffer_user->links[list].prev->links[list].next =
- buffer_user->links[list].next;
- *root = buffer_user->links[list].next;
- }
- buffer_user->links[list].next = buffer_user->links[list].prev = NULL;
- return buffer_user;
-}
-
-static void bulist_remove(grpc_buffer_user *buffer_user, grpc_bulist list) {
- if (buffer_user->links[list].next == NULL) return;
- grpc_buffer_pool *buffer_pool = buffer_user->buffer_pool;
- if (buffer_pool->roots[list] == buffer_user) {
- buffer_pool->roots[list] = buffer_user->links[list].next;
- if (buffer_pool->roots[list] == buffer_user) {
- buffer_pool->roots[list] = NULL;
- }
- }
- buffer_user->links[list].next->links[list].prev =
- buffer_user->links[list].prev;
- buffer_user->links[list].prev->links[list].next =
- buffer_user->links[list].next;
-}
-
-/*******************************************************************************
- * buffer pool state machine
- */
-
-static bool bpalloc(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool);
-static bool bpscavenge(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool);
-static bool bpreclaim(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool,
- bool destructive);
-
-static void bpstep(grpc_exec_ctx *exec_ctx, void *bp, grpc_error *error) {
- grpc_buffer_pool *buffer_pool = bp;
- buffer_pool->step_scheduled = false;
- do {
- if (bpalloc(exec_ctx, buffer_pool)) goto done;
- } while (bpscavenge(exec_ctx, buffer_pool));
- bpreclaim(exec_ctx, buffer_pool, false) ||
- bpreclaim(exec_ctx, buffer_pool, true);
-done:
- grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool);
-}
-
-static void bpstep_sched(grpc_exec_ctx *exec_ctx,
- grpc_buffer_pool *buffer_pool) {
- if (buffer_pool->step_scheduled) return;
- buffer_pool->step_scheduled = true;
- grpc_buffer_pool_internal_ref(buffer_pool);
- grpc_combiner_execute_finally(exec_ctx, buffer_pool->combiner,
- &buffer_pool->bpstep_closure, GRPC_ERROR_NONE,
- false);
-}
-
-/* returns true if all allocations are completed */
-static bool bpalloc(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool) {
- grpc_buffer_user *buffer_user;
- while ((buffer_user =
- bulist_pop(buffer_pool, GRPC_BULIST_AWAITING_ALLOCATION))) {
- gpr_mu_lock(&buffer_user->mu);
- if (buffer_user->free_pool < 0 &&
- -buffer_user->free_pool <= buffer_pool->free_pool) {
- int64_t amt = -buffer_user->free_pool;
- buffer_user->free_pool = 0;
- buffer_pool->free_pool -= amt;
- if (grpc_buffer_pool_trace) {
- gpr_log(GPR_DEBUG, "BP %s %s: grant alloc %" PRId64
- " bytes; bp_free_pool -> %" PRId64,
- buffer_pool->name, buffer_user->name, amt,
- buffer_pool->free_pool);
- }
- } else if (grpc_buffer_pool_trace && buffer_user->free_pool >= 0) {
- gpr_log(GPR_DEBUG, "BP %s %s: discard already satisfied alloc request",
- buffer_pool->name, buffer_user->name);
- }
- if (buffer_user->free_pool >= 0) {
- buffer_user->allocating = false;
- grpc_exec_ctx_enqueue_list(exec_ctx, &buffer_user->on_allocated, NULL);
- gpr_mu_unlock(&buffer_user->mu);
- } else {
- bulist_add_head(buffer_user, GRPC_BULIST_AWAITING_ALLOCATION);
- gpr_mu_unlock(&buffer_user->mu);
- return false;
- }
- }
- return true;
-}
-
-/* returns true if any memory could be reclaimed from buffers */
-static bool bpscavenge(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool) {
- grpc_buffer_user *buffer_user;
- while ((buffer_user =
- bulist_pop(buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL))) {
- gpr_mu_lock(&buffer_user->mu);
- if (buffer_user->free_pool > 0) {
- int64_t amt = buffer_user->free_pool;
- buffer_user->free_pool = 0;
- buffer_pool->free_pool += amt;
- if (grpc_buffer_pool_trace) {
- gpr_log(GPR_DEBUG, "BP %s %s: scavenge %" PRId64
- " bytes; bp_free_pool -> %" PRId64,
- buffer_pool->name, buffer_user->name, amt,
- buffer_pool->free_pool);
- }
- gpr_mu_unlock(&buffer_user->mu);
- return true;
- } else {
- gpr_mu_unlock(&buffer_user->mu);
- }
- }
- return false;
-}
-
-/* returns true if reclaimation is proceeding */
-static bool bpreclaim(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool,
- bool destructive) {
- if (buffer_pool->reclaiming) return true;
- grpc_bulist list = destructive ? GRPC_BULIST_RECLAIMER_DESTRUCTIVE
- : GRPC_BULIST_RECLAIMER_BENIGN;
- grpc_buffer_user *buffer_user = bulist_pop(buffer_pool, list);
- if (buffer_user == NULL) return false;
- if (grpc_buffer_pool_trace) {
- gpr_log(GPR_DEBUG, "BP %s %s: initiate %s reclaimation", buffer_pool->name,
- buffer_user->name, destructive ? "destructive" : "benign");
- }
- buffer_pool->reclaiming = true;
- grpc_buffer_pool_internal_ref(buffer_pool);
- grpc_closure *c = buffer_user->reclaimers[destructive];
- buffer_user->reclaimers[destructive] = NULL;
- grpc_closure_run(exec_ctx, c, GRPC_ERROR_NONE);
- return true;
-}
-
-/*******************************************************************************
- * bu_slice: a slice implementation that is backed by a grpc_buffer_user
- */
-
-typedef struct {
- gpr_slice_refcount base;
- gpr_refcount refs;
- grpc_buffer_user *buffer_user;
- size_t size;
-} bu_slice_refcount;
-
-static void bu_slice_ref(void *p) {
- bu_slice_refcount *rc = p;
- gpr_ref(&rc->refs);
-}
-
-static void bu_slice_unref(void *p) {
- bu_slice_refcount *rc = p;
- if (gpr_unref(&rc->refs)) {
- /* TODO(ctiller): this is dangerous, but I think safe for now:
- we have no guarantee here that we're at a safe point for creating an
- execution context, but we have no way of writing this code otherwise.
- In the future: consider lifting gpr_slice to grpc, and offering an
- internal_{ref,unref} pair that is execution context aware. Alternatively,
- make exec_ctx be thread local and 'do the right thing' (whatever that is)
- if NULL */
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_buffer_user_free(&exec_ctx, rc->buffer_user, rc->size);
- grpc_exec_ctx_finish(&exec_ctx);
- gpr_free(rc);
- }
-}
-
-static gpr_slice bu_slice_create(grpc_buffer_user *buffer_user, size_t size) {
- bu_slice_refcount *rc = gpr_malloc(sizeof(bu_slice_refcount) + size);
- rc->base.ref = bu_slice_ref;
- rc->base.unref = bu_slice_unref;
- gpr_ref_init(&rc->refs, 1);
- rc->buffer_user = buffer_user;
- rc->size = size;
- gpr_slice slice;
- slice.refcount = &rc->base;
- slice.data.refcounted.bytes = (uint8_t *)(rc + 1);
- slice.data.refcounted.length = size;
- return slice;
-}
-
-/*******************************************************************************
- * grpc_buffer_pool internal implementation
- */
-
-static void bu_allocate(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) {
- grpc_buffer_user *buffer_user = bu;
- if (bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_AWAITING_ALLOCATION)) {
- bpstep_sched(exec_ctx, buffer_user->buffer_pool);
- }
- bulist_add_tail(buffer_user, GRPC_BULIST_AWAITING_ALLOCATION);
-}
-
-static void bu_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *bu,
- grpc_error *error) {
- grpc_buffer_user *buffer_user = bu;
- if (!bulist_empty(buffer_user->buffer_pool,
- GRPC_BULIST_AWAITING_ALLOCATION) &&
- bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL)) {
- bpstep_sched(exec_ctx, buffer_user->buffer_pool);
- }
- bulist_add_tail(buffer_user, GRPC_BULIST_NON_EMPTY_FREE_POOL);
-}
-
-static void bu_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *bu,
- grpc_error *error) {
- grpc_buffer_user *buffer_user = bu;
- if (!bulist_empty(buffer_user->buffer_pool,
- GRPC_BULIST_AWAITING_ALLOCATION) &&
- bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL) &&
- bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_RECLAIMER_BENIGN)) {
- bpstep_sched(exec_ctx, buffer_user->buffer_pool);
- }
- bulist_add_tail(buffer_user, GRPC_BULIST_RECLAIMER_BENIGN);
-}
-
-static void bu_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *bu,
- grpc_error *error) {
- grpc_buffer_user *buffer_user = bu;
- if (!bulist_empty(buffer_user->buffer_pool,
- GRPC_BULIST_AWAITING_ALLOCATION) &&
- bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL) &&
- bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_RECLAIMER_BENIGN) &&
- bulist_empty(buffer_user->buffer_pool,
- GRPC_BULIST_RECLAIMER_DESTRUCTIVE)) {
- bpstep_sched(exec_ctx, buffer_user->buffer_pool);
- }
- bulist_add_tail(buffer_user, GRPC_BULIST_RECLAIMER_DESTRUCTIVE);
-}
-
-static void bu_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) {
- grpc_buffer_user *buffer_user = bu;
- GPR_ASSERT(buffer_user->allocated == 0);
- for (int i = 0; i < GRPC_BULIST_COUNT; i++) {
- bulist_remove(buffer_user, (grpc_bulist)i);
- }
- grpc_exec_ctx_sched(exec_ctx, buffer_user->reclaimers[0],
- GRPC_ERROR_CANCELLED, NULL);
- grpc_exec_ctx_sched(exec_ctx, buffer_user->reclaimers[1],
- GRPC_ERROR_CANCELLED, NULL);
- grpc_exec_ctx_sched(exec_ctx, (grpc_closure *)gpr_atm_no_barrier_load(
- &buffer_user->on_done_destroy_closure),
- GRPC_ERROR_NONE, NULL);
- if (buffer_user->free_pool != 0) {
- buffer_user->buffer_pool->free_pool += buffer_user->free_pool;
- bpstep_sched(exec_ctx, buffer_user->buffer_pool);
- }
-}
-
-static void bu_allocated_slices(grpc_exec_ctx *exec_ctx, void *ts,
- grpc_error *error) {
- grpc_buffer_user_slice_allocator *slice_allocator = ts;
- if (error == GRPC_ERROR_NONE) {
- for (size_t i = 0; i < slice_allocator->count; i++) {
- gpr_slice_buffer_add_indexed(slice_allocator->dest,
- bu_slice_create(slice_allocator->buffer_user,
- slice_allocator->length));
- }
- }
- grpc_closure_run(exec_ctx, &slice_allocator->on_done, GRPC_ERROR_REF(error));
-}
-
-typedef struct {
- int64_t size;
- grpc_buffer_pool *buffer_pool;
- grpc_closure closure;
-} bp_resize_args;
-
-static void bp_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
- bp_resize_args *a = args;
- int64_t delta = a->size - a->buffer_pool->size;
- a->buffer_pool->size += delta;
- a->buffer_pool->free_pool += delta;
- if (delta < 0 && a->buffer_pool->free_pool < 0) {
- bpstep_sched(exec_ctx, a->buffer_pool);
- } else if (delta > 0 &&
- !bulist_empty(a->buffer_pool, GRPC_BULIST_AWAITING_ALLOCATION)) {
- bpstep_sched(exec_ctx, a->buffer_pool);
- }
- grpc_buffer_pool_internal_unref(exec_ctx, a->buffer_pool);
- gpr_free(a);
-}
-
-static void bp_reclaimation_done(grpc_exec_ctx *exec_ctx, void *bp,
- grpc_error *error) {
- grpc_buffer_pool *buffer_pool = bp;
- buffer_pool->reclaiming = false;
- bpstep_sched(exec_ctx, buffer_pool);
- grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool);
-}
-
-/*******************************************************************************
- * grpc_buffer_pool api
- */
-
-grpc_buffer_pool *grpc_buffer_pool_create(const char *name) {
- grpc_buffer_pool *buffer_pool = gpr_malloc(sizeof(*buffer_pool));
- gpr_ref_init(&buffer_pool->refs, 1);
- buffer_pool->combiner = grpc_combiner_create(NULL);
- buffer_pool->free_pool = INT64_MAX;
- buffer_pool->size = INT64_MAX;
- buffer_pool->step_scheduled = false;
- buffer_pool->reclaiming = false;
- if (name != NULL) {
- buffer_pool->name = gpr_strdup(name);
- } else {
- gpr_asprintf(&buffer_pool->name, "anonymous_pool_%" PRIxPTR,
- (intptr_t)buffer_pool);
- }
- grpc_closure_init(&buffer_pool->bpstep_closure, bpstep, buffer_pool);
- grpc_closure_init(&buffer_pool->bpreclaimation_done_closure,
- bp_reclaimation_done, buffer_pool);
- for (int i = 0; i < GRPC_BULIST_COUNT; i++) {
- buffer_pool->roots[i] = NULL;
- }
- return buffer_pool;
-}
-
-void grpc_buffer_pool_internal_unref(grpc_exec_ctx *exec_ctx,
- grpc_buffer_pool *buffer_pool) {
- if (gpr_unref(&buffer_pool->refs)) {
- grpc_combiner_destroy(exec_ctx, buffer_pool->combiner);
- gpr_free(buffer_pool->name);
- gpr_free(buffer_pool);
- }
-}
-
-void grpc_buffer_pool_unref(grpc_buffer_pool *buffer_pool) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool);
- grpc_exec_ctx_finish(&exec_ctx);
-}
-
-grpc_buffer_pool *grpc_buffer_pool_internal_ref(grpc_buffer_pool *buffer_pool) {
- gpr_ref(&buffer_pool->refs);
- return buffer_pool;
-}
-
-void grpc_buffer_pool_ref(grpc_buffer_pool *buffer_pool) {
- grpc_buffer_pool_internal_ref(buffer_pool);
-}
-
-void grpc_buffer_pool_resize(grpc_buffer_pool *buffer_pool, size_t size) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- bp_resize_args *a = gpr_malloc(sizeof(*a));
- a->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool);
- a->size = (int64_t)size;
- grpc_closure_init(&a->closure, bp_resize, a);
- grpc_combiner_execute(&exec_ctx, buffer_pool->combiner, &a->closure,
- GRPC_ERROR_NONE, false);
- grpc_exec_ctx_finish(&exec_ctx);
-}
-
-/*******************************************************************************
- * grpc_buffer_user channel args api
- */
-
-grpc_buffer_pool *grpc_buffer_pool_from_channel_args(
- const grpc_channel_args *channel_args) {
- for (size_t i = 0; i < channel_args->num_args; i++) {
- if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_BUFFER_POOL)) {
- if (channel_args->args[i].type == GRPC_ARG_POINTER) {
- return grpc_buffer_pool_internal_ref(
- channel_args->args[i].value.pointer.p);
- } else {
- gpr_log(GPR_DEBUG, GRPC_ARG_BUFFER_POOL " should be a pointer");
- }
- }
- }
- return grpc_buffer_pool_create(NULL);
-}
-
-static void *bp_copy(void *bp) {
- grpc_buffer_pool_ref(bp);
- return bp;
-}
-
-static void bp_destroy(void *bp) { grpc_buffer_pool_unref(bp); }
-
-static int bp_cmp(void *a, void *b) { return GPR_ICMP(a, b); }
-
-const grpc_arg_pointer_vtable *grpc_buffer_pool_arg_vtable(void) {
- static const grpc_arg_pointer_vtable vtable = {bp_copy, bp_destroy, bp_cmp};
- return &vtable;
-}
-
-/*******************************************************************************
- * grpc_buffer_user api
- */
-
-void grpc_buffer_user_init(grpc_buffer_user *buffer_user,
- grpc_buffer_pool *buffer_pool, const char *name) {
- buffer_user->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool);
- grpc_closure_init(&buffer_user->allocate_closure, &bu_allocate, buffer_user);
- grpc_closure_init(&buffer_user->add_to_free_pool_closure,
- &bu_add_to_free_pool, buffer_user);
- grpc_closure_init(&buffer_user->post_reclaimer_closure[0],
- &bu_post_benign_reclaimer, buffer_user);
- grpc_closure_init(&buffer_user->post_reclaimer_closure[1],
- &bu_post_destructive_reclaimer, buffer_user);
- grpc_closure_init(&buffer_user->destroy_closure, &bu_destroy, buffer_user);
- gpr_mu_init(&buffer_user->mu);
- buffer_user->allocated = 0;
- buffer_user->free_pool = 0;
- grpc_closure_list_init(&buffer_user->on_allocated);
- buffer_user->allocating = false;
- buffer_user->added_to_free_pool = false;
- gpr_atm_no_barrier_store(&buffer_user->on_done_destroy_closure, 0);
- buffer_user->reclaimers[0] = NULL;
- buffer_user->reclaimers[1] = NULL;
- for (int i = 0; i < GRPC_BULIST_COUNT; i++) {
- buffer_user->links[i].next = buffer_user->links[i].prev = NULL;
- }
-#ifndef NDEBUG
- buffer_user->asan_canary = gpr_malloc(1);
-#endif
- if (name != NULL) {
- buffer_user->name = gpr_strdup(name);
- } else {
- gpr_asprintf(&buffer_user->name, "anonymous_buffer_user_%" PRIxPTR,
- (intptr_t)buffer_user);
- }
-}
-
-void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_buffer_user *buffer_user,
- grpc_closure *on_done) {
- gpr_mu_lock(&buffer_user->mu);
- GPR_ASSERT(gpr_atm_no_barrier_load(&buffer_user->on_done_destroy_closure) ==
- 0);
- gpr_atm_no_barrier_store(&buffer_user->on_done_destroy_closure,
- (gpr_atm)on_done);
- if (buffer_user->allocated == 0) {
- grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner,
- &buffer_user->destroy_closure, GRPC_ERROR_NONE,
- false);
- }
- gpr_mu_unlock(&buffer_user->mu);
-}
-
-void grpc_buffer_user_destroy(grpc_exec_ctx *exec_ctx,
- grpc_buffer_user *buffer_user) {
-#ifndef NDEBUG
- gpr_free(buffer_user->asan_canary);
-#endif
- grpc_buffer_pool_internal_unref(exec_ctx, buffer_user->buffer_pool);
- gpr_mu_destroy(&buffer_user->mu);
- gpr_free(buffer_user->name);
-}
-
-void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx,
- grpc_buffer_user *buffer_user, size_t size,
- grpc_closure *optional_on_done) {
- gpr_mu_lock(&buffer_user->mu);
- grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load(
- &buffer_user->on_done_destroy_closure);
- if (on_done_destroy != NULL) {
- /* already shutdown */
- if (grpc_buffer_pool_trace) {
- gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR " after shutdown",
- buffer_user->buffer_pool->name, buffer_user->name, size);
- }
- grpc_exec_ctx_sched(
- exec_ctx, optional_on_done,
- GRPC_ERROR_CREATE("Buffer pool user is already shutdown"), NULL);
- gpr_mu_unlock(&buffer_user->mu);
- return;
- }
- buffer_user->allocated += (int64_t)size;
- buffer_user->free_pool -= (int64_t)size;
- if (grpc_buffer_pool_trace) {
- gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64
- ", free_pool -> %" PRId64,
- buffer_user->buffer_pool->name, buffer_user->name, size,
- buffer_user->allocated, buffer_user->free_pool);
- }
- if (buffer_user->free_pool < 0) {
- grpc_closure_list_append(&buffer_user->on_allocated, optional_on_done,
- GRPC_ERROR_NONE);
- if (!buffer_user->allocating) {
- buffer_user->allocating = true;
- grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner,
- &buffer_user->allocate_closure, GRPC_ERROR_NONE,
- false);
- }
- } else {
- grpc_exec_ctx_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE, NULL);
- }
- gpr_mu_unlock(&buffer_user->mu);
-}
-
-void grpc_buffer_user_free(grpc_exec_ctx *exec_ctx,
- grpc_buffer_user *buffer_user, size_t size) {
- gpr_mu_lock(&buffer_user->mu);
- GPR_ASSERT(buffer_user->allocated >= (int64_t)size);
- bool was_zero_or_negative = buffer_user->free_pool <= 0;
- buffer_user->free_pool += (int64_t)size;
- buffer_user->allocated -= (int64_t)size;
- if (grpc_buffer_pool_trace) {
- gpr_log(GPR_DEBUG, "BP %s %s: free %" PRIdPTR "; allocated -> %" PRId64
- ", free_pool -> %" PRId64,
- buffer_user->buffer_pool->name, buffer_user->name, size,
- buffer_user->allocated, buffer_user->free_pool);
- }
- bool is_bigger_than_zero = buffer_user->free_pool > 0;
- if (is_bigger_than_zero && was_zero_or_negative &&
- !buffer_user->added_to_free_pool) {
- buffer_user->added_to_free_pool = true;
- grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner,
- &buffer_user->add_to_free_pool_closure,
- GRPC_ERROR_NONE, false);
- }
- grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load(
- &buffer_user->on_done_destroy_closure);
- if (on_done_destroy != NULL && buffer_user->allocated == 0) {
- grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner,
- &buffer_user->destroy_closure, GRPC_ERROR_NONE,
- false);
- }
- gpr_mu_unlock(&buffer_user->mu);
-}
-
-void grpc_buffer_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
- grpc_buffer_user *buffer_user,
- bool destructive, grpc_closure *closure) {
- if (gpr_atm_acq_load(&buffer_user->on_done_destroy_closure) == 0) {
- GPR_ASSERT(buffer_user->reclaimers[destructive] == NULL);
- buffer_user->reclaimers[destructive] = closure;
- grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner,
- &buffer_user->post_reclaimer_closure[destructive],
- GRPC_ERROR_NONE, false);
- } else {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL);
- }
-}
-
-void grpc_buffer_user_finish_reclaimation(grpc_exec_ctx *exec_ctx,
- grpc_buffer_user *buffer_user) {
- if (grpc_buffer_pool_trace) {
- gpr_log(GPR_DEBUG, "BP %s %s: reclaimation complete",
- buffer_user->buffer_pool->name, buffer_user->name);
- }
- grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner,
- &buffer_user->buffer_pool->bpreclaimation_done_closure,
- GRPC_ERROR_NONE, false);
-}
-
-void grpc_buffer_user_slice_allocator_init(
- grpc_buffer_user_slice_allocator *slice_allocator,
- grpc_buffer_user *buffer_user, grpc_iomgr_cb_func cb, void *p) {
- grpc_closure_init(&slice_allocator->on_allocated, bu_allocated_slices,
- slice_allocator);
- grpc_closure_init(&slice_allocator->on_done, cb, p);
- slice_allocator->buffer_user = buffer_user;
-}
-
-void grpc_buffer_user_alloc_slices(
- grpc_exec_ctx *exec_ctx, grpc_buffer_user_slice_allocator *slice_allocator,
- size_t length, size_t count, gpr_slice_buffer *dest) {
- slice_allocator->length = length;
- slice_allocator->count = count;
- slice_allocator->dest = dest;
- grpc_buffer_user_alloc(exec_ctx, slice_allocator->buffer_user, count * length,
- &slice_allocator->on_allocated);
-}
diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c
index f3548a1d74..74fa9c45df 100644
--- a/src/core/lib/iomgr/endpoint.c
+++ b/src/core/lib/iomgr/endpoint.c
@@ -70,6 +70,6 @@ grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) {
return ep->vtable->get_workqueue(ep);
}
-grpc_buffer_user* grpc_endpoint_get_buffer_user(grpc_endpoint* ep) {
- return ep->vtable->get_buffer_user(ep);
+grpc_resource_user* grpc_endpoint_get_resource_user(grpc_endpoint* ep) {
+ return ep->vtable->get_resource_user(ep);
}
diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h
index df6b899e39..819fcdda1a 100644
--- a/src/core/lib/iomgr/endpoint.h
+++ b/src/core/lib/iomgr/endpoint.h
@@ -37,7 +37,7 @@
#include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
#include <grpc/support/time.h>
-#include "src/core/lib/iomgr/buffer_pool.h"
+#include "src/core/lib/iomgr/resource_quota.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
@@ -59,7 +59,7 @@ struct grpc_endpoint_vtable {
grpc_pollset_set *pollset);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
- grpc_buffer_user *(*get_buffer_user)(grpc_endpoint *ep);
+ grpc_resource_user *(*get_resource_user)(grpc_endpoint *ep);
char *(*get_peer)(grpc_endpoint *ep);
};
@@ -102,7 +102,7 @@ void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint *ep,
grpc_pollset_set *pollset_set);
-grpc_buffer_user *grpc_endpoint_get_buffer_user(grpc_endpoint *endpoint);
+grpc_resource_user *grpc_endpoint_get_resource_user(grpc_endpoint *endpoint);
struct grpc_endpoint {
const grpc_endpoint_vtable *vtable;
diff --git a/src/core/lib/iomgr/endpoint_pair.h b/src/core/lib/iomgr/endpoint_pair.h
index 4938cf8599..27c17a0e97 100644
--- a/src/core/lib/iomgr/endpoint_pair.h
+++ b/src/core/lib/iomgr/endpoint_pair.h
@@ -42,6 +42,6 @@ typedef struct {
} grpc_endpoint_pair;
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
- const char *name, grpc_buffer_pool *buffer_pool, size_t read_slice_size);
+ const char *name, grpc_resource_quota *resource_quota, size_t read_slice_size);
#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_PAIR_H */
diff --git a/src/core/lib/iomgr/endpoint_pair_posix.c b/src/core/lib/iomgr/endpoint_pair_posix.c
index 64c161675f..c1437bcf17 100644
--- a/src/core/lib/iomgr/endpoint_pair_posix.c
+++ b/src/core/lib/iomgr/endpoint_pair_posix.c
@@ -63,18 +63,18 @@ static void create_sockets(int sv[2]) {
}
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
- const char *name, grpc_buffer_pool *buffer_pool, size_t read_slice_size) {
+ const char *name, grpc_resource_quota *resource_quota, size_t read_slice_size) {
int sv[2];
grpc_endpoint_pair p;
char *final_name;
create_sockets(sv);
gpr_asprintf(&final_name, "%s:client", name);
- p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), buffer_pool,
+ p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), resource_quota,
read_slice_size, "socketpair-server");
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
- p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), buffer_pool,
+ p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), resource_quota,
read_slice_size, "socketpair-client");
gpr_free(final_name);
return p;
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
new file mode 100644
index 0000000000..c4e6e5482a
--- /dev/null
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -0,0 +1,684 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/resource_quota.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/iomgr/combiner.h"
+
+int grpc_resource_quota_trace = 0;
+
+typedef bool (*bpstate_func)(grpc_exec_ctx *exec_ctx,
+ grpc_resource_quota *resource_quota);
+
+typedef struct {
+ grpc_resource_user *head;
+ grpc_resource_user *tail;
+} grpc_resource_user_list;
+
+struct grpc_resource_quota {
+ gpr_refcount refs;
+
+ grpc_combiner *combiner;
+ int64_t size;
+ int64_t free_pool;
+
+ bool step_scheduled;
+ bool reclaiming;
+ grpc_closure bpstep_closure;
+ grpc_closure bpreclaimation_done_closure;
+
+ grpc_resource_user *roots[GRPC_BULIST_COUNT];
+
+ char *name;
+};
+
+/*******************************************************************************
+ * list management
+ */
+
+static void bulist_add_tail(grpc_resource_user *resource_user, grpc_bulist list) {
+ grpc_resource_quota *resource_quota = resource_user->resource_quota;
+ grpc_resource_user **root = &resource_quota->roots[list];
+ if (*root == NULL) {
+ *root = resource_user;
+ resource_user->links[list].next = resource_user->links[list].prev = resource_user;
+ } else {
+ resource_user->links[list].next = *root;
+ resource_user->links[list].prev = (*root)->links[list].prev;
+ resource_user->links[list].next->links[list].prev =
+ resource_user->links[list].prev->links[list].next = resource_user;
+ }
+}
+
+static void bulist_add_head(grpc_resource_user *resource_user, grpc_bulist list) {
+ grpc_resource_quota *resource_quota = resource_user->resource_quota;
+ grpc_resource_user **root = &resource_quota->roots[list];
+ if (*root == NULL) {
+ *root = resource_user;
+ resource_user->links[list].next = resource_user->links[list].prev = resource_user;
+ } else {
+ resource_user->links[list].next = (*root)->links[list].next;
+ resource_user->links[list].prev = *root;
+ resource_user->links[list].next->links[list].prev =
+ resource_user->links[list].prev->links[list].next = resource_user;
+ *root = resource_user;
+ }
+}
+
+static bool bulist_empty(grpc_resource_quota *resource_quota, grpc_bulist list) {
+ return resource_quota->roots[list] == NULL;
+}
+
+static grpc_resource_user *bulist_pop(grpc_resource_quota *resource_quota,
+ grpc_bulist list) {
+ grpc_resource_user **root = &resource_quota->roots[list];
+ grpc_resource_user *resource_user = *root;
+ if (resource_user == NULL) {
+ return NULL;
+ }
+ if (resource_user->links[list].next == resource_user) {
+ *root = NULL;
+ } else {
+ resource_user->links[list].next->links[list].prev =
+ resource_user->links[list].prev;
+ resource_user->links[list].prev->links[list].next =
+ resource_user->links[list].next;
+ *root = resource_user->links[list].next;
+ }
+ resource_user->links[list].next = resource_user->links[list].prev = NULL;
+ return resource_user;
+}
+
+static void bulist_remove(grpc_resource_user *resource_user, grpc_bulist list) {
+ if (resource_user->links[list].next == NULL) return;
+ grpc_resource_quota *resource_quota = resource_user->resource_quota;
+ if (resource_quota->roots[list] == resource_user) {
+ resource_quota->roots[list] = resource_user->links[list].next;
+ if (resource_quota->roots[list] == resource_user) {
+ resource_quota->roots[list] = NULL;
+ }
+ }
+ resource_user->links[list].next->links[list].prev =
+ resource_user->links[list].prev;
+ resource_user->links[list].prev->links[list].next =
+ resource_user->links[list].next;
+}
+
+/*******************************************************************************
+ * buffer pool state machine
+ */
+
+static bool bpalloc(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota);
+static bool bpscavenge(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota);
+static bool bpreclaim(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota,
+ bool destructive);
+
+static void bpstep(grpc_exec_ctx *exec_ctx, void *bp, grpc_error *error) {
+ grpc_resource_quota *resource_quota = bp;
+ resource_quota->step_scheduled = false;
+ do {
+ if (bpalloc(exec_ctx, resource_quota)) goto done;
+ } while (bpscavenge(exec_ctx, resource_quota));
+ bpreclaim(exec_ctx, resource_quota, false) ||
+ bpreclaim(exec_ctx, resource_quota, true);
+done:
+ grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
+}
+
+static void bpstep_sched(grpc_exec_ctx *exec_ctx,
+ grpc_resource_quota *resource_quota) {
+ if (resource_quota->step_scheduled) return;
+ resource_quota->step_scheduled = true;
+ grpc_resource_quota_internal_ref(resource_quota);
+ grpc_combiner_execute_finally(exec_ctx, resource_quota->combiner,
+ &resource_quota->bpstep_closure, GRPC_ERROR_NONE,
+ false);
+}
+
+/* returns true if all allocations are completed */
+static bool bpalloc(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota) {
+ grpc_resource_user *resource_user;
+ while ((resource_user =
+ bulist_pop(resource_quota, GRPC_BULIST_AWAITING_ALLOCATION))) {
+ gpr_mu_lock(&resource_user->mu);
+ if (resource_user->free_pool < 0 &&
+ -resource_user->free_pool <= resource_quota->free_pool) {
+ int64_t amt = -resource_user->free_pool;
+ resource_user->free_pool = 0;
+ resource_quota->free_pool -= amt;
+ if (grpc_resource_quota_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: grant alloc %" PRId64
+ " bytes; bp_free_pool -> %" PRId64,
+ resource_quota->name, resource_user->name, amt,
+ resource_quota->free_pool);
+ }
+ } else if (grpc_resource_quota_trace && resource_user->free_pool >= 0) {
+ gpr_log(GPR_DEBUG, "BP %s %s: discard already satisfied alloc request",
+ resource_quota->name, resource_user->name);
+ }
+ if (resource_user->free_pool >= 0) {
+ resource_user->allocating = false;
+ grpc_exec_ctx_enqueue_list(exec_ctx, &resource_user->on_allocated, NULL);
+ gpr_mu_unlock(&resource_user->mu);
+ } else {
+ bulist_add_head(resource_user, GRPC_BULIST_AWAITING_ALLOCATION);
+ gpr_mu_unlock(&resource_user->mu);
+ return false;
+ }
+ }
+ return true;
+}
+
+/* returns true if any memory could be reclaimed from buffers */
+static bool bpscavenge(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota) {
+ grpc_resource_user *resource_user;
+ while ((resource_user =
+ bulist_pop(resource_quota, GRPC_BULIST_NON_EMPTY_FREE_POOL))) {
+ gpr_mu_lock(&resource_user->mu);
+ if (resource_user->free_pool > 0) {
+ int64_t amt = resource_user->free_pool;
+ resource_user->free_pool = 0;
+ resource_quota->free_pool += amt;
+ if (grpc_resource_quota_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: scavenge %" PRId64
+ " bytes; bp_free_pool -> %" PRId64,
+ resource_quota->name, resource_user->name, amt,
+ resource_quota->free_pool);
+ }
+ gpr_mu_unlock(&resource_user->mu);
+ return true;
+ } else {
+ gpr_mu_unlock(&resource_user->mu);
+ }
+ }
+ return false;
+}
+
+/* returns true if reclaimation is proceeding */
+static bool bpreclaim(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota,
+ bool destructive) {
+ if (resource_quota->reclaiming) return true;
+ grpc_bulist list = destructive ? GRPC_BULIST_RECLAIMER_DESTRUCTIVE
+ : GRPC_BULIST_RECLAIMER_BENIGN;
+ grpc_resource_user *resource_user = bulist_pop(resource_quota, list);
+ if (resource_user == NULL) return false;
+ if (grpc_resource_quota_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: initiate %s reclaimation", resource_quota->name,
+ resource_user->name, destructive ? "destructive" : "benign");
+ }
+ resource_quota->reclaiming = true;
+ grpc_resource_quota_internal_ref(resource_quota);
+ grpc_closure *c = resource_user->reclaimers[destructive];
+ resource_user->reclaimers[destructive] = NULL;
+ grpc_closure_run(exec_ctx, c, GRPC_ERROR_NONE);
+ return true;
+}
+
+/*******************************************************************************
+ * bu_slice: a slice implementation that is backed by a grpc_resource_user
+ */
+
+typedef struct {
+ gpr_slice_refcount base;
+ gpr_refcount refs;
+ grpc_resource_user *resource_user;
+ size_t size;
+} bu_slice_refcount;
+
+static void bu_slice_ref(void *p) {
+ bu_slice_refcount *rc = p;
+ gpr_ref(&rc->refs);
+}
+
+static void bu_slice_unref(void *p) {
+ bu_slice_refcount *rc = p;
+ if (gpr_unref(&rc->refs)) {
+ /* TODO(ctiller): this is dangerous, but I think safe for now:
+ we have no guarantee here that we're at a safe point for creating an
+ execution context, but we have no way of writing this code otherwise.
+ In the future: consider lifting gpr_slice to grpc, and offering an
+ internal_{ref,unref} pair that is execution context aware. Alternatively,
+ make exec_ctx be thread local and 'do the right thing' (whatever that is)
+ if NULL */
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_free(&exec_ctx, rc->resource_user, rc->size);
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_free(rc);
+ }
+}
+
+static gpr_slice bu_slice_create(grpc_resource_user *resource_user, size_t size) {
+ bu_slice_refcount *rc = gpr_malloc(sizeof(bu_slice_refcount) + size);
+ rc->base.ref = bu_slice_ref;
+ rc->base.unref = bu_slice_unref;
+ gpr_ref_init(&rc->refs, 1);
+ rc->resource_user = resource_user;
+ rc->size = size;
+ gpr_slice slice;
+ slice.refcount = &rc->base;
+ slice.data.refcounted.bytes = (uint8_t *)(rc + 1);
+ slice.data.refcounted.length = size;
+ return slice;
+}
+
+/*******************************************************************************
+ * grpc_resource_quota internal implementation
+ */
+
+static void bu_allocate(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) {
+ grpc_resource_user *resource_user = bu;
+ if (bulist_empty(resource_user->resource_quota, GRPC_BULIST_AWAITING_ALLOCATION)) {
+ bpstep_sched(exec_ctx, resource_user->resource_quota);
+ }
+ bulist_add_tail(resource_user, GRPC_BULIST_AWAITING_ALLOCATION);
+}
+
+static void bu_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *bu,
+ grpc_error *error) {
+ grpc_resource_user *resource_user = bu;
+ if (!bulist_empty(resource_user->resource_quota,
+ GRPC_BULIST_AWAITING_ALLOCATION) &&
+ bulist_empty(resource_user->resource_quota, GRPC_BULIST_NON_EMPTY_FREE_POOL)) {
+ bpstep_sched(exec_ctx, resource_user->resource_quota);
+ }
+ bulist_add_tail(resource_user, GRPC_BULIST_NON_EMPTY_FREE_POOL);
+}
+
+static void bu_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *bu,
+ grpc_error *error) {
+ grpc_resource_user *resource_user = bu;
+ if (!bulist_empty(resource_user->resource_quota,
+ GRPC_BULIST_AWAITING_ALLOCATION) &&
+ bulist_empty(resource_user->resource_quota, GRPC_BULIST_NON_EMPTY_FREE_POOL) &&
+ bulist_empty(resource_user->resource_quota, GRPC_BULIST_RECLAIMER_BENIGN)) {
+ bpstep_sched(exec_ctx, resource_user->resource_quota);
+ }
+ bulist_add_tail(resource_user, GRPC_BULIST_RECLAIMER_BENIGN);
+}
+
+static void bu_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *bu,
+ grpc_error *error) {
+ grpc_resource_user *resource_user = bu;
+ if (!bulist_empty(resource_user->resource_quota,
+ GRPC_BULIST_AWAITING_ALLOCATION) &&
+ bulist_empty(resource_user->resource_quota, GRPC_BULIST_NON_EMPTY_FREE_POOL) &&
+ bulist_empty(resource_user->resource_quota, GRPC_BULIST_RECLAIMER_BENIGN) &&
+ bulist_empty(resource_user->resource_quota,
+ GRPC_BULIST_RECLAIMER_DESTRUCTIVE)) {
+ bpstep_sched(exec_ctx, resource_user->resource_quota);
+ }
+ bulist_add_tail(resource_user, GRPC_BULIST_RECLAIMER_DESTRUCTIVE);
+}
+
+static void bu_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) {
+ grpc_resource_user *resource_user = bu;
+ GPR_ASSERT(resource_user->allocated == 0);
+ for (int i = 0; i < GRPC_BULIST_COUNT; i++) {
+ bulist_remove(resource_user, (grpc_bulist)i);
+ }
+ grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[0],
+ GRPC_ERROR_CANCELLED, NULL);
+ grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[1],
+ GRPC_ERROR_CANCELLED, NULL);
+ grpc_exec_ctx_sched(exec_ctx, (grpc_closure *)gpr_atm_no_barrier_load(
+ &resource_user->on_done_destroy_closure),
+ GRPC_ERROR_NONE, NULL);
+ if (resource_user->free_pool != 0) {
+ resource_user->resource_quota->free_pool += resource_user->free_pool;
+ bpstep_sched(exec_ctx, resource_user->resource_quota);
+ }
+}
+
+static void bu_allocated_slices(grpc_exec_ctx *exec_ctx, void *ts,
+ grpc_error *error) {
+ grpc_resource_user_slice_allocator *slice_allocator = ts;
+ if (error == GRPC_ERROR_NONE) {
+ for (size_t i = 0; i < slice_allocator->count; i++) {
+ gpr_slice_buffer_add_indexed(slice_allocator->dest,
+ bu_slice_create(slice_allocator->resource_user,
+ slice_allocator->length));
+ }
+ }
+ grpc_closure_run(exec_ctx, &slice_allocator->on_done, GRPC_ERROR_REF(error));
+}
+
+typedef struct {
+ int64_t size;
+ grpc_resource_quota *resource_quota;
+ grpc_closure closure;
+} bp_resize_args;
+
+static void bp_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
+ bp_resize_args *a = args;
+ int64_t delta = a->size - a->resource_quota->size;
+ a->resource_quota->size += delta;
+ a->resource_quota->free_pool += delta;
+ if (delta < 0 && a->resource_quota->free_pool < 0) {
+ bpstep_sched(exec_ctx, a->resource_quota);
+ } else if (delta > 0 &&
+ !bulist_empty(a->resource_quota, GRPC_BULIST_AWAITING_ALLOCATION)) {
+ bpstep_sched(exec_ctx, a->resource_quota);
+ }
+ grpc_resource_quota_internal_unref(exec_ctx, a->resource_quota);
+ gpr_free(a);
+}
+
+static void bp_reclaimation_done(grpc_exec_ctx *exec_ctx, void *bp,
+ grpc_error *error) {
+ grpc_resource_quota *resource_quota = bp;
+ resource_quota->reclaiming = false;
+ bpstep_sched(exec_ctx, resource_quota);
+ grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
+}
+
+/*******************************************************************************
+ * grpc_resource_quota api
+ */
+
+grpc_resource_quota *grpc_resource_quota_create(const char *name) {
+ grpc_resource_quota *resource_quota = gpr_malloc(sizeof(*resource_quota));
+ gpr_ref_init(&resource_quota->refs, 1);
+ resource_quota->combiner = grpc_combiner_create(NULL);
+ resource_quota->free_pool = INT64_MAX;
+ resource_quota->size = INT64_MAX;
+ resource_quota->step_scheduled = false;
+ resource_quota->reclaiming = false;
+ if (name != NULL) {
+ resource_quota->name = gpr_strdup(name);
+ } else {
+ gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR,
+ (intptr_t)resource_quota);
+ }
+ grpc_closure_init(&resource_quota->bpstep_closure, bpstep, resource_quota);
+ grpc_closure_init(&resource_quota->bpreclaimation_done_closure,
+ bp_reclaimation_done, resource_quota);
+ for (int i = 0; i < GRPC_BULIST_COUNT; i++) {
+ resource_quota->roots[i] = NULL;
+ }
+ return resource_quota;
+}
+
+void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx,
+ grpc_resource_quota *resource_quota) {
+ if (gpr_unref(&resource_quota->refs)) {
+ grpc_combiner_destroy(exec_ctx, resource_quota->combiner);
+ gpr_free(resource_quota->name);
+ gpr_free(resource_quota);
+ }
+}
+
+void grpc_resource_quota_unref(grpc_resource_quota *resource_quota) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_quota_internal_unref(&exec_ctx, resource_quota);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+grpc_resource_quota *grpc_resource_quota_internal_ref(grpc_resource_quota *resource_quota) {
+ gpr_ref(&resource_quota->refs);
+ return resource_quota;
+}
+
+void grpc_resource_quota_ref(grpc_resource_quota *resource_quota) {
+ grpc_resource_quota_internal_ref(resource_quota);
+}
+
+void grpc_resource_quota_resize(grpc_resource_quota *resource_quota, size_t size) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ bp_resize_args *a = gpr_malloc(sizeof(*a));
+ a->resource_quota = grpc_resource_quota_internal_ref(resource_quota);
+ a->size = (int64_t)size;
+ grpc_closure_init(&a->closure, bp_resize, a);
+ grpc_combiner_execute(&exec_ctx, resource_quota->combiner, &a->closure,
+ GRPC_ERROR_NONE, false);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+/*******************************************************************************
+ * grpc_resource_user channel args api
+ */
+
+grpc_resource_quota *grpc_resource_quota_from_channel_args(
+ const grpc_channel_args *channel_args) {
+ for (size_t i = 0; i < channel_args->num_args; i++) {
+ if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_BUFFER_POOL)) {
+ if (channel_args->args[i].type == GRPC_ARG_POINTER) {
+ return grpc_resource_quota_internal_ref(
+ channel_args->args[i].value.pointer.p);
+ } else {
+ gpr_log(GPR_DEBUG, GRPC_ARG_BUFFER_POOL " should be a pointer");
+ }
+ }
+ }
+ return grpc_resource_quota_create(NULL);
+}
+
+static void *bp_copy(void *bp) {
+ grpc_resource_quota_ref(bp);
+ return bp;
+}
+
+static void bp_destroy(void *bp) { grpc_resource_quota_unref(bp); }
+
+static int bp_cmp(void *a, void *b) { return GPR_ICMP(a, b); }
+
+const grpc_arg_pointer_vtable *grpc_resource_quota_arg_vtable(void) {
+ static const grpc_arg_pointer_vtable vtable = {bp_copy, bp_destroy, bp_cmp};
+ return &vtable;
+}
+
+/*******************************************************************************
+ * grpc_resource_user api
+ */
+
+void grpc_resource_user_init(grpc_resource_user *resource_user,
+ grpc_resource_quota *resource_quota, const char *name) {
+ resource_user->resource_quota = grpc_resource_quota_internal_ref(resource_quota);
+ grpc_closure_init(&resource_user->allocate_closure, &bu_allocate, resource_user);
+ grpc_closure_init(&resource_user->add_to_free_pool_closure,
+ &bu_add_to_free_pool, resource_user);
+ grpc_closure_init(&resource_user->post_reclaimer_closure[0],
+ &bu_post_benign_reclaimer, resource_user);
+ grpc_closure_init(&resource_user->post_reclaimer_closure[1],
+ &bu_post_destructive_reclaimer, resource_user);
+ grpc_closure_init(&resource_user->destroy_closure, &bu_destroy, resource_user);
+ gpr_mu_init(&resource_user->mu);
+ resource_user->allocated = 0;
+ resource_user->free_pool = 0;
+ grpc_closure_list_init(&resource_user->on_allocated);
+ resource_user->allocating = false;
+ resource_user->added_to_free_pool = false;
+ gpr_atm_no_barrier_store(&resource_user->on_done_destroy_closure, 0);
+ resource_user->reclaimers[0] = NULL;
+ resource_user->reclaimers[1] = NULL;
+ for (int i = 0; i < GRPC_BULIST_COUNT; i++) {
+ resource_user->links[i].next = resource_user->links[i].prev = NULL;
+ }
+#ifndef NDEBUG
+ resource_user->asan_canary = gpr_malloc(1);
+#endif
+ if (name != NULL) {
+ resource_user->name = gpr_strdup(name);
+ } else {
+ gpr_asprintf(&resource_user->name, "anonymous_resource_user_%" PRIxPTR,
+ (intptr_t)resource_user);
+ }
+}
+
+void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_resource_user *resource_user,
+ grpc_closure *on_done) {
+ gpr_mu_lock(&resource_user->mu);
+ GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->on_done_destroy_closure) ==
+ 0);
+ gpr_atm_no_barrier_store(&resource_user->on_done_destroy_closure,
+ (gpr_atm)on_done);
+ if (resource_user->allocated == 0) {
+ grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
+ &resource_user->destroy_closure, GRPC_ERROR_NONE,
+ false);
+ }
+ gpr_mu_unlock(&resource_user->mu);
+}
+
+void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_resource_user *resource_user) {
+#ifndef NDEBUG
+ gpr_free(resource_user->asan_canary);
+#endif
+ grpc_resource_quota_internal_unref(exec_ctx, resource_user->resource_quota);
+ gpr_mu_destroy(&resource_user->mu);
+ gpr_free(resource_user->name);
+}
+
+void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
+ grpc_resource_user *resource_user, size_t size,
+ grpc_closure *optional_on_done) {
+ gpr_mu_lock(&resource_user->mu);
+ grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load(
+ &resource_user->on_done_destroy_closure);
+ if (on_done_destroy != NULL) {
+ /* already shutdown */
+ if (grpc_resource_quota_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR " after shutdown",
+ resource_user->resource_quota->name, resource_user->name, size);
+ }
+ grpc_exec_ctx_sched(
+ exec_ctx, optional_on_done,
+ GRPC_ERROR_CREATE("Buffer pool user is already shutdown"), NULL);
+ gpr_mu_unlock(&resource_user->mu);
+ return;
+ }
+ resource_user->allocated += (int64_t)size;
+ resource_user->free_pool -= (int64_t)size;
+ if (grpc_resource_quota_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64
+ ", free_pool -> %" PRId64,
+ resource_user->resource_quota->name, resource_user->name, size,
+ resource_user->allocated, resource_user->free_pool);
+ }
+ if (resource_user->free_pool < 0) {
+ grpc_closure_list_append(&resource_user->on_allocated, optional_on_done,
+ GRPC_ERROR_NONE);
+ if (!resource_user->allocating) {
+ resource_user->allocating = true;
+ grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
+ &resource_user->allocate_closure, GRPC_ERROR_NONE,
+ false);
+ }
+ } else {
+ grpc_exec_ctx_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE, NULL);
+ }
+ gpr_mu_unlock(&resource_user->mu);
+}
+
+void grpc_resource_user_free(grpc_exec_ctx *exec_ctx,
+ grpc_resource_user *resource_user, size_t size) {
+ gpr_mu_lock(&resource_user->mu);
+ GPR_ASSERT(resource_user->allocated >= (int64_t)size);
+ bool was_zero_or_negative = resource_user->free_pool <= 0;
+ resource_user->free_pool += (int64_t)size;
+ resource_user->allocated -= (int64_t)size;
+ if (grpc_resource_quota_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: free %" PRIdPTR "; allocated -> %" PRId64
+ ", free_pool -> %" PRId64,
+ resource_user->resource_quota->name, resource_user->name, size,
+ resource_user->allocated, resource_user->free_pool);
+ }
+ bool is_bigger_than_zero = resource_user->free_pool > 0;
+ if (is_bigger_than_zero && was_zero_or_negative &&
+ !resource_user->added_to_free_pool) {
+ resource_user->added_to_free_pool = true;
+ grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
+ &resource_user->add_to_free_pool_closure,
+ GRPC_ERROR_NONE, false);
+ }
+ grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load(
+ &resource_user->on_done_destroy_closure);
+ if (on_done_destroy != NULL && resource_user->allocated == 0) {
+ grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
+ &resource_user->destroy_closure, GRPC_ERROR_NONE,
+ false);
+ }
+ gpr_mu_unlock(&resource_user->mu);
+}
+
+void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
+ grpc_resource_user *resource_user,
+ bool destructive, grpc_closure *closure) {
+ if (gpr_atm_acq_load(&resource_user->on_done_destroy_closure) == 0) {
+ GPR_ASSERT(resource_user->reclaimers[destructive] == NULL);
+ resource_user->reclaimers[destructive] = closure;
+ grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
+ &resource_user->post_reclaimer_closure[destructive],
+ GRPC_ERROR_NONE, false);
+ } else {
+ grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL);
+ }
+}
+
+void grpc_resource_user_finish_reclaimation(grpc_exec_ctx *exec_ctx,
+ grpc_resource_user *resource_user) {
+ if (grpc_resource_quota_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: reclaimation complete",
+ resource_user->resource_quota->name, resource_user->name);
+ }
+ grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
+ &resource_user->resource_quota->bpreclaimation_done_closure,
+ GRPC_ERROR_NONE, false);
+}
+
+void grpc_resource_user_slice_allocator_init(
+ grpc_resource_user_slice_allocator *slice_allocator,
+ grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p) {
+ grpc_closure_init(&slice_allocator->on_allocated, bu_allocated_slices,
+ slice_allocator);
+ grpc_closure_init(&slice_allocator->on_done, cb, p);
+ slice_allocator->resource_user = resource_user;
+}
+
+void grpc_resource_user_alloc_slices(
+ grpc_exec_ctx *exec_ctx, grpc_resource_user_slice_allocator *slice_allocator,
+ size_t length, size_t count, gpr_slice_buffer *dest) {
+ slice_allocator->length = length;
+ slice_allocator->count = count;
+ slice_allocator->dest = dest;
+ grpc_resource_user_alloc(exec_ctx, slice_allocator->resource_user, count * length,
+ &slice_allocator->on_allocated);
+}
diff --git a/src/core/lib/iomgr/buffer_pool.h b/src/core/lib/iomgr/resource_quota.h
index 1564872b5d..5c566e492c 100644
--- a/src/core/lib/iomgr/buffer_pool.h
+++ b/src/core/lib/iomgr/resource_quota.h
@@ -38,12 +38,13 @@
#include "src/core/lib/iomgr/exec_ctx.h"
-extern int grpc_buffer_pool_trace;
+extern int grpc_resource_quota_trace;
-grpc_buffer_pool *grpc_buffer_pool_internal_ref(grpc_buffer_pool *buffer_pool);
-void grpc_buffer_pool_internal_unref(grpc_exec_ctx *exec_ctx,
- grpc_buffer_pool *buffer_pool);
-grpc_buffer_pool *grpc_buffer_pool_from_channel_args(
+grpc_resource_quota *grpc_resource_quota_internal_ref(
+ grpc_resource_quota *resource_quota);
+void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx,
+ grpc_resource_quota *resource_quota);
+grpc_resource_quota *grpc_resource_quota_from_channel_args(
const grpc_channel_args *channel_args);
typedef enum {
@@ -54,15 +55,15 @@ typedef enum {
GRPC_BULIST_COUNT
} grpc_bulist;
-typedef struct grpc_buffer_user grpc_buffer_user;
+typedef struct grpc_resource_user grpc_resource_user;
typedef struct {
- grpc_buffer_user *next;
- grpc_buffer_user *prev;
-} grpc_buffer_user_link;
+ grpc_resource_user *next;
+ grpc_resource_user *prev;
+} grpc_resource_user_link;
-struct grpc_buffer_user {
- grpc_buffer_pool *buffer_pool;
+struct grpc_resource_user {
+ grpc_resource_quota *resource_quota;
grpc_closure allocate_closure;
grpc_closure add_to_free_pool_closure;
@@ -84,45 +85,47 @@ struct grpc_buffer_user {
grpc_closure destroy_closure;
gpr_atm on_done_destroy_closure;
- grpc_buffer_user_link links[GRPC_BULIST_COUNT];
+ grpc_resource_user_link links[GRPC_BULIST_COUNT];
char *name;
};
-void grpc_buffer_user_init(grpc_buffer_user *buffer_user,
- grpc_buffer_pool *buffer_pool, const char *name);
-void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_buffer_user *buffer_user,
- grpc_closure *on_done);
-void grpc_buffer_user_destroy(grpc_exec_ctx *exec_ctx,
- grpc_buffer_user *buffer_user);
-
-void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx,
- grpc_buffer_user *buffer_user, size_t size,
- grpc_closure *optional_on_done);
-void grpc_buffer_user_free(grpc_exec_ctx *exec_ctx,
- grpc_buffer_user *buffer_user, size_t size);
-void grpc_buffer_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
- grpc_buffer_user *buffer_user,
- bool destructive, grpc_closure *closure);
-void grpc_buffer_user_finish_reclaimation(grpc_exec_ctx *exec_ctx,
- grpc_buffer_user *buffer_user);
-
-typedef struct grpc_buffer_user_slice_allocator {
+void grpc_resource_user_init(grpc_resource_user *resource_user,
+ grpc_resource_quota *resource_quota,
+ const char *name);
+void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_resource_user *resource_user,
+ grpc_closure *on_done);
+void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_resource_user *resource_user);
+
+void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
+ grpc_resource_user *resource_user, size_t size,
+ grpc_closure *optional_on_done);
+void grpc_resource_user_free(grpc_exec_ctx *exec_ctx,
+ grpc_resource_user *resource_user, size_t size);
+void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
+ grpc_resource_user *resource_user,
+ bool destructive, grpc_closure *closure);
+void grpc_resource_user_finish_reclaimation(grpc_exec_ctx *exec_ctx,
+ grpc_resource_user *resource_user);
+
+typedef struct grpc_resource_user_slice_allocator {
grpc_closure on_allocated;
grpc_closure on_done;
size_t length;
size_t count;
gpr_slice_buffer *dest;
- grpc_buffer_user *buffer_user;
-} grpc_buffer_user_slice_allocator;
+ grpc_resource_user *resource_user;
+} grpc_resource_user_slice_allocator;
-void grpc_buffer_user_slice_allocator_init(
- grpc_buffer_user_slice_allocator *slice_allocator,
- grpc_buffer_user *buffer_user, grpc_iomgr_cb_func cb, void *p);
+void grpc_resource_user_slice_allocator_init(
+ grpc_resource_user_slice_allocator *slice_allocator,
+ grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p);
-void grpc_buffer_user_alloc_slices(
- grpc_exec_ctx *exec_ctx, grpc_buffer_user_slice_allocator *slice_allocator,
- size_t length, size_t count, gpr_slice_buffer *dest);
+void grpc_resource_user_alloc_slices(
+ grpc_exec_ctx *exec_ctx,
+ grpc_resource_user_slice_allocator *slice_allocator, size_t length,
+ size_t count, gpr_slice_buffer *dest);
#endif /* GRPC_CORE_LIB_IOMGR_BUFFER_POOL_H */
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index dadd4cc2eb..e74a696c2f 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -125,7 +125,7 @@ grpc_endpoint *grpc_tcp_client_create_from_fd(
grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args,
const char *addr_str) {
size_t tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE;
- grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(NULL);
+ grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 ==
@@ -135,16 +135,16 @@ grpc_endpoint *grpc_tcp_client_create_from_fd(
tcp_read_chunk_size = (size_t)grpc_channel_arg_get_integer(
&channel_args->args[i], options);
} else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_BUFFER_POOL)) {
- grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool);
- buffer_pool = grpc_buffer_pool_internal_ref(
+ grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
+ resource_quota = grpc_resource_quota_internal_ref(
channel_args->args[i].value.pointer.p);
}
}
}
grpc_endpoint *ep =
- grpc_tcp_create(fd, buffer_pool, tcp_read_chunk_size, addr_str);
- grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool);
+ grpc_tcp_create(fd, resource_quota, tcp_read_chunk_size, addr_str);
+ grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
return ep;
}
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 648ca52818..27b6677545 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -102,8 +102,8 @@ typedef struct {
char *peer_string;
- grpc_buffer_user buffer_user;
- grpc_buffer_user_slice_allocator slice_allocator;
+ grpc_resource_user resource_user;
+ grpc_resource_user_slice_allocator slice_allocator;
} grpc_tcp;
static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
@@ -113,17 +113,17 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
static void tcp_unref_closure(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error);
-static void tcp_maybe_shutdown_buffer_user(grpc_exec_ctx *exec_ctx,
- grpc_tcp *tcp) {
+static void tcp_maybe_shutdown_resource_user(grpc_exec_ctx *exec_ctx,
+ grpc_tcp *tcp) {
if (gpr_atm_full_fetch_add(&tcp->shutdown_count, 1) == 0) {
- grpc_buffer_user_shutdown(exec_ctx, &tcp->buffer_user,
- grpc_closure_create(tcp_unref_closure, tcp));
+ grpc_resource_user_shutdown(exec_ctx, &tcp->resource_user,
+ grpc_closure_create(tcp_unref_closure, tcp));
}
}
static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- tcp_maybe_shutdown_buffer_user(exec_ctx, tcp);
+ tcp_maybe_shutdown_resource_user(exec_ctx, tcp);
grpc_fd_shutdown(exec_ctx, tcp->em_fd);
}
@@ -131,7 +131,7 @@ static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
"tcp_unref_orphan");
gpr_slice_buffer_destroy(&tcp->last_read_buffer);
- grpc_buffer_user_destroy(exec_ctx, &tcp->buffer_user);
+ grpc_resource_user_destroy(exec_ctx, &tcp->resource_user);
gpr_free(tcp->peer_string);
gpr_free(tcp);
}
@@ -170,13 +170,13 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
static void tcp_unref_closure(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- TCP_UNREF(exec_ctx, arg, "buffer_user");
+ TCP_UNREF(exec_ctx, arg, "resource_user");
}
static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
- tcp_maybe_shutdown_buffer_user(exec_ctx, tcp);
+ tcp_maybe_shutdown_resource_user(exec_ctx, tcp);
gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
TCP_UNREF(exec_ctx, tcp, "destroy");
}
@@ -286,7 +286,7 @@ static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp,
static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
if (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
- grpc_buffer_user_alloc_slices(
+ grpc_resource_user_alloc_slices(
exec_ctx, &tcp->slice_allocator, tcp->slice_size,
(size_t)tcp->iov_size - tcp->incoming_buffer->count,
tcp->incoming_buffer);
@@ -513,9 +513,9 @@ static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) {
return grpc_fd_get_workqueue(tcp->em_fd);
}
-static grpc_buffer_user *tcp_get_buffer_user(grpc_endpoint *ep) {
+static grpc_resource_user *tcp_get_resource_user(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- return &tcp->buffer_user;
+ return &tcp->resource_user;
}
static const grpc_endpoint_vtable vtable = {tcp_read,
@@ -525,10 +525,11 @@ static const grpc_endpoint_vtable vtable = {tcp_read,
tcp_add_to_pollset_set,
tcp_shutdown,
tcp_destroy,
- tcp_get_buffer_user,
+ tcp_get_resource_user,
tcp_get_peer};
-grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, grpc_buffer_pool *buffer_pool,
+grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
+ grpc_resource_quota *resource_quota,
size_t slice_size, const char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
@@ -543,7 +544,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, grpc_buffer_pool *buffer_pool,
tcp->iov_size = 1;
tcp->finished_edge = true;
/* paired with unref in grpc_tcp_destroy, and with the shutdown for our
- * buffer_user */
+ * resource_user */
gpr_ref_init(&tcp->refcount, 2);
gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
tcp->em_fd = em_fd;
@@ -552,9 +553,10 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, grpc_buffer_pool *buffer_pool,
tcp->write_closure.cb = tcp_handle_write;
tcp->write_closure.cb_arg = tcp;
gpr_slice_buffer_init(&tcp->last_read_buffer);
- grpc_buffer_user_init(&tcp->buffer_user, buffer_pool, peer_string);
- grpc_buffer_user_slice_allocator_init(
- &tcp->slice_allocator, &tcp->buffer_user, tcp_read_allocation_done, tcp);
+ grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string);
+ grpc_resource_user_slice_allocator_init(&tcp->slice_allocator,
+ &tcp->resource_user,
+ tcp_read_allocation_done, tcp);
/* Tell network status tracker about new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
@@ -574,7 +576,7 @@ void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
GPR_ASSERT(ep->vtable == &vtable);
tcp->release_fd = fd;
tcp->release_fd_cb = done;
- tcp_maybe_shutdown_buffer_user(exec_ctx, tcp);
+ tcp_maybe_shutdown_resource_user(exec_ctx, tcp);
gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
TCP_UNREF(exec_ctx, tcp, "destroy");
}
diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h
index 768355cf0c..1c0d13f96e 100644
--- a/src/core/lib/iomgr/tcp_posix.h
+++ b/src/core/lib/iomgr/tcp_posix.h
@@ -53,7 +53,7 @@ extern int grpc_tcp_trace;
/* Create a tcp endpoint given a file desciptor and a read slice size.
Takes ownership of fd. */
-grpc_endpoint *grpc_tcp_create(grpc_fd *fd, grpc_buffer_pool *buffer_pool,
+grpc_endpoint *grpc_tcp_create(grpc_fd *fd, grpc_resource_quota *resource_quota,
size_t read_slice_size, const char *peer_string);
/* Return the tcp endpoint's fd, or -1 if this is not available. Does not
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 2afce529ef..b2eb89f429 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -138,7 +138,7 @@ struct grpc_tcp_server {
/* next pollset to assign a channel to */
gpr_atm next_pollset_to_assign;
- grpc_buffer_pool *buffer_pool;
+ grpc_resource_quota *resource_quota;
};
static gpr_once check_init = GPR_ONCE_INIT;
@@ -163,25 +163,25 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
s->so_reuseport = has_so_reuseport;
- s->buffer_pool = grpc_buffer_pool_create(NULL);
+ s->resource_quota = grpc_resource_quota_create(NULL);
for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_INTEGER) {
s->so_reuseport =
has_so_reuseport && (args->args[i].value.integer != 0);
} else {
- grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool);
+ grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT
" must be an integer");
}
} else if (0 == strcmp(GRPC_ARG_BUFFER_POOL, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_POINTER) {
- grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool);
- s->buffer_pool =
- grpc_buffer_pool_internal_ref(args->args[i].value.pointer.p);
+ grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ s->resource_quota =
+ grpc_resource_quota_internal_ref(args->args[i].value.pointer.p);
} else {
- grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool);
+ grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE(GRPC_ARG_BUFFER_POOL
" must be a pointer to a buffer pool");
@@ -222,7 +222,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_free(sp);
}
- grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool);
+ grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
gpr_free(s);
}
@@ -440,7 +440,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg,
- grpc_tcp_create(fdobj, sp->server->buffer_pool,
+ grpc_tcp_create(fdobj, sp->server->resource_quota,
GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
read_notifier_pollset, &acceptor);