diff options
author | Craig Tiller <ctiller@google.com> | 2016-10-17 14:52:14 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-10-17 14:52:14 -0700 |
commit | 20afa3d7c933207c548ed11928c47b552b5b2f80 (patch) | |
tree | ea93d7458fb454dde9aca2083bf1fa89e69c189a /src/core | |
parent | 654b242ce70afcf9fdab674cab9b71d8d3f02502 (diff) |
BufferPool --> ResourceQuota
Diffstat (limited to 'src/core')
20 files changed, 828 insertions, 823 deletions
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c index e5a156a59e..b760fea2fa 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c @@ -57,12 +57,12 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server, char *name; gpr_asprintf(&name, "fd:%d", fd); - grpc_buffer_pool *buffer_pool = - grpc_buffer_pool_from_channel_args(grpc_server_get_channel_args(server)); + grpc_resource_quota *resource_quota = + grpc_resource_quota_from_channel_args(grpc_server_get_channel_args(server)); grpc_endpoint *server_endpoint = - grpc_tcp_create(grpc_fd_create(fd, name), buffer_pool, + grpc_tcp_create(grpc_fd_create(fd, name), resource_quota, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name); - grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool); + grpc_resource_quota_internal_unref(&exec_ctx, resource_quota); gpr_free(name); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index a7137bc944..612852cb16 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -2115,9 +2115,9 @@ static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx, if (!t->benign_reclaimer_registered) { t->benign_reclaimer_registered = true; GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer"); - grpc_buffer_user_post_reclaimer(exec_ctx, - grpc_endpoint_get_buffer_user(t->ep), false, - &t->benign_reclaimer); + grpc_resource_user_post_reclaimer(exec_ctx, + grpc_endpoint_get_resource_user(t->ep), + false, &t->benign_reclaimer); } } @@ -2126,9 +2126,9 @@ static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, if (!t->destructive_reclaimer_registered) { t->destructive_reclaimer_registered = true; GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer"); - grpc_buffer_user_post_reclaimer(exec_ctx, - grpc_endpoint_get_buffer_user(t->ep), true, - &t->destructive_reclaimer); + grpc_resource_user_post_reclaimer(exec_ctx, + grpc_endpoint_get_resource_user(t->ep), + true, &t->destructive_reclaimer); } } @@ -2151,13 +2151,13 @@ static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_chttp2_transport *t = arg; if (error == GRPC_ERROR_NONE && grpc_chttp2_stream_map_size(&t->stream_map) == 0) { - if (grpc_buffer_pool_trace) { + if (grpc_resource_quota_trace) { gpr_log(GPR_DEBUG, "HTTP2: %s - send goaway to free memory", t->peer_string); } send_goaway(exec_ctx, t, GRPC_CHTTP2_ENHANCE_YOUR_CALM, gpr_slice_from_static_string("Buffers full")); - } else if (error == GRPC_ERROR_NONE && grpc_buffer_pool_trace) { + } else if (error == GRPC_ERROR_NONE && grpc_resource_quota_trace) { gpr_log(GPR_DEBUG, "HTTP2: %s - skip benign reclaimation, there are still %" PRIdPTR " streams", @@ -2165,8 +2165,8 @@ static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, } t->benign_reclaimer_registered = false; if (error != GRPC_ERROR_CANCELLED) { - grpc_buffer_user_finish_reclaimation(exec_ctx, - grpc_endpoint_get_buffer_user(t->ep)); + grpc_resource_user_finish_reclaimation( + exec_ctx, grpc_endpoint_get_resource_user(t->ep)); } GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "benign_reclaimer"); } @@ -2178,7 +2178,7 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, t->destructive_reclaimer_registered = false; if (error == GRPC_ERROR_NONE && n > 0) { grpc_chttp2_stream *s = grpc_chttp2_stream_map_rand(&t->stream_map); - if (grpc_buffer_pool_trace) { + if (grpc_resource_quota_trace) { gpr_log(GPR_DEBUG, "HTTP2: %s - abandon stream id %d", t->peer_string, s->id); } @@ -2191,8 +2191,8 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, } } if (error != GRPC_ERROR_CANCELLED) { - grpc_buffer_user_finish_reclaimation(exec_ctx, - grpc_endpoint_get_buffer_user(t->ep)); + grpc_resource_user_finish_reclaimation( + exec_ctx, grpc_endpoint_get_resource_user(t->ep)); } GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destructive_reclaimer"); } diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index a4ce8a1e73..bdc18ac4bf 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -71,7 +71,7 @@ typedef struct { grpc_closure done_write; grpc_closure connected; grpc_error *overall_error; - grpc_buffer_pool *buffer_pool; + grpc_resource_quota *resource_quota; } internal_request; static grpc_httpcli_get_override g_get_override = NULL; @@ -119,7 +119,7 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, gpr_slice_buffer_destroy(&req->incoming); gpr_slice_buffer_destroy(&req->outgoing); GRPC_ERROR_UNREF(req->overall_error); - grpc_buffer_pool_internal_unref(exec_ctx, req->buffer_pool); + grpc_resource_quota_internal_unref(exec_ctx, req->resource_quota); gpr_free(req); } @@ -229,8 +229,8 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req, grpc_arg arg; arg.key = GRPC_ARG_BUFFER_POOL; arg.type = GRPC_ARG_POINTER; - arg.value.pointer.p = req->buffer_pool; - arg.value.pointer.vtable = grpc_buffer_pool_arg_vtable(); + arg.value.pointer.p = req->resource_quota; + arg.value.pointer.vtable = grpc_resource_quota_arg_vtable(); grpc_channel_args args = {1, &arg}; grpc_tcp_client_connect( exec_ctx, &req->connected, &req->ep, req->context->pollset_set, &args, @@ -250,7 +250,7 @@ static void on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void internal_request_begin(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_polling_entity *pollent, - grpc_buffer_pool *buffer_pool, + grpc_resource_quota *resource_quota, const grpc_httpcli_request *request, gpr_timespec deadline, grpc_closure *on_done, grpc_httpcli_response *response, @@ -266,7 +266,7 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx, req->context = context; req->pollent = pollent; req->overall_error = GRPC_ERROR_NONE; - req->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool); + req->resource_quota = grpc_resource_quota_internal_ref(resource_quota); grpc_closure_init(&req->on_read, on_read, req); grpc_closure_init(&req->done_write, done_write, req); gpr_slice_buffer_init(&req->incoming); @@ -284,7 +284,7 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx, void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_polling_entity *pollent, - grpc_buffer_pool *buffer_pool, + grpc_resource_quota *resource_quota, const grpc_httpcli_request *request, gpr_timespec deadline, grpc_closure *on_done, grpc_httpcli_response *response) { @@ -294,7 +294,7 @@ void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, return; } gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->http.path); - internal_request_begin(exec_ctx, context, pollent, buffer_pool, request, + internal_request_begin(exec_ctx, context, pollent, resource_quota, request, deadline, on_done, response, name, grpc_httpcli_format_get_request(request)); gpr_free(name); @@ -302,7 +302,7 @@ void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_polling_entity *pollent, - grpc_buffer_pool *buffer_pool, + grpc_resource_quota *resource_quota, const grpc_httpcli_request *request, const char *body_bytes, size_t body_size, gpr_timespec deadline, grpc_closure *on_done, @@ -315,7 +315,7 @@ void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, } gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->http.path); internal_request_begin( - exec_ctx, context, pollent, buffer_pool, request, deadline, on_done, + exec_ctx, context, pollent, resource_quota, request, deadline, on_done, response, name, grpc_httpcli_format_post_request(request, body_bytes, body_size)); gpr_free(name); diff --git a/src/core/lib/http/httpcli.h b/src/core/lib/http/httpcli.h index 0c053c1d70..11e03b44df 100644 --- a/src/core/lib/http/httpcli.h +++ b/src/core/lib/http/httpcli.h @@ -96,7 +96,7 @@ void grpc_httpcli_context_destroy(grpc_httpcli_context *context); 'on_response' is a callback to report results to */ void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_polling_entity *pollent, - grpc_buffer_pool *buffer_pool, + grpc_resource_quota *resource_quota, const grpc_httpcli_request *request, gpr_timespec deadline, grpc_closure *on_complete, grpc_httpcli_response *response); @@ -117,7 +117,7 @@ void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, Does not support ?var1=val1&var2=val2 in the path. */ void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_polling_entity *pollent, - grpc_buffer_pool *buffer_pool, + grpc_resource_quota *resource_quota, const grpc_httpcli_request *request, const char *body_bytes, size_t body_size, gpr_timespec deadline, grpc_closure *on_complete, diff --git a/src/core/lib/iomgr/buffer_pool.c b/src/core/lib/iomgr/buffer_pool.c deleted file mode 100644 index 8fbf75cbe4..0000000000 --- a/src/core/lib/iomgr/buffer_pool.c +++ /dev/null @@ -1,684 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/lib/iomgr/buffer_pool.h" - -#include <string.h> - -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/useful.h> - -#include "src/core/lib/iomgr/combiner.h" - -int grpc_buffer_pool_trace = 0; - -typedef bool (*bpstate_func)(grpc_exec_ctx *exec_ctx, - grpc_buffer_pool *buffer_pool); - -typedef struct { - grpc_buffer_user *head; - grpc_buffer_user *tail; -} grpc_buffer_user_list; - -struct grpc_buffer_pool { - gpr_refcount refs; - - grpc_combiner *combiner; - int64_t size; - int64_t free_pool; - - bool step_scheduled; - bool reclaiming; - grpc_closure bpstep_closure; - grpc_closure bpreclaimation_done_closure; - - grpc_buffer_user *roots[GRPC_BULIST_COUNT]; - - char *name; -}; - -/******************************************************************************* - * list management - */ - -static void bulist_add_tail(grpc_buffer_user *buffer_user, grpc_bulist list) { - grpc_buffer_pool *buffer_pool = buffer_user->buffer_pool; - grpc_buffer_user **root = &buffer_pool->roots[list]; - if (*root == NULL) { - *root = buffer_user; - buffer_user->links[list].next = buffer_user->links[list].prev = buffer_user; - } else { - buffer_user->links[list].next = *root; - buffer_user->links[list].prev = (*root)->links[list].prev; - buffer_user->links[list].next->links[list].prev = - buffer_user->links[list].prev->links[list].next = buffer_user; - } -} - -static void bulist_add_head(grpc_buffer_user *buffer_user, grpc_bulist list) { - grpc_buffer_pool *buffer_pool = buffer_user->buffer_pool; - grpc_buffer_user **root = &buffer_pool->roots[list]; - if (*root == NULL) { - *root = buffer_user; - buffer_user->links[list].next = buffer_user->links[list].prev = buffer_user; - } else { - buffer_user->links[list].next = (*root)->links[list].next; - buffer_user->links[list].prev = *root; - buffer_user->links[list].next->links[list].prev = - buffer_user->links[list].prev->links[list].next = buffer_user; - *root = buffer_user; - } -} - -static bool bulist_empty(grpc_buffer_pool *buffer_pool, grpc_bulist list) { - return buffer_pool->roots[list] == NULL; -} - -static grpc_buffer_user *bulist_pop(grpc_buffer_pool *buffer_pool, - grpc_bulist list) { - grpc_buffer_user **root = &buffer_pool->roots[list]; - grpc_buffer_user *buffer_user = *root; - if (buffer_user == NULL) { - return NULL; - } - if (buffer_user->links[list].next == buffer_user) { - *root = NULL; - } else { - buffer_user->links[list].next->links[list].prev = - buffer_user->links[list].prev; - buffer_user->links[list].prev->links[list].next = - buffer_user->links[list].next; - *root = buffer_user->links[list].next; - } - buffer_user->links[list].next = buffer_user->links[list].prev = NULL; - return buffer_user; -} - -static void bulist_remove(grpc_buffer_user *buffer_user, grpc_bulist list) { - if (buffer_user->links[list].next == NULL) return; - grpc_buffer_pool *buffer_pool = buffer_user->buffer_pool; - if (buffer_pool->roots[list] == buffer_user) { - buffer_pool->roots[list] = buffer_user->links[list].next; - if (buffer_pool->roots[list] == buffer_user) { - buffer_pool->roots[list] = NULL; - } - } - buffer_user->links[list].next->links[list].prev = - buffer_user->links[list].prev; - buffer_user->links[list].prev->links[list].next = - buffer_user->links[list].next; -} - -/******************************************************************************* - * buffer pool state machine - */ - -static bool bpalloc(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool); -static bool bpscavenge(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool); -static bool bpreclaim(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool, - bool destructive); - -static void bpstep(grpc_exec_ctx *exec_ctx, void *bp, grpc_error *error) { - grpc_buffer_pool *buffer_pool = bp; - buffer_pool->step_scheduled = false; - do { - if (bpalloc(exec_ctx, buffer_pool)) goto done; - } while (bpscavenge(exec_ctx, buffer_pool)); - bpreclaim(exec_ctx, buffer_pool, false) || - bpreclaim(exec_ctx, buffer_pool, true); -done: - grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool); -} - -static void bpstep_sched(grpc_exec_ctx *exec_ctx, - grpc_buffer_pool *buffer_pool) { - if (buffer_pool->step_scheduled) return; - buffer_pool->step_scheduled = true; - grpc_buffer_pool_internal_ref(buffer_pool); - grpc_combiner_execute_finally(exec_ctx, buffer_pool->combiner, - &buffer_pool->bpstep_closure, GRPC_ERROR_NONE, - false); -} - -/* returns true if all allocations are completed */ -static bool bpalloc(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool) { - grpc_buffer_user *buffer_user; - while ((buffer_user = - bulist_pop(buffer_pool, GRPC_BULIST_AWAITING_ALLOCATION))) { - gpr_mu_lock(&buffer_user->mu); - if (buffer_user->free_pool < 0 && - -buffer_user->free_pool <= buffer_pool->free_pool) { - int64_t amt = -buffer_user->free_pool; - buffer_user->free_pool = 0; - buffer_pool->free_pool -= amt; - if (grpc_buffer_pool_trace) { - gpr_log(GPR_DEBUG, "BP %s %s: grant alloc %" PRId64 - " bytes; bp_free_pool -> %" PRId64, - buffer_pool->name, buffer_user->name, amt, - buffer_pool->free_pool); - } - } else if (grpc_buffer_pool_trace && buffer_user->free_pool >= 0) { - gpr_log(GPR_DEBUG, "BP %s %s: discard already satisfied alloc request", - buffer_pool->name, buffer_user->name); - } - if (buffer_user->free_pool >= 0) { - buffer_user->allocating = false; - grpc_exec_ctx_enqueue_list(exec_ctx, &buffer_user->on_allocated, NULL); - gpr_mu_unlock(&buffer_user->mu); - } else { - bulist_add_head(buffer_user, GRPC_BULIST_AWAITING_ALLOCATION); - gpr_mu_unlock(&buffer_user->mu); - return false; - } - } - return true; -} - -/* returns true if any memory could be reclaimed from buffers */ -static bool bpscavenge(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool) { - grpc_buffer_user *buffer_user; - while ((buffer_user = - bulist_pop(buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL))) { - gpr_mu_lock(&buffer_user->mu); - if (buffer_user->free_pool > 0) { - int64_t amt = buffer_user->free_pool; - buffer_user->free_pool = 0; - buffer_pool->free_pool += amt; - if (grpc_buffer_pool_trace) { - gpr_log(GPR_DEBUG, "BP %s %s: scavenge %" PRId64 - " bytes; bp_free_pool -> %" PRId64, - buffer_pool->name, buffer_user->name, amt, - buffer_pool->free_pool); - } - gpr_mu_unlock(&buffer_user->mu); - return true; - } else { - gpr_mu_unlock(&buffer_user->mu); - } - } - return false; -} - -/* returns true if reclaimation is proceeding */ -static bool bpreclaim(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool, - bool destructive) { - if (buffer_pool->reclaiming) return true; - grpc_bulist list = destructive ? GRPC_BULIST_RECLAIMER_DESTRUCTIVE - : GRPC_BULIST_RECLAIMER_BENIGN; - grpc_buffer_user *buffer_user = bulist_pop(buffer_pool, list); - if (buffer_user == NULL) return false; - if (grpc_buffer_pool_trace) { - gpr_log(GPR_DEBUG, "BP %s %s: initiate %s reclaimation", buffer_pool->name, - buffer_user->name, destructive ? "destructive" : "benign"); - } - buffer_pool->reclaiming = true; - grpc_buffer_pool_internal_ref(buffer_pool); - grpc_closure *c = buffer_user->reclaimers[destructive]; - buffer_user->reclaimers[destructive] = NULL; - grpc_closure_run(exec_ctx, c, GRPC_ERROR_NONE); - return true; -} - -/******************************************************************************* - * bu_slice: a slice implementation that is backed by a grpc_buffer_user - */ - -typedef struct { - gpr_slice_refcount base; - gpr_refcount refs; - grpc_buffer_user *buffer_user; - size_t size; -} bu_slice_refcount; - -static void bu_slice_ref(void *p) { - bu_slice_refcount *rc = p; - gpr_ref(&rc->refs); -} - -static void bu_slice_unref(void *p) { - bu_slice_refcount *rc = p; - if (gpr_unref(&rc->refs)) { - /* TODO(ctiller): this is dangerous, but I think safe for now: - we have no guarantee here that we're at a safe point for creating an - execution context, but we have no way of writing this code otherwise. - In the future: consider lifting gpr_slice to grpc, and offering an - internal_{ref,unref} pair that is execution context aware. Alternatively, - make exec_ctx be thread local and 'do the right thing' (whatever that is) - if NULL */ - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_buffer_user_free(&exec_ctx, rc->buffer_user, rc->size); - grpc_exec_ctx_finish(&exec_ctx); - gpr_free(rc); - } -} - -static gpr_slice bu_slice_create(grpc_buffer_user *buffer_user, size_t size) { - bu_slice_refcount *rc = gpr_malloc(sizeof(bu_slice_refcount) + size); - rc->base.ref = bu_slice_ref; - rc->base.unref = bu_slice_unref; - gpr_ref_init(&rc->refs, 1); - rc->buffer_user = buffer_user; - rc->size = size; - gpr_slice slice; - slice.refcount = &rc->base; - slice.data.refcounted.bytes = (uint8_t *)(rc + 1); - slice.data.refcounted.length = size; - return slice; -} - -/******************************************************************************* - * grpc_buffer_pool internal implementation - */ - -static void bu_allocate(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { - grpc_buffer_user *buffer_user = bu; - if (bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_AWAITING_ALLOCATION)) { - bpstep_sched(exec_ctx, buffer_user->buffer_pool); - } - bulist_add_tail(buffer_user, GRPC_BULIST_AWAITING_ALLOCATION); -} - -static void bu_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *bu, - grpc_error *error) { - grpc_buffer_user *buffer_user = bu; - if (!bulist_empty(buffer_user->buffer_pool, - GRPC_BULIST_AWAITING_ALLOCATION) && - bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL)) { - bpstep_sched(exec_ctx, buffer_user->buffer_pool); - } - bulist_add_tail(buffer_user, GRPC_BULIST_NON_EMPTY_FREE_POOL); -} - -static void bu_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *bu, - grpc_error *error) { - grpc_buffer_user *buffer_user = bu; - if (!bulist_empty(buffer_user->buffer_pool, - GRPC_BULIST_AWAITING_ALLOCATION) && - bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL) && - bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_RECLAIMER_BENIGN)) { - bpstep_sched(exec_ctx, buffer_user->buffer_pool); - } - bulist_add_tail(buffer_user, GRPC_BULIST_RECLAIMER_BENIGN); -} - -static void bu_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *bu, - grpc_error *error) { - grpc_buffer_user *buffer_user = bu; - if (!bulist_empty(buffer_user->buffer_pool, - GRPC_BULIST_AWAITING_ALLOCATION) && - bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL) && - bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_RECLAIMER_BENIGN) && - bulist_empty(buffer_user->buffer_pool, - GRPC_BULIST_RECLAIMER_DESTRUCTIVE)) { - bpstep_sched(exec_ctx, buffer_user->buffer_pool); - } - bulist_add_tail(buffer_user, GRPC_BULIST_RECLAIMER_DESTRUCTIVE); -} - -static void bu_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { - grpc_buffer_user *buffer_user = bu; - GPR_ASSERT(buffer_user->allocated == 0); - for (int i = 0; i < GRPC_BULIST_COUNT; i++) { - bulist_remove(buffer_user, (grpc_bulist)i); - } - grpc_exec_ctx_sched(exec_ctx, buffer_user->reclaimers[0], - GRPC_ERROR_CANCELLED, NULL); - grpc_exec_ctx_sched(exec_ctx, buffer_user->reclaimers[1], - GRPC_ERROR_CANCELLED, NULL); - grpc_exec_ctx_sched(exec_ctx, (grpc_closure *)gpr_atm_no_barrier_load( - &buffer_user->on_done_destroy_closure), - GRPC_ERROR_NONE, NULL); - if (buffer_user->free_pool != 0) { - buffer_user->buffer_pool->free_pool += buffer_user->free_pool; - bpstep_sched(exec_ctx, buffer_user->buffer_pool); - } -} - -static void bu_allocated_slices(grpc_exec_ctx *exec_ctx, void *ts, - grpc_error *error) { - grpc_buffer_user_slice_allocator *slice_allocator = ts; - if (error == GRPC_ERROR_NONE) { - for (size_t i = 0; i < slice_allocator->count; i++) { - gpr_slice_buffer_add_indexed(slice_allocator->dest, - bu_slice_create(slice_allocator->buffer_user, - slice_allocator->length)); - } - } - grpc_closure_run(exec_ctx, &slice_allocator->on_done, GRPC_ERROR_REF(error)); -} - -typedef struct { - int64_t size; - grpc_buffer_pool *buffer_pool; - grpc_closure closure; -} bp_resize_args; - -static void bp_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { - bp_resize_args *a = args; - int64_t delta = a->size - a->buffer_pool->size; - a->buffer_pool->size += delta; - a->buffer_pool->free_pool += delta; - if (delta < 0 && a->buffer_pool->free_pool < 0) { - bpstep_sched(exec_ctx, a->buffer_pool); - } else if (delta > 0 && - !bulist_empty(a->buffer_pool, GRPC_BULIST_AWAITING_ALLOCATION)) { - bpstep_sched(exec_ctx, a->buffer_pool); - } - grpc_buffer_pool_internal_unref(exec_ctx, a->buffer_pool); - gpr_free(a); -} - -static void bp_reclaimation_done(grpc_exec_ctx *exec_ctx, void *bp, - grpc_error *error) { - grpc_buffer_pool *buffer_pool = bp; - buffer_pool->reclaiming = false; - bpstep_sched(exec_ctx, buffer_pool); - grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool); -} - -/******************************************************************************* - * grpc_buffer_pool api - */ - -grpc_buffer_pool *grpc_buffer_pool_create(const char *name) { - grpc_buffer_pool *buffer_pool = gpr_malloc(sizeof(*buffer_pool)); - gpr_ref_init(&buffer_pool->refs, 1); - buffer_pool->combiner = grpc_combiner_create(NULL); - buffer_pool->free_pool = INT64_MAX; - buffer_pool->size = INT64_MAX; - buffer_pool->step_scheduled = false; - buffer_pool->reclaiming = false; - if (name != NULL) { - buffer_pool->name = gpr_strdup(name); - } else { - gpr_asprintf(&buffer_pool->name, "anonymous_pool_%" PRIxPTR, - (intptr_t)buffer_pool); - } - grpc_closure_init(&buffer_pool->bpstep_closure, bpstep, buffer_pool); - grpc_closure_init(&buffer_pool->bpreclaimation_done_closure, - bp_reclaimation_done, buffer_pool); - for (int i = 0; i < GRPC_BULIST_COUNT; i++) { - buffer_pool->roots[i] = NULL; - } - return buffer_pool; -} - -void grpc_buffer_pool_internal_unref(grpc_exec_ctx *exec_ctx, - grpc_buffer_pool *buffer_pool) { - if (gpr_unref(&buffer_pool->refs)) { - grpc_combiner_destroy(exec_ctx, buffer_pool->combiner); - gpr_free(buffer_pool->name); - gpr_free(buffer_pool); - } -} - -void grpc_buffer_pool_unref(grpc_buffer_pool *buffer_pool) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool); - grpc_exec_ctx_finish(&exec_ctx); -} - -grpc_buffer_pool *grpc_buffer_pool_internal_ref(grpc_buffer_pool *buffer_pool) { - gpr_ref(&buffer_pool->refs); - return buffer_pool; -} - -void grpc_buffer_pool_ref(grpc_buffer_pool *buffer_pool) { - grpc_buffer_pool_internal_ref(buffer_pool); -} - -void grpc_buffer_pool_resize(grpc_buffer_pool *buffer_pool, size_t size) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - bp_resize_args *a = gpr_malloc(sizeof(*a)); - a->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool); - a->size = (int64_t)size; - grpc_closure_init(&a->closure, bp_resize, a); - grpc_combiner_execute(&exec_ctx, buffer_pool->combiner, &a->closure, - GRPC_ERROR_NONE, false); - grpc_exec_ctx_finish(&exec_ctx); -} - -/******************************************************************************* - * grpc_buffer_user channel args api - */ - -grpc_buffer_pool *grpc_buffer_pool_from_channel_args( - const grpc_channel_args *channel_args) { - for (size_t i = 0; i < channel_args->num_args; i++) { - if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_BUFFER_POOL)) { - if (channel_args->args[i].type == GRPC_ARG_POINTER) { - return grpc_buffer_pool_internal_ref( - channel_args->args[i].value.pointer.p); - } else { - gpr_log(GPR_DEBUG, GRPC_ARG_BUFFER_POOL " should be a pointer"); - } - } - } - return grpc_buffer_pool_create(NULL); -} - -static void *bp_copy(void *bp) { - grpc_buffer_pool_ref(bp); - return bp; -} - -static void bp_destroy(void *bp) { grpc_buffer_pool_unref(bp); } - -static int bp_cmp(void *a, void *b) { return GPR_ICMP(a, b); } - -const grpc_arg_pointer_vtable *grpc_buffer_pool_arg_vtable(void) { - static const grpc_arg_pointer_vtable vtable = {bp_copy, bp_destroy, bp_cmp}; - return &vtable; -} - -/******************************************************************************* - * grpc_buffer_user api - */ - -void grpc_buffer_user_init(grpc_buffer_user *buffer_user, - grpc_buffer_pool *buffer_pool, const char *name) { - buffer_user->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool); - grpc_closure_init(&buffer_user->allocate_closure, &bu_allocate, buffer_user); - grpc_closure_init(&buffer_user->add_to_free_pool_closure, - &bu_add_to_free_pool, buffer_user); - grpc_closure_init(&buffer_user->post_reclaimer_closure[0], - &bu_post_benign_reclaimer, buffer_user); - grpc_closure_init(&buffer_user->post_reclaimer_closure[1], - &bu_post_destructive_reclaimer, buffer_user); - grpc_closure_init(&buffer_user->destroy_closure, &bu_destroy, buffer_user); - gpr_mu_init(&buffer_user->mu); - buffer_user->allocated = 0; - buffer_user->free_pool = 0; - grpc_closure_list_init(&buffer_user->on_allocated); - buffer_user->allocating = false; - buffer_user->added_to_free_pool = false; - gpr_atm_no_barrier_store(&buffer_user->on_done_destroy_closure, 0); - buffer_user->reclaimers[0] = NULL; - buffer_user->reclaimers[1] = NULL; - for (int i = 0; i < GRPC_BULIST_COUNT; i++) { - buffer_user->links[i].next = buffer_user->links[i].prev = NULL; - } -#ifndef NDEBUG - buffer_user->asan_canary = gpr_malloc(1); -#endif - if (name != NULL) { - buffer_user->name = gpr_strdup(name); - } else { - gpr_asprintf(&buffer_user->name, "anonymous_buffer_user_%" PRIxPTR, - (intptr_t)buffer_user); - } -} - -void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx, - grpc_buffer_user *buffer_user, - grpc_closure *on_done) { - gpr_mu_lock(&buffer_user->mu); - GPR_ASSERT(gpr_atm_no_barrier_load(&buffer_user->on_done_destroy_closure) == - 0); - gpr_atm_no_barrier_store(&buffer_user->on_done_destroy_closure, - (gpr_atm)on_done); - if (buffer_user->allocated == 0) { - grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, - &buffer_user->destroy_closure, GRPC_ERROR_NONE, - false); - } - gpr_mu_unlock(&buffer_user->mu); -} - -void grpc_buffer_user_destroy(grpc_exec_ctx *exec_ctx, - grpc_buffer_user *buffer_user) { -#ifndef NDEBUG - gpr_free(buffer_user->asan_canary); -#endif - grpc_buffer_pool_internal_unref(exec_ctx, buffer_user->buffer_pool); - gpr_mu_destroy(&buffer_user->mu); - gpr_free(buffer_user->name); -} - -void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx, - grpc_buffer_user *buffer_user, size_t size, - grpc_closure *optional_on_done) { - gpr_mu_lock(&buffer_user->mu); - grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load( - &buffer_user->on_done_destroy_closure); - if (on_done_destroy != NULL) { - /* already shutdown */ - if (grpc_buffer_pool_trace) { - gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR " after shutdown", - buffer_user->buffer_pool->name, buffer_user->name, size); - } - grpc_exec_ctx_sched( - exec_ctx, optional_on_done, - GRPC_ERROR_CREATE("Buffer pool user is already shutdown"), NULL); - gpr_mu_unlock(&buffer_user->mu); - return; - } - buffer_user->allocated += (int64_t)size; - buffer_user->free_pool -= (int64_t)size; - if (grpc_buffer_pool_trace) { - gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64 - ", free_pool -> %" PRId64, - buffer_user->buffer_pool->name, buffer_user->name, size, - buffer_user->allocated, buffer_user->free_pool); - } - if (buffer_user->free_pool < 0) { - grpc_closure_list_append(&buffer_user->on_allocated, optional_on_done, - GRPC_ERROR_NONE); - if (!buffer_user->allocating) { - buffer_user->allocating = true; - grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, - &buffer_user->allocate_closure, GRPC_ERROR_NONE, - false); - } - } else { - grpc_exec_ctx_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE, NULL); - } - gpr_mu_unlock(&buffer_user->mu); -} - -void grpc_buffer_user_free(grpc_exec_ctx *exec_ctx, - grpc_buffer_user *buffer_user, size_t size) { - gpr_mu_lock(&buffer_user->mu); - GPR_ASSERT(buffer_user->allocated >= (int64_t)size); - bool was_zero_or_negative = buffer_user->free_pool <= 0; - buffer_user->free_pool += (int64_t)size; - buffer_user->allocated -= (int64_t)size; - if (grpc_buffer_pool_trace) { - gpr_log(GPR_DEBUG, "BP %s %s: free %" PRIdPTR "; allocated -> %" PRId64 - ", free_pool -> %" PRId64, - buffer_user->buffer_pool->name, buffer_user->name, size, - buffer_user->allocated, buffer_user->free_pool); - } - bool is_bigger_than_zero = buffer_user->free_pool > 0; - if (is_bigger_than_zero && was_zero_or_negative && - !buffer_user->added_to_free_pool) { - buffer_user->added_to_free_pool = true; - grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, - &buffer_user->add_to_free_pool_closure, - GRPC_ERROR_NONE, false); - } - grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load( - &buffer_user->on_done_destroy_closure); - if (on_done_destroy != NULL && buffer_user->allocated == 0) { - grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, - &buffer_user->destroy_closure, GRPC_ERROR_NONE, - false); - } - gpr_mu_unlock(&buffer_user->mu); -} - -void grpc_buffer_user_post_reclaimer(grpc_exec_ctx *exec_ctx, - grpc_buffer_user *buffer_user, - bool destructive, grpc_closure *closure) { - if (gpr_atm_acq_load(&buffer_user->on_done_destroy_closure) == 0) { - GPR_ASSERT(buffer_user->reclaimers[destructive] == NULL); - buffer_user->reclaimers[destructive] = closure; - grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, - &buffer_user->post_reclaimer_closure[destructive], - GRPC_ERROR_NONE, false); - } else { - grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL); - } -} - -void grpc_buffer_user_finish_reclaimation(grpc_exec_ctx *exec_ctx, - grpc_buffer_user *buffer_user) { - if (grpc_buffer_pool_trace) { - gpr_log(GPR_DEBUG, "BP %s %s: reclaimation complete", - buffer_user->buffer_pool->name, buffer_user->name); - } - grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, - &buffer_user->buffer_pool->bpreclaimation_done_closure, - GRPC_ERROR_NONE, false); -} - -void grpc_buffer_user_slice_allocator_init( - grpc_buffer_user_slice_allocator *slice_allocator, - grpc_buffer_user *buffer_user, grpc_iomgr_cb_func cb, void *p) { - grpc_closure_init(&slice_allocator->on_allocated, bu_allocated_slices, - slice_allocator); - grpc_closure_init(&slice_allocator->on_done, cb, p); - slice_allocator->buffer_user = buffer_user; -} - -void grpc_buffer_user_alloc_slices( - grpc_exec_ctx *exec_ctx, grpc_buffer_user_slice_allocator *slice_allocator, - size_t length, size_t count, gpr_slice_buffer *dest) { - slice_allocator->length = length; - slice_allocator->count = count; - slice_allocator->dest = dest; - grpc_buffer_user_alloc(exec_ctx, slice_allocator->buffer_user, count * length, - &slice_allocator->on_allocated); -} diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c index f3548a1d74..74fa9c45df 100644 --- a/src/core/lib/iomgr/endpoint.c +++ b/src/core/lib/iomgr/endpoint.c @@ -70,6 +70,6 @@ grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) { return ep->vtable->get_workqueue(ep); } -grpc_buffer_user* grpc_endpoint_get_buffer_user(grpc_endpoint* ep) { - return ep->vtable->get_buffer_user(ep); +grpc_resource_user* grpc_endpoint_get_resource_user(grpc_endpoint* ep) { + return ep->vtable->get_resource_user(ep); } diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index df6b899e39..819fcdda1a 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -37,7 +37,7 @@ #include <grpc/support/slice.h> #include <grpc/support/slice_buffer.h> #include <grpc/support/time.h> -#include "src/core/lib/iomgr/buffer_pool.h" +#include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_set.h" @@ -59,7 +59,7 @@ struct grpc_endpoint_vtable { grpc_pollset_set *pollset); void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); - grpc_buffer_user *(*get_buffer_user)(grpc_endpoint *ep); + grpc_resource_user *(*get_resource_user)(grpc_endpoint *ep); char *(*get_peer)(grpc_endpoint *ep); }; @@ -102,7 +102,7 @@ void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset_set); -grpc_buffer_user *grpc_endpoint_get_buffer_user(grpc_endpoint *endpoint); +grpc_resource_user *grpc_endpoint_get_resource_user(grpc_endpoint *endpoint); struct grpc_endpoint { const grpc_endpoint_vtable *vtable; diff --git a/src/core/lib/iomgr/endpoint_pair.h b/src/core/lib/iomgr/endpoint_pair.h index 4938cf8599..27c17a0e97 100644 --- a/src/core/lib/iomgr/endpoint_pair.h +++ b/src/core/lib/iomgr/endpoint_pair.h @@ -42,6 +42,6 @@ typedef struct { } grpc_endpoint_pair; grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( - const char *name, grpc_buffer_pool *buffer_pool, size_t read_slice_size); + const char *name, grpc_resource_quota *resource_quota, size_t read_slice_size); #endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_PAIR_H */ diff --git a/src/core/lib/iomgr/endpoint_pair_posix.c b/src/core/lib/iomgr/endpoint_pair_posix.c index 64c161675f..c1437bcf17 100644 --- a/src/core/lib/iomgr/endpoint_pair_posix.c +++ b/src/core/lib/iomgr/endpoint_pair_posix.c @@ -63,18 +63,18 @@ static void create_sockets(int sv[2]) { } grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( - const char *name, grpc_buffer_pool *buffer_pool, size_t read_slice_size) { + const char *name, grpc_resource_quota *resource_quota, size_t read_slice_size) { int sv[2]; grpc_endpoint_pair p; char *final_name; create_sockets(sv); gpr_asprintf(&final_name, "%s:client", name); - p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), buffer_pool, + p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), resource_quota, read_slice_size, "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), buffer_pool, + p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), resource_quota, read_slice_size, "socketpair-client"); gpr_free(final_name); return p; diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c new file mode 100644 index 0000000000..c4e6e5482a --- /dev/null +++ b/src/core/lib/iomgr/resource_quota.c @@ -0,0 +1,684 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/resource_quota.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/useful.h> + +#include "src/core/lib/iomgr/combiner.h" + +int grpc_resource_quota_trace = 0; + +typedef bool (*bpstate_func)(grpc_exec_ctx *exec_ctx, + grpc_resource_quota *resource_quota); + +typedef struct { + grpc_resource_user *head; + grpc_resource_user *tail; +} grpc_resource_user_list; + +struct grpc_resource_quota { + gpr_refcount refs; + + grpc_combiner *combiner; + int64_t size; + int64_t free_pool; + + bool step_scheduled; + bool reclaiming; + grpc_closure bpstep_closure; + grpc_closure bpreclaimation_done_closure; + + grpc_resource_user *roots[GRPC_BULIST_COUNT]; + + char *name; +}; + +/******************************************************************************* + * list management + */ + +static void bulist_add_tail(grpc_resource_user *resource_user, grpc_bulist list) { + grpc_resource_quota *resource_quota = resource_user->resource_quota; + grpc_resource_user **root = &resource_quota->roots[list]; + if (*root == NULL) { + *root = resource_user; + resource_user->links[list].next = resource_user->links[list].prev = resource_user; + } else { + resource_user->links[list].next = *root; + resource_user->links[list].prev = (*root)->links[list].prev; + resource_user->links[list].next->links[list].prev = + resource_user->links[list].prev->links[list].next = resource_user; + } +} + +static void bulist_add_head(grpc_resource_user *resource_user, grpc_bulist list) { + grpc_resource_quota *resource_quota = resource_user->resource_quota; + grpc_resource_user **root = &resource_quota->roots[list]; + if (*root == NULL) { + *root = resource_user; + resource_user->links[list].next = resource_user->links[list].prev = resource_user; + } else { + resource_user->links[list].next = (*root)->links[list].next; + resource_user->links[list].prev = *root; + resource_user->links[list].next->links[list].prev = + resource_user->links[list].prev->links[list].next = resource_user; + *root = resource_user; + } +} + +static bool bulist_empty(grpc_resource_quota *resource_quota, grpc_bulist list) { + return resource_quota->roots[list] == NULL; +} + +static grpc_resource_user *bulist_pop(grpc_resource_quota *resource_quota, + grpc_bulist list) { + grpc_resource_user **root = &resource_quota->roots[list]; + grpc_resource_user *resource_user = *root; + if (resource_user == NULL) { + return NULL; + } + if (resource_user->links[list].next == resource_user) { + *root = NULL; + } else { + resource_user->links[list].next->links[list].prev = + resource_user->links[list].prev; + resource_user->links[list].prev->links[list].next = + resource_user->links[list].next; + *root = resource_user->links[list].next; + } + resource_user->links[list].next = resource_user->links[list].prev = NULL; + return resource_user; +} + +static void bulist_remove(grpc_resource_user *resource_user, grpc_bulist list) { + if (resource_user->links[list].next == NULL) return; + grpc_resource_quota *resource_quota = resource_user->resource_quota; + if (resource_quota->roots[list] == resource_user) { + resource_quota->roots[list] = resource_user->links[list].next; + if (resource_quota->roots[list] == resource_user) { + resource_quota->roots[list] = NULL; + } + } + resource_user->links[list].next->links[list].prev = + resource_user->links[list].prev; + resource_user->links[list].prev->links[list].next = + resource_user->links[list].next; +} + +/******************************************************************************* + * buffer pool state machine + */ + +static bool bpalloc(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota); +static bool bpscavenge(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota); +static bool bpreclaim(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota, + bool destructive); + +static void bpstep(grpc_exec_ctx *exec_ctx, void *bp, grpc_error *error) { + grpc_resource_quota *resource_quota = bp; + resource_quota->step_scheduled = false; + do { + if (bpalloc(exec_ctx, resource_quota)) goto done; + } while (bpscavenge(exec_ctx, resource_quota)); + bpreclaim(exec_ctx, resource_quota, false) || + bpreclaim(exec_ctx, resource_quota, true); +done: + grpc_resource_quota_internal_unref(exec_ctx, resource_quota); +} + +static void bpstep_sched(grpc_exec_ctx *exec_ctx, + grpc_resource_quota *resource_quota) { + if (resource_quota->step_scheduled) return; + resource_quota->step_scheduled = true; + grpc_resource_quota_internal_ref(resource_quota); + grpc_combiner_execute_finally(exec_ctx, resource_quota->combiner, + &resource_quota->bpstep_closure, GRPC_ERROR_NONE, + false); +} + +/* returns true if all allocations are completed */ +static bool bpalloc(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota) { + grpc_resource_user *resource_user; + while ((resource_user = + bulist_pop(resource_quota, GRPC_BULIST_AWAITING_ALLOCATION))) { + gpr_mu_lock(&resource_user->mu); + if (resource_user->free_pool < 0 && + -resource_user->free_pool <= resource_quota->free_pool) { + int64_t amt = -resource_user->free_pool; + resource_user->free_pool = 0; + resource_quota->free_pool -= amt; + if (grpc_resource_quota_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: grant alloc %" PRId64 + " bytes; bp_free_pool -> %" PRId64, + resource_quota->name, resource_user->name, amt, + resource_quota->free_pool); + } + } else if (grpc_resource_quota_trace && resource_user->free_pool >= 0) { + gpr_log(GPR_DEBUG, "BP %s %s: discard already satisfied alloc request", + resource_quota->name, resource_user->name); + } + if (resource_user->free_pool >= 0) { + resource_user->allocating = false; + grpc_exec_ctx_enqueue_list(exec_ctx, &resource_user->on_allocated, NULL); + gpr_mu_unlock(&resource_user->mu); + } else { + bulist_add_head(resource_user, GRPC_BULIST_AWAITING_ALLOCATION); + gpr_mu_unlock(&resource_user->mu); + return false; + } + } + return true; +} + +/* returns true if any memory could be reclaimed from buffers */ +static bool bpscavenge(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota) { + grpc_resource_user *resource_user; + while ((resource_user = + bulist_pop(resource_quota, GRPC_BULIST_NON_EMPTY_FREE_POOL))) { + gpr_mu_lock(&resource_user->mu); + if (resource_user->free_pool > 0) { + int64_t amt = resource_user->free_pool; + resource_user->free_pool = 0; + resource_quota->free_pool += amt; + if (grpc_resource_quota_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: scavenge %" PRId64 + " bytes; bp_free_pool -> %" PRId64, + resource_quota->name, resource_user->name, amt, + resource_quota->free_pool); + } + gpr_mu_unlock(&resource_user->mu); + return true; + } else { + gpr_mu_unlock(&resource_user->mu); + } + } + return false; +} + +/* returns true if reclaimation is proceeding */ +static bool bpreclaim(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota, + bool destructive) { + if (resource_quota->reclaiming) return true; + grpc_bulist list = destructive ? GRPC_BULIST_RECLAIMER_DESTRUCTIVE + : GRPC_BULIST_RECLAIMER_BENIGN; + grpc_resource_user *resource_user = bulist_pop(resource_quota, list); + if (resource_user == NULL) return false; + if (grpc_resource_quota_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: initiate %s reclaimation", resource_quota->name, + resource_user->name, destructive ? "destructive" : "benign"); + } + resource_quota->reclaiming = true; + grpc_resource_quota_internal_ref(resource_quota); + grpc_closure *c = resource_user->reclaimers[destructive]; + resource_user->reclaimers[destructive] = NULL; + grpc_closure_run(exec_ctx, c, GRPC_ERROR_NONE); + return true; +} + +/******************************************************************************* + * bu_slice: a slice implementation that is backed by a grpc_resource_user + */ + +typedef struct { + gpr_slice_refcount base; + gpr_refcount refs; + grpc_resource_user *resource_user; + size_t size; +} bu_slice_refcount; + +static void bu_slice_ref(void *p) { + bu_slice_refcount *rc = p; + gpr_ref(&rc->refs); +} + +static void bu_slice_unref(void *p) { + bu_slice_refcount *rc = p; + if (gpr_unref(&rc->refs)) { + /* TODO(ctiller): this is dangerous, but I think safe for now: + we have no guarantee here that we're at a safe point for creating an + execution context, but we have no way of writing this code otherwise. + In the future: consider lifting gpr_slice to grpc, and offering an + internal_{ref,unref} pair that is execution context aware. Alternatively, + make exec_ctx be thread local and 'do the right thing' (whatever that is) + if NULL */ + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_free(&exec_ctx, rc->resource_user, rc->size); + grpc_exec_ctx_finish(&exec_ctx); + gpr_free(rc); + } +} + +static gpr_slice bu_slice_create(grpc_resource_user *resource_user, size_t size) { + bu_slice_refcount *rc = gpr_malloc(sizeof(bu_slice_refcount) + size); + rc->base.ref = bu_slice_ref; + rc->base.unref = bu_slice_unref; + gpr_ref_init(&rc->refs, 1); + rc->resource_user = resource_user; + rc->size = size; + gpr_slice slice; + slice.refcount = &rc->base; + slice.data.refcounted.bytes = (uint8_t *)(rc + 1); + slice.data.refcounted.length = size; + return slice; +} + +/******************************************************************************* + * grpc_resource_quota internal implementation + */ + +static void bu_allocate(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { + grpc_resource_user *resource_user = bu; + if (bulist_empty(resource_user->resource_quota, GRPC_BULIST_AWAITING_ALLOCATION)) { + bpstep_sched(exec_ctx, resource_user->resource_quota); + } + bulist_add_tail(resource_user, GRPC_BULIST_AWAITING_ALLOCATION); +} + +static void bu_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *bu, + grpc_error *error) { + grpc_resource_user *resource_user = bu; + if (!bulist_empty(resource_user->resource_quota, + GRPC_BULIST_AWAITING_ALLOCATION) && + bulist_empty(resource_user->resource_quota, GRPC_BULIST_NON_EMPTY_FREE_POOL)) { + bpstep_sched(exec_ctx, resource_user->resource_quota); + } + bulist_add_tail(resource_user, GRPC_BULIST_NON_EMPTY_FREE_POOL); +} + +static void bu_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *bu, + grpc_error *error) { + grpc_resource_user *resource_user = bu; + if (!bulist_empty(resource_user->resource_quota, + GRPC_BULIST_AWAITING_ALLOCATION) && + bulist_empty(resource_user->resource_quota, GRPC_BULIST_NON_EMPTY_FREE_POOL) && + bulist_empty(resource_user->resource_quota, GRPC_BULIST_RECLAIMER_BENIGN)) { + bpstep_sched(exec_ctx, resource_user->resource_quota); + } + bulist_add_tail(resource_user, GRPC_BULIST_RECLAIMER_BENIGN); +} + +static void bu_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *bu, + grpc_error *error) { + grpc_resource_user *resource_user = bu; + if (!bulist_empty(resource_user->resource_quota, + GRPC_BULIST_AWAITING_ALLOCATION) && + bulist_empty(resource_user->resource_quota, GRPC_BULIST_NON_EMPTY_FREE_POOL) && + bulist_empty(resource_user->resource_quota, GRPC_BULIST_RECLAIMER_BENIGN) && + bulist_empty(resource_user->resource_quota, + GRPC_BULIST_RECLAIMER_DESTRUCTIVE)) { + bpstep_sched(exec_ctx, resource_user->resource_quota); + } + bulist_add_tail(resource_user, GRPC_BULIST_RECLAIMER_DESTRUCTIVE); +} + +static void bu_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { + grpc_resource_user *resource_user = bu; + GPR_ASSERT(resource_user->allocated == 0); + for (int i = 0; i < GRPC_BULIST_COUNT; i++) { + bulist_remove(resource_user, (grpc_bulist)i); + } + grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[0], + GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[1], + GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, (grpc_closure *)gpr_atm_no_barrier_load( + &resource_user->on_done_destroy_closure), + GRPC_ERROR_NONE, NULL); + if (resource_user->free_pool != 0) { + resource_user->resource_quota->free_pool += resource_user->free_pool; + bpstep_sched(exec_ctx, resource_user->resource_quota); + } +} + +static void bu_allocated_slices(grpc_exec_ctx *exec_ctx, void *ts, + grpc_error *error) { + grpc_resource_user_slice_allocator *slice_allocator = ts; + if (error == GRPC_ERROR_NONE) { + for (size_t i = 0; i < slice_allocator->count; i++) { + gpr_slice_buffer_add_indexed(slice_allocator->dest, + bu_slice_create(slice_allocator->resource_user, + slice_allocator->length)); + } + } + grpc_closure_run(exec_ctx, &slice_allocator->on_done, GRPC_ERROR_REF(error)); +} + +typedef struct { + int64_t size; + grpc_resource_quota *resource_quota; + grpc_closure closure; +} bp_resize_args; + +static void bp_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { + bp_resize_args *a = args; + int64_t delta = a->size - a->resource_quota->size; + a->resource_quota->size += delta; + a->resource_quota->free_pool += delta; + if (delta < 0 && a->resource_quota->free_pool < 0) { + bpstep_sched(exec_ctx, a->resource_quota); + } else if (delta > 0 && + !bulist_empty(a->resource_quota, GRPC_BULIST_AWAITING_ALLOCATION)) { + bpstep_sched(exec_ctx, a->resource_quota); + } + grpc_resource_quota_internal_unref(exec_ctx, a->resource_quota); + gpr_free(a); +} + +static void bp_reclaimation_done(grpc_exec_ctx *exec_ctx, void *bp, + grpc_error *error) { + grpc_resource_quota *resource_quota = bp; + resource_quota->reclaiming = false; + bpstep_sched(exec_ctx, resource_quota); + grpc_resource_quota_internal_unref(exec_ctx, resource_quota); +} + +/******************************************************************************* + * grpc_resource_quota api + */ + +grpc_resource_quota *grpc_resource_quota_create(const char *name) { + grpc_resource_quota *resource_quota = gpr_malloc(sizeof(*resource_quota)); + gpr_ref_init(&resource_quota->refs, 1); + resource_quota->combiner = grpc_combiner_create(NULL); + resource_quota->free_pool = INT64_MAX; + resource_quota->size = INT64_MAX; + resource_quota->step_scheduled = false; + resource_quota->reclaiming = false; + if (name != NULL) { + resource_quota->name = gpr_strdup(name); + } else { + gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR, + (intptr_t)resource_quota); + } + grpc_closure_init(&resource_quota->bpstep_closure, bpstep, resource_quota); + grpc_closure_init(&resource_quota->bpreclaimation_done_closure, + bp_reclaimation_done, resource_quota); + for (int i = 0; i < GRPC_BULIST_COUNT; i++) { + resource_quota->roots[i] = NULL; + } + return resource_quota; +} + +void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx, + grpc_resource_quota *resource_quota) { + if (gpr_unref(&resource_quota->refs)) { + grpc_combiner_destroy(exec_ctx, resource_quota->combiner); + gpr_free(resource_quota->name); + gpr_free(resource_quota); + } +} + +void grpc_resource_quota_unref(grpc_resource_quota *resource_quota) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_quota_internal_unref(&exec_ctx, resource_quota); + grpc_exec_ctx_finish(&exec_ctx); +} + +grpc_resource_quota *grpc_resource_quota_internal_ref(grpc_resource_quota *resource_quota) { + gpr_ref(&resource_quota->refs); + return resource_quota; +} + +void grpc_resource_quota_ref(grpc_resource_quota *resource_quota) { + grpc_resource_quota_internal_ref(resource_quota); +} + +void grpc_resource_quota_resize(grpc_resource_quota *resource_quota, size_t size) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + bp_resize_args *a = gpr_malloc(sizeof(*a)); + a->resource_quota = grpc_resource_quota_internal_ref(resource_quota); + a->size = (int64_t)size; + grpc_closure_init(&a->closure, bp_resize, a); + grpc_combiner_execute(&exec_ctx, resource_quota->combiner, &a->closure, + GRPC_ERROR_NONE, false); + grpc_exec_ctx_finish(&exec_ctx); +} + +/******************************************************************************* + * grpc_resource_user channel args api + */ + +grpc_resource_quota *grpc_resource_quota_from_channel_args( + const grpc_channel_args *channel_args) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_BUFFER_POOL)) { + if (channel_args->args[i].type == GRPC_ARG_POINTER) { + return grpc_resource_quota_internal_ref( + channel_args->args[i].value.pointer.p); + } else { + gpr_log(GPR_DEBUG, GRPC_ARG_BUFFER_POOL " should be a pointer"); + } + } + } + return grpc_resource_quota_create(NULL); +} + +static void *bp_copy(void *bp) { + grpc_resource_quota_ref(bp); + return bp; +} + +static void bp_destroy(void *bp) { grpc_resource_quota_unref(bp); } + +static int bp_cmp(void *a, void *b) { return GPR_ICMP(a, b); } + +const grpc_arg_pointer_vtable *grpc_resource_quota_arg_vtable(void) { + static const grpc_arg_pointer_vtable vtable = {bp_copy, bp_destroy, bp_cmp}; + return &vtable; +} + +/******************************************************************************* + * grpc_resource_user api + */ + +void grpc_resource_user_init(grpc_resource_user *resource_user, + grpc_resource_quota *resource_quota, const char *name) { + resource_user->resource_quota = grpc_resource_quota_internal_ref(resource_quota); + grpc_closure_init(&resource_user->allocate_closure, &bu_allocate, resource_user); + grpc_closure_init(&resource_user->add_to_free_pool_closure, + &bu_add_to_free_pool, resource_user); + grpc_closure_init(&resource_user->post_reclaimer_closure[0], + &bu_post_benign_reclaimer, resource_user); + grpc_closure_init(&resource_user->post_reclaimer_closure[1], + &bu_post_destructive_reclaimer, resource_user); + grpc_closure_init(&resource_user->destroy_closure, &bu_destroy, resource_user); + gpr_mu_init(&resource_user->mu); + resource_user->allocated = 0; + resource_user->free_pool = 0; + grpc_closure_list_init(&resource_user->on_allocated); + resource_user->allocating = false; + resource_user->added_to_free_pool = false; + gpr_atm_no_barrier_store(&resource_user->on_done_destroy_closure, 0); + resource_user->reclaimers[0] = NULL; + resource_user->reclaimers[1] = NULL; + for (int i = 0; i < GRPC_BULIST_COUNT; i++) { + resource_user->links[i].next = resource_user->links[i].prev = NULL; + } +#ifndef NDEBUG + resource_user->asan_canary = gpr_malloc(1); +#endif + if (name != NULL) { + resource_user->name = gpr_strdup(name); + } else { + gpr_asprintf(&resource_user->name, "anonymous_resource_user_%" PRIxPTR, + (intptr_t)resource_user); + } +} + +void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, + grpc_closure *on_done) { + gpr_mu_lock(&resource_user->mu); + GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->on_done_destroy_closure) == + 0); + gpr_atm_no_barrier_store(&resource_user->on_done_destroy_closure, + (gpr_atm)on_done); + if (resource_user->allocated == 0) { + grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, + &resource_user->destroy_closure, GRPC_ERROR_NONE, + false); + } + gpr_mu_unlock(&resource_user->mu); +} + +void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user) { +#ifndef NDEBUG + gpr_free(resource_user->asan_canary); +#endif + grpc_resource_quota_internal_unref(exec_ctx, resource_user->resource_quota); + gpr_mu_destroy(&resource_user->mu); + gpr_free(resource_user->name); +} + +void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, size_t size, + grpc_closure *optional_on_done) { + gpr_mu_lock(&resource_user->mu); + grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load( + &resource_user->on_done_destroy_closure); + if (on_done_destroy != NULL) { + /* already shutdown */ + if (grpc_resource_quota_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR " after shutdown", + resource_user->resource_quota->name, resource_user->name, size); + } + grpc_exec_ctx_sched( + exec_ctx, optional_on_done, + GRPC_ERROR_CREATE("Buffer pool user is already shutdown"), NULL); + gpr_mu_unlock(&resource_user->mu); + return; + } + resource_user->allocated += (int64_t)size; + resource_user->free_pool -= (int64_t)size; + if (grpc_resource_quota_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64 + ", free_pool -> %" PRId64, + resource_user->resource_quota->name, resource_user->name, size, + resource_user->allocated, resource_user->free_pool); + } + if (resource_user->free_pool < 0) { + grpc_closure_list_append(&resource_user->on_allocated, optional_on_done, + GRPC_ERROR_NONE); + if (!resource_user->allocating) { + resource_user->allocating = true; + grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, + &resource_user->allocate_closure, GRPC_ERROR_NONE, + false); + } + } else { + grpc_exec_ctx_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE, NULL); + } + gpr_mu_unlock(&resource_user->mu); +} + +void grpc_resource_user_free(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, size_t size) { + gpr_mu_lock(&resource_user->mu); + GPR_ASSERT(resource_user->allocated >= (int64_t)size); + bool was_zero_or_negative = resource_user->free_pool <= 0; + resource_user->free_pool += (int64_t)size; + resource_user->allocated -= (int64_t)size; + if (grpc_resource_quota_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: free %" PRIdPTR "; allocated -> %" PRId64 + ", free_pool -> %" PRId64, + resource_user->resource_quota->name, resource_user->name, size, + resource_user->allocated, resource_user->free_pool); + } + bool is_bigger_than_zero = resource_user->free_pool > 0; + if (is_bigger_than_zero && was_zero_or_negative && + !resource_user->added_to_free_pool) { + resource_user->added_to_free_pool = true; + grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, + &resource_user->add_to_free_pool_closure, + GRPC_ERROR_NONE, false); + } + grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load( + &resource_user->on_done_destroy_closure); + if (on_done_destroy != NULL && resource_user->allocated == 0) { + grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, + &resource_user->destroy_closure, GRPC_ERROR_NONE, + false); + } + gpr_mu_unlock(&resource_user->mu); +} + +void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, + bool destructive, grpc_closure *closure) { + if (gpr_atm_acq_load(&resource_user->on_done_destroy_closure) == 0) { + GPR_ASSERT(resource_user->reclaimers[destructive] == NULL); + resource_user->reclaimers[destructive] = closure; + grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, + &resource_user->post_reclaimer_closure[destructive], + GRPC_ERROR_NONE, false); + } else { + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL); + } +} + +void grpc_resource_user_finish_reclaimation(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user) { + if (grpc_resource_quota_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: reclaimation complete", + resource_user->resource_quota->name, resource_user->name); + } + grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, + &resource_user->resource_quota->bpreclaimation_done_closure, + GRPC_ERROR_NONE, false); +} + +void grpc_resource_user_slice_allocator_init( + grpc_resource_user_slice_allocator *slice_allocator, + grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p) { + grpc_closure_init(&slice_allocator->on_allocated, bu_allocated_slices, + slice_allocator); + grpc_closure_init(&slice_allocator->on_done, cb, p); + slice_allocator->resource_user = resource_user; +} + +void grpc_resource_user_alloc_slices( + grpc_exec_ctx *exec_ctx, grpc_resource_user_slice_allocator *slice_allocator, + size_t length, size_t count, gpr_slice_buffer *dest) { + slice_allocator->length = length; + slice_allocator->count = count; + slice_allocator->dest = dest; + grpc_resource_user_alloc(exec_ctx, slice_allocator->resource_user, count * length, + &slice_allocator->on_allocated); +} diff --git a/src/core/lib/iomgr/buffer_pool.h b/src/core/lib/iomgr/resource_quota.h index 1564872b5d..5c566e492c 100644 --- a/src/core/lib/iomgr/buffer_pool.h +++ b/src/core/lib/iomgr/resource_quota.h @@ -38,12 +38,13 @@ #include "src/core/lib/iomgr/exec_ctx.h" -extern int grpc_buffer_pool_trace; +extern int grpc_resource_quota_trace; -grpc_buffer_pool *grpc_buffer_pool_internal_ref(grpc_buffer_pool *buffer_pool); -void grpc_buffer_pool_internal_unref(grpc_exec_ctx *exec_ctx, - grpc_buffer_pool *buffer_pool); -grpc_buffer_pool *grpc_buffer_pool_from_channel_args( +grpc_resource_quota *grpc_resource_quota_internal_ref( + grpc_resource_quota *resource_quota); +void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx, + grpc_resource_quota *resource_quota); +grpc_resource_quota *grpc_resource_quota_from_channel_args( const grpc_channel_args *channel_args); typedef enum { @@ -54,15 +55,15 @@ typedef enum { GRPC_BULIST_COUNT } grpc_bulist; -typedef struct grpc_buffer_user grpc_buffer_user; +typedef struct grpc_resource_user grpc_resource_user; typedef struct { - grpc_buffer_user *next; - grpc_buffer_user *prev; -} grpc_buffer_user_link; + grpc_resource_user *next; + grpc_resource_user *prev; +} grpc_resource_user_link; -struct grpc_buffer_user { - grpc_buffer_pool *buffer_pool; +struct grpc_resource_user { + grpc_resource_quota *resource_quota; grpc_closure allocate_closure; grpc_closure add_to_free_pool_closure; @@ -84,45 +85,47 @@ struct grpc_buffer_user { grpc_closure destroy_closure; gpr_atm on_done_destroy_closure; - grpc_buffer_user_link links[GRPC_BULIST_COUNT]; + grpc_resource_user_link links[GRPC_BULIST_COUNT]; char *name; }; -void grpc_buffer_user_init(grpc_buffer_user *buffer_user, - grpc_buffer_pool *buffer_pool, const char *name); -void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx, - grpc_buffer_user *buffer_user, - grpc_closure *on_done); -void grpc_buffer_user_destroy(grpc_exec_ctx *exec_ctx, - grpc_buffer_user *buffer_user); - -void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx, - grpc_buffer_user *buffer_user, size_t size, - grpc_closure *optional_on_done); -void grpc_buffer_user_free(grpc_exec_ctx *exec_ctx, - grpc_buffer_user *buffer_user, size_t size); -void grpc_buffer_user_post_reclaimer(grpc_exec_ctx *exec_ctx, - grpc_buffer_user *buffer_user, - bool destructive, grpc_closure *closure); -void grpc_buffer_user_finish_reclaimation(grpc_exec_ctx *exec_ctx, - grpc_buffer_user *buffer_user); - -typedef struct grpc_buffer_user_slice_allocator { +void grpc_resource_user_init(grpc_resource_user *resource_user, + grpc_resource_quota *resource_quota, + const char *name); +void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, + grpc_closure *on_done); +void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user); + +void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, size_t size, + grpc_closure *optional_on_done); +void grpc_resource_user_free(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, size_t size); +void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, + bool destructive, grpc_closure *closure); +void grpc_resource_user_finish_reclaimation(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user); + +typedef struct grpc_resource_user_slice_allocator { grpc_closure on_allocated; grpc_closure on_done; size_t length; size_t count; gpr_slice_buffer *dest; - grpc_buffer_user *buffer_user; -} grpc_buffer_user_slice_allocator; + grpc_resource_user *resource_user; +} grpc_resource_user_slice_allocator; -void grpc_buffer_user_slice_allocator_init( - grpc_buffer_user_slice_allocator *slice_allocator, - grpc_buffer_user *buffer_user, grpc_iomgr_cb_func cb, void *p); +void grpc_resource_user_slice_allocator_init( + grpc_resource_user_slice_allocator *slice_allocator, + grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p); -void grpc_buffer_user_alloc_slices( - grpc_exec_ctx *exec_ctx, grpc_buffer_user_slice_allocator *slice_allocator, - size_t length, size_t count, gpr_slice_buffer *dest); +void grpc_resource_user_alloc_slices( + grpc_exec_ctx *exec_ctx, + grpc_resource_user_slice_allocator *slice_allocator, size_t length, + size_t count, gpr_slice_buffer *dest); #endif /* GRPC_CORE_LIB_IOMGR_BUFFER_POOL_H */ diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index dadd4cc2eb..e74a696c2f 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -125,7 +125,7 @@ grpc_endpoint *grpc_tcp_client_create_from_fd( grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args, const char *addr_str) { size_t tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE; - grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(NULL); + grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); if (channel_args != NULL) { for (size_t i = 0; i < channel_args->num_args; i++) { if (0 == @@ -135,16 +135,16 @@ grpc_endpoint *grpc_tcp_client_create_from_fd( tcp_read_chunk_size = (size_t)grpc_channel_arg_get_integer( &channel_args->args[i], options); } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_BUFFER_POOL)) { - grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool); - buffer_pool = grpc_buffer_pool_internal_ref( + grpc_resource_quota_internal_unref(exec_ctx, resource_quota); + resource_quota = grpc_resource_quota_internal_ref( channel_args->args[i].value.pointer.p); } } } grpc_endpoint *ep = - grpc_tcp_create(fd, buffer_pool, tcp_read_chunk_size, addr_str); - grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool); + grpc_tcp_create(fd, resource_quota, tcp_read_chunk_size, addr_str); + grpc_resource_quota_internal_unref(exec_ctx, resource_quota); return ep; } diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 648ca52818..27b6677545 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -102,8 +102,8 @@ typedef struct { char *peer_string; - grpc_buffer_user buffer_user; - grpc_buffer_user_slice_allocator slice_allocator; + grpc_resource_user resource_user; + grpc_resource_user_slice_allocator slice_allocator; } grpc_tcp; static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, @@ -113,17 +113,17 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, static void tcp_unref_closure(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error *error); -static void tcp_maybe_shutdown_buffer_user(grpc_exec_ctx *exec_ctx, - grpc_tcp *tcp) { +static void tcp_maybe_shutdown_resource_user(grpc_exec_ctx *exec_ctx, + grpc_tcp *tcp) { if (gpr_atm_full_fetch_add(&tcp->shutdown_count, 1) == 0) { - grpc_buffer_user_shutdown(exec_ctx, &tcp->buffer_user, - grpc_closure_create(tcp_unref_closure, tcp)); + grpc_resource_user_shutdown(exec_ctx, &tcp->resource_user, + grpc_closure_create(tcp_unref_closure, tcp)); } } static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - tcp_maybe_shutdown_buffer_user(exec_ctx, tcp); + tcp_maybe_shutdown_resource_user(exec_ctx, tcp); grpc_fd_shutdown(exec_ctx, tcp->em_fd); } @@ -131,7 +131,7 @@ static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd, "tcp_unref_orphan"); gpr_slice_buffer_destroy(&tcp->last_read_buffer); - grpc_buffer_user_destroy(exec_ctx, &tcp->buffer_user); + grpc_resource_user_destroy(exec_ctx, &tcp->resource_user); gpr_free(tcp->peer_string); gpr_free(tcp); } @@ -170,13 +170,13 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } static void tcp_unref_closure(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - TCP_UNREF(exec_ctx, arg, "buffer_user"); + TCP_UNREF(exec_ctx, arg, "resource_user"); } static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; - tcp_maybe_shutdown_buffer_user(exec_ctx, tcp); + tcp_maybe_shutdown_resource_user(exec_ctx, tcp); gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer); TCP_UNREF(exec_ctx, tcp, "destroy"); } @@ -286,7 +286,7 @@ static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp, static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { if (tcp->incoming_buffer->count < (size_t)tcp->iov_size) { - grpc_buffer_user_alloc_slices( + grpc_resource_user_alloc_slices( exec_ctx, &tcp->slice_allocator, tcp->slice_size, (size_t)tcp->iov_size - tcp->incoming_buffer->count, tcp->incoming_buffer); @@ -513,9 +513,9 @@ static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) { return grpc_fd_get_workqueue(tcp->em_fd); } -static grpc_buffer_user *tcp_get_buffer_user(grpc_endpoint *ep) { +static grpc_resource_user *tcp_get_resource_user(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - return &tcp->buffer_user; + return &tcp->resource_user; } static const grpc_endpoint_vtable vtable = {tcp_read, @@ -525,10 +525,11 @@ static const grpc_endpoint_vtable vtable = {tcp_read, tcp_add_to_pollset_set, tcp_shutdown, tcp_destroy, - tcp_get_buffer_user, + tcp_get_resource_user, tcp_get_peer}; -grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, grpc_buffer_pool *buffer_pool, +grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, + grpc_resource_quota *resource_quota, size_t slice_size, const char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); tcp->base.vtable = &vtable; @@ -543,7 +544,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, grpc_buffer_pool *buffer_pool, tcp->iov_size = 1; tcp->finished_edge = true; /* paired with unref in grpc_tcp_destroy, and with the shutdown for our - * buffer_user */ + * resource_user */ gpr_ref_init(&tcp->refcount, 2); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); tcp->em_fd = em_fd; @@ -552,9 +553,10 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, grpc_buffer_pool *buffer_pool, tcp->write_closure.cb = tcp_handle_write; tcp->write_closure.cb_arg = tcp; gpr_slice_buffer_init(&tcp->last_read_buffer); - grpc_buffer_user_init(&tcp->buffer_user, buffer_pool, peer_string); - grpc_buffer_user_slice_allocator_init( - &tcp->slice_allocator, &tcp->buffer_user, tcp_read_allocation_done, tcp); + grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string); + grpc_resource_user_slice_allocator_init(&tcp->slice_allocator, + &tcp->resource_user, + tcp_read_allocation_done, tcp); /* Tell network status tracker about new endpoint */ grpc_network_status_register_endpoint(&tcp->base); @@ -574,7 +576,7 @@ void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, GPR_ASSERT(ep->vtable == &vtable); tcp->release_fd = fd; tcp->release_fd_cb = done; - tcp_maybe_shutdown_buffer_user(exec_ctx, tcp); + tcp_maybe_shutdown_resource_user(exec_ctx, tcp); gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer); TCP_UNREF(exec_ctx, tcp, "destroy"); } diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h index 768355cf0c..1c0d13f96e 100644 --- a/src/core/lib/iomgr/tcp_posix.h +++ b/src/core/lib/iomgr/tcp_posix.h @@ -53,7 +53,7 @@ extern int grpc_tcp_trace; /* Create a tcp endpoint given a file desciptor and a read slice size. Takes ownership of fd. */ -grpc_endpoint *grpc_tcp_create(grpc_fd *fd, grpc_buffer_pool *buffer_pool, +grpc_endpoint *grpc_tcp_create(grpc_fd *fd, grpc_resource_quota *resource_quota, size_t read_slice_size, const char *peer_string); /* Return the tcp endpoint's fd, or -1 if this is not available. Does not diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 2afce529ef..b2eb89f429 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -138,7 +138,7 @@ struct grpc_tcp_server { /* next pollset to assign a channel to */ gpr_atm next_pollset_to_assign; - grpc_buffer_pool *buffer_pool; + grpc_resource_quota *resource_quota; }; static gpr_once check_init = GPR_ONCE_INIT; @@ -163,25 +163,25 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); s->so_reuseport = has_so_reuseport; - s->buffer_pool = grpc_buffer_pool_create(NULL); + s->resource_quota = grpc_resource_quota_create(NULL); for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) { if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) { if (args->args[i].type == GRPC_ARG_INTEGER) { s->so_reuseport = has_so_reuseport && (args->args[i].value.integer != 0); } else { - grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool); + grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota); gpr_free(s); return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT " must be an integer"); } } else if (0 == strcmp(GRPC_ARG_BUFFER_POOL, args->args[i].key)) { if (args->args[i].type == GRPC_ARG_POINTER) { - grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool); - s->buffer_pool = - grpc_buffer_pool_internal_ref(args->args[i].value.pointer.p); + grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota); + s->resource_quota = + grpc_resource_quota_internal_ref(args->args[i].value.pointer.p); } else { - grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool); + grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota); gpr_free(s); return GRPC_ERROR_CREATE(GRPC_ARG_BUFFER_POOL " must be a pointer to a buffer pool"); @@ -222,7 +222,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { gpr_free(sp); } - grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool); + grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota); gpr_free(s); } @@ -440,7 +440,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { sp->server->on_accept_cb( exec_ctx, sp->server->on_accept_cb_arg, - grpc_tcp_create(fdobj, sp->server->buffer_pool, + grpc_tcp_create(fdobj, sp->server->resource_quota, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), read_notifier_pollset, &acceptor); diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c index 4e703aa9f4..cb5ba554b0 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.c +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c @@ -124,14 +124,14 @@ static int is_stack_running_on_compute_engine(void) { grpc_httpcli_context_init(&context); - grpc_buffer_pool *buffer_pool = - grpc_buffer_pool_create("google_default_credentials"); + grpc_resource_quota *resource_quota = + grpc_resource_quota_create("google_default_credentials"); grpc_httpcli_get( - &exec_ctx, &context, &detector.pollent, buffer_pool, &request, + &exec_ctx, &context, &detector.pollent, resource_quota, &request, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay), grpc_closure_create(on_compute_engine_detection_http_response, &detector), &detector.response); - grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool); + grpc_resource_quota_internal_unref(&exec_ctx, resource_quota); grpc_exec_ctx_flush(&exec_ctx); diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c index ffcd0b3910..0339fd5d61 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.c +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c @@ -657,16 +657,16 @@ static void on_openid_config_retrieved(grpc_exec_ctx *exec_ctx, void *user_data, *(req.host + (req.http.path - jwks_uri)) = '\0'; } - /* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host + /* TODO(ctiller): Carry the resource_quota in ctx and share it with the host channel. This would allow us to cancel an authentication query when under extreme memory pressure. */ - grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create("jwt_verifier"); + grpc_resource_quota *resource_quota = grpc_resource_quota_create("jwt_verifier"); grpc_httpcli_get( - exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, buffer_pool, &req, + exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), grpc_closure_create(on_keys_retrieved, ctx), &ctx->responses[HTTP_RESPONSE_KEYS]); - grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool); + grpc_resource_quota_internal_unref(exec_ctx, resource_quota); grpc_json_destroy(json); gpr_free(req.host); return; @@ -769,15 +769,15 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx, rsp_idx = HTTP_RESPONSE_OPENID; } - /* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host + /* TODO(ctiller): Carry the resource_quota in ctx and share it with the host channel. This would allow us to cancel an authentication query when under extreme memory pressure. */ - grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create("jwt_verifier"); + grpc_resource_quota *resource_quota = grpc_resource_quota_create("jwt_verifier"); grpc_httpcli_get( - exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, buffer_pool, &req, + exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), http_cb, &ctx->responses[rsp_idx]); - grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool); + grpc_resource_quota_internal_unref(exec_ctx, resource_quota); gpr_free(req.host); gpr_free(req.http.path); return; diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c index 61c0815b2a..102831637b 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c @@ -307,14 +307,14 @@ static void compute_engine_fetch_oauth2( request.http.path = GRPC_COMPUTE_ENGINE_METADATA_TOKEN_PATH; request.http.hdr_count = 1; request.http.hdrs = &header; - /* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host + /* TODO(ctiller): Carry the resource_quota in ctx and share it with the host channel. This would allow us to cancel an authentication query when under extreme memory pressure. */ - grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create("oauth2_credentials"); - grpc_httpcli_get(exec_ctx, httpcli_context, pollent, buffer_pool, &request, + grpc_resource_quota *resource_quota = grpc_resource_quota_create("oauth2_credentials"); + grpc_httpcli_get(exec_ctx, httpcli_context, pollent, resource_quota, &request, deadline, grpc_closure_create(response_cb, metadata_req), &metadata_req->response); - grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool); + grpc_resource_quota_internal_unref(exec_ctx, resource_quota); } grpc_call_credentials *grpc_google_compute_engine_credentials_create( @@ -362,16 +362,16 @@ static void refresh_token_fetch_oauth2( request.http.hdr_count = 1; request.http.hdrs = &header; request.handshaker = &grpc_httpcli_ssl; - /* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host + /* TODO(ctiller): Carry the resource_quota in ctx and share it with the host channel. This would allow us to cancel an authentication query when under extreme memory pressure. */ - grpc_buffer_pool *buffer_pool = - grpc_buffer_pool_create("oauth2_credentials_refresh"); - grpc_httpcli_post(exec_ctx, httpcli_context, pollent, buffer_pool, &request, + grpc_resource_quota *resource_quota = + grpc_resource_quota_create("oauth2_credentials_refresh"); + grpc_httpcli_post(exec_ctx, httpcli_context, pollent, resource_quota, &request, body, strlen(body), deadline, grpc_closure_create(response_cb, metadata_req), &metadata_req->response); - grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool); + grpc_resource_quota_internal_unref(exec_ctx, resource_quota); gpr_free(body); } diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index ee6b9f97e8..9f84237171 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -370,9 +370,9 @@ static grpc_workqueue *endpoint_get_workqueue(grpc_endpoint *secure_ep) { return grpc_endpoint_get_workqueue(ep->wrapped_ep); } -static grpc_buffer_user *endpoint_get_buffer_user(grpc_endpoint *secure_ep) { +static grpc_resource_user *endpoint_get_resource_user(grpc_endpoint *secure_ep) { secure_endpoint *ep = (secure_endpoint *)secure_ep; - return grpc_endpoint_get_buffer_user(ep->wrapped_ep); + return grpc_endpoint_get_resource_user(ep->wrapped_ep); } static const grpc_endpoint_vtable vtable = {endpoint_read, @@ -382,7 +382,7 @@ static const grpc_endpoint_vtable vtable = {endpoint_read, endpoint_add_to_pollset_set, endpoint_shutdown, endpoint_destroy, - endpoint_get_buffer_user, + endpoint_get_resource_user, endpoint_get_peer}; grpc_endpoint *grpc_secure_endpoint_create( diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 39cab0cc5f..7903f57a68 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -49,10 +49,10 @@ #include "src/core/lib/channel/message_size_filter.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/http/parser.h" -#include "src/core/lib/iomgr/buffer_pool.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" +#include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -192,7 +192,7 @@ void grpc_init(void) { // Default timeout trace to 1 grpc_cq_event_timeout_trace = 1; grpc_register_tracer("op_failure", &grpc_trace_operation_failures); - grpc_register_tracer("buffer_pool", &grpc_buffer_pool_trace); + grpc_register_tracer("resource_quota", &grpc_resource_quota_trace); #ifndef NDEBUG grpc_register_tracer("pending_tags", &grpc_trace_pending_tags); #endif |