diff options
author | Craig Tiller <ctiller@google.com> | 2016-10-25 16:13:26 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-10-25 16:13:26 -0700 |
commit | 6b5d682c981f365d1f3c1bf771f113bd727d5ee0 (patch) | |
tree | 79d07c1ea175ced70dac7ec8ae11f837d9393f71 /src/core | |
parent | fe8c5012d31e29940c2771b6e36cc8042dfb2fd9 (diff) |
Review feedback
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 8 | ||||
-rw-r--r-- | src/core/lib/iomgr/resource_quota.c | 117 | ||||
-rw-r--r-- | src/core/lib/iomgr/resource_quota.h | 28 |
3 files changed, 87 insertions, 66 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index a2668474be..562f498d88 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -2108,7 +2108,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( } /******************************************************************************* - * BUFFER POOLS + * RESOURCE QUOTAS */ static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx, @@ -2152,6 +2152,8 @@ static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_chttp2_transport *t = arg; if (error == GRPC_ERROR_NONE && grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + /* Channel with no active streams: send a goaway to try and make it + * disconnect cleanly */ if (grpc_resource_quota_trace) { gpr_log(GPR_DEBUG, "HTTP2: %s - send goaway to free memory", t->peer_string); @@ -2188,6 +2190,10 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_INT_HTTP2_ERROR, GRPC_CHTTP2_ENHANCE_YOUR_CALM)); if (n > 1) { + /* Since we cancel one stream per destructive reclaimation, if + there are more streams left, we can immediately post a new + reclaimer in case the resource quota needs to free more + memory */ post_destructive_reclaimer(exec_ctx, t); } } diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 5466973408..4f0d16ca7e 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -49,6 +49,7 @@ struct grpc_resource_quota { gpr_refcount refs; /* Master combiner lock: all activity on a quota executes under this combiner + * (so no mutex is needed for this data structure) */ grpc_combiner *combiner; /* Size of the resource quota */ @@ -75,7 +76,7 @@ struct grpc_resource_quota { * list management */ -static void rulist_add_tail(grpc_resource_user *resource_user, +static void rulist_add_head(grpc_resource_user *resource_user, grpc_rulist list) { grpc_resource_quota *resource_quota = resource_user->resource_quota; grpc_resource_user **root = &resource_quota->roots[list]; @@ -91,7 +92,7 @@ static void rulist_add_tail(grpc_resource_user *resource_user, } } -static void rulist_add_head(grpc_resource_user *resource_user, +static void rulist_add_tail(grpc_resource_user *resource_user, grpc_rulist list) { grpc_resource_quota *resource_quota = resource_user->resource_quota; grpc_resource_user **root = &resource_quota->roots[list]; @@ -113,8 +114,8 @@ static bool rulist_empty(grpc_resource_quota *resource_quota, return resource_quota->roots[list] == NULL; } -static grpc_resource_user *rulist_pop(grpc_resource_quota *resource_quota, - grpc_rulist list) { +static grpc_resource_user *rulist_pop_tail(grpc_resource_quota *resource_quota, + grpc_rulist list) { grpc_resource_user **root = &resource_quota->roots[list]; grpc_resource_user *resource_user = *root; if (resource_user == NULL) { @@ -149,22 +150,22 @@ static void rulist_remove(grpc_resource_user *resource_user, grpc_rulist list) { } /******************************************************************************* - * buffer pool state machine + * resource quota state machine */ static bool rq_alloc(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota); -static bool rq_scavenge(grpc_exec_ctx *exec_ctx, - grpc_resource_quota *resource_quota); +static bool rq_reclaim_from_per_user_free_pool( + grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota); static bool rq_reclaim(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota, bool destructive); -static void rq_step(grpc_exec_ctx *exec_ctx, void *bp, grpc_error *error) { - grpc_resource_quota *resource_quota = bp; +static void rq_step(grpc_exec_ctx *exec_ctx, void *rq, grpc_error *error) { + grpc_resource_quota *resource_quota = rq; resource_quota->step_scheduled = false; do { if (rq_alloc(exec_ctx, resource_quota)) goto done; - } while (rq_scavenge(exec_ctx, resource_quota)); + } while (rq_reclaim_from_per_user_free_pool(exec_ctx, resource_quota)); rq_reclaim(exec_ctx, resource_quota, false) || rq_reclaim(exec_ctx, resource_quota, true); done: @@ -185,8 +186,8 @@ static void rq_step_sched(grpc_exec_ctx *exec_ctx, static bool rq_alloc(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota) { grpc_resource_user *resource_user; - while ((resource_user = - rulist_pop(resource_quota, GRPC_RULIST_AWAITING_ALLOCATION))) { + while ((resource_user = rulist_pop_tail(resource_quota, + GRPC_RULIST_AWAITING_ALLOCATION))) { gpr_mu_lock(&resource_user->mu); if (resource_user->free_pool < 0 && -resource_user->free_pool <= resource_quota->free_pool) { @@ -194,13 +195,13 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx, resource_user->free_pool = 0; resource_quota->free_pool -= amt; if (grpc_resource_quota_trace) { - gpr_log(GPR_DEBUG, "BP %s %s: grant alloc %" PRId64 + gpr_log(GPR_DEBUG, "RQ %s %s: grant alloc %" PRId64 " bytes; rq_free_pool -> %" PRId64, resource_quota->name, resource_user->name, amt, resource_quota->free_pool); } } else if (grpc_resource_quota_trace && resource_user->free_pool >= 0) { - gpr_log(GPR_DEBUG, "BP %s %s: discard already satisfied alloc request", + gpr_log(GPR_DEBUG, "RQ %s %s: discard already satisfied alloc request", resource_quota->name, resource_user->name); } if (resource_user->free_pool >= 0) { @@ -208,7 +209,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_enqueue_list(exec_ctx, &resource_user->on_allocated, NULL); gpr_mu_unlock(&resource_user->mu); } else { - rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION); + rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION); gpr_mu_unlock(&resource_user->mu); return false; } @@ -217,18 +218,18 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx, } /* returns true if any memory could be reclaimed from buffers */ -static bool rq_scavenge(grpc_exec_ctx *exec_ctx, - grpc_resource_quota *resource_quota) { +static bool rq_reclaim_from_per_user_free_pool( + grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota) { grpc_resource_user *resource_user; - while ((resource_user = - rulist_pop(resource_quota, GRPC_RULIST_NON_EMPTY_FREE_POOL))) { + while ((resource_user = rulist_pop_tail(resource_quota, + GRPC_RULIST_NON_EMPTY_FREE_POOL))) { gpr_mu_lock(&resource_user->mu); if (resource_user->free_pool > 0) { int64_t amt = resource_user->free_pool; resource_user->free_pool = 0; resource_quota->free_pool += amt; if (grpc_resource_quota_trace) { - gpr_log(GPR_DEBUG, "BP %s %s: scavenge %" PRId64 + gpr_log(GPR_DEBUG, "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64 " bytes; rq_free_pool -> %" PRId64, resource_quota->name, resource_user->name, amt, resource_quota->free_pool); @@ -248,10 +249,10 @@ static bool rq_reclaim(grpc_exec_ctx *exec_ctx, if (resource_quota->reclaiming) return true; grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE : GRPC_RULIST_RECLAIMER_BENIGN; - grpc_resource_user *resource_user = rulist_pop(resource_quota, list); + grpc_resource_user *resource_user = rulist_pop_tail(resource_quota, list); if (resource_user == NULL) return false; if (grpc_resource_quota_trace) { - gpr_log(GPR_DEBUG, "BP %s %s: initiate %s reclamation", + gpr_log(GPR_DEBUG, "RQ %s %s: initiate %s reclamation", resource_quota->name, resource_user->name, destructive ? "destructive" : "benign"); } @@ -314,33 +315,34 @@ static gpr_slice ru_slice_create(grpc_resource_user *resource_user, } /******************************************************************************* - * grpc_resource_quota internal implementation + * grpc_resource_quota internal implementation: resource user manipulation under + * the combiner */ -static void ru_allocate(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { - grpc_resource_user *resource_user = bu; +static void ru_allocate(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { + grpc_resource_user *resource_user = ru; if (rulist_empty(resource_user->resource_quota, GRPC_RULIST_AWAITING_ALLOCATION)) { rq_step_sched(exec_ctx, resource_user->resource_quota); } - rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION); + rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION); } -static void ru_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *bu, +static void ru_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { - grpc_resource_user *resource_user = bu; + grpc_resource_user *resource_user = ru; if (!rulist_empty(resource_user->resource_quota, GRPC_RULIST_AWAITING_ALLOCATION) && rulist_empty(resource_user->resource_quota, GRPC_RULIST_NON_EMPTY_FREE_POOL)) { rq_step_sched(exec_ctx, resource_user->resource_quota); } - rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL); + rulist_add_head(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL); } -static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *bu, +static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { - grpc_resource_user *resource_user = bu; + grpc_resource_user *resource_user = ru; if (!rulist_empty(resource_user->resource_quota, GRPC_RULIST_AWAITING_ALLOCATION) && rulist_empty(resource_user->resource_quota, @@ -349,12 +351,12 @@ static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *bu, GRPC_RULIST_RECLAIMER_BENIGN)) { rq_step_sched(exec_ctx, resource_user->resource_quota); } - rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN); + rulist_add_head(resource_user, GRPC_RULIST_RECLAIMER_BENIGN); } -static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *bu, +static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { - grpc_resource_user *resource_user = bu; + grpc_resource_user *resource_user = ru; if (!rulist_empty(resource_user->resource_quota, GRPC_RULIST_AWAITING_ALLOCATION) && rulist_empty(resource_user->resource_quota, @@ -365,11 +367,11 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *bu, GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) { rq_step_sched(exec_ctx, resource_user->resource_quota); } - rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE); + rulist_add_head(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE); } -static void ru_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { - grpc_resource_user *resource_user = bu; +static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { + grpc_resource_user *resource_user = ru; GPR_ASSERT(resource_user->allocated == 0); for (int i = 0; i < GRPC_RULIST_COUNT; i++) { rulist_remove(resource_user, (grpc_rulist)i); @@ -387,9 +389,9 @@ static void ru_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { } } -static void ru_allocated_slices(grpc_exec_ctx *exec_ctx, void *ts, +static void ru_allocated_slices(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_resource_user_slice_allocator *slice_allocator = ts; + grpc_resource_user_slice_allocator *slice_allocator = arg; if (error == GRPC_ERROR_NONE) { for (size_t i = 0; i < slice_allocator->count; i++) { gpr_slice_buffer_add_indexed( @@ -400,6 +402,11 @@ static void ru_allocated_slices(grpc_exec_ctx *exec_ctx, void *ts, grpc_closure_run(exec_ctx, &slice_allocator->on_done, GRPC_ERROR_REF(error)); } +/******************************************************************************* + * grpc_resource_quota internal implementation: quota manipulation under the + * combiner + */ + typedef struct { int64_t size; grpc_resource_quota *resource_quota; @@ -411,20 +418,14 @@ static void rq_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { int64_t delta = a->size - a->resource_quota->size; a->resource_quota->size += delta; a->resource_quota->free_pool += delta; - if (delta < 0 && a->resource_quota->free_pool < 0) { - rq_step_sched(exec_ctx, a->resource_quota); - } else if (delta > 0 && - !rulist_empty(a->resource_quota, - GRPC_RULIST_AWAITING_ALLOCATION)) { - rq_step_sched(exec_ctx, a->resource_quota); - } + rq_step_sched(exec_ctx, a->resource_quota); grpc_resource_quota_internal_unref(exec_ctx, a->resource_quota); gpr_free(a); } -static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *bp, +static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *rq, grpc_error *error) { - grpc_resource_quota *resource_quota = bp; + grpc_resource_quota *resource_quota = rq; resource_quota->reclaiming = false; rq_step_sched(exec_ctx, resource_quota); grpc_resource_quota_internal_unref(exec_ctx, resource_quota); @@ -434,6 +435,7 @@ static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *bp, * grpc_resource_quota api */ +/* Public API */ grpc_resource_quota *grpc_resource_quota_create(const char *name) { grpc_resource_quota *resource_quota = gpr_malloc(sizeof(*resource_quota)); gpr_ref_init(&resource_quota->refs, 1); @@ -466,6 +468,7 @@ void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx, } } +/* Public API */ void grpc_resource_quota_unref(grpc_resource_quota *resource_quota) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resource_quota_internal_unref(&exec_ctx, resource_quota); @@ -478,10 +481,12 @@ grpc_resource_quota *grpc_resource_quota_internal_ref( return resource_quota; } +/* Public API */ void grpc_resource_quota_ref(grpc_resource_quota *resource_quota) { grpc_resource_quota_internal_ref(resource_quota); } +/* Public API */ void grpc_resource_quota_resize(grpc_resource_quota *resource_quota, size_t size) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -513,12 +518,12 @@ grpc_resource_quota *grpc_resource_quota_from_channel_args( return grpc_resource_quota_create(NULL); } -static void *rq_copy(void *bp) { - grpc_resource_quota_ref(bp); - return bp; +static void *rq_copy(void *rq) { + grpc_resource_quota_ref(rq); + return rq; } -static void rq_destroy(void *bp) { grpc_resource_quota_unref(bp); } +static void rq_destroy(void *rq) { grpc_resource_quota_unref(rq); } static int rq_cmp(void *a, void *b) { return GPR_ICMP(a, b); } @@ -604,7 +609,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx, if (on_done_destroy != NULL) { /* already shutdown */ if (grpc_resource_quota_trace) { - gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR " after shutdown", + gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR " after shutdown", resource_user->resource_quota->name, resource_user->name, size); } grpc_exec_ctx_sched( @@ -616,7 +621,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx, resource_user->allocated += (int64_t)size; resource_user->free_pool -= (int64_t)size; if (grpc_resource_quota_trace) { - gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64 + gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64 ", free_pool -> %" PRId64, resource_user->resource_quota->name, resource_user->name, size, resource_user->allocated, resource_user->free_pool); @@ -644,7 +649,7 @@ void grpc_resource_user_free(grpc_exec_ctx *exec_ctx, resource_user->free_pool += (int64_t)size; resource_user->allocated -= (int64_t)size; if (grpc_resource_quota_trace) { - gpr_log(GPR_DEBUG, "BP %s %s: free %" PRIdPTR "; allocated -> %" PRId64 + gpr_log(GPR_DEBUG, "RQ %s %s: free %" PRIdPTR "; allocated -> %" PRId64 ", free_pool -> %" PRId64, resource_user->resource_quota->name, resource_user->name, size, resource_user->allocated, resource_user->free_pool); @@ -685,7 +690,7 @@ void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx, void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user) { if (grpc_resource_quota_trace) { - gpr_log(GPR_DEBUG, "BP %s %s: reclamation complete", + gpr_log(GPR_DEBUG, "RQ %s %s: reclamation complete", resource_user->resource_quota->name, resource_user->name); } grpc_combiner_execute( diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h index af94a19911..c15cb68310 100644 --- a/src/core/lib/iomgr/resource_quota.h +++ b/src/core/lib/iomgr/resource_quota.h @@ -106,7 +106,7 @@ struct grpc_resource_user { /* The quota this resource user consumes from */ grpc_resource_quota *resource_quota; - /* Closure to schedule an allocation onder the resource quota combiner lock */ + /* Closure to schedule an allocation under the resource quota combiner lock */ grpc_closure allocate_closure; /* Closure to publish a non empty free pool under the resource quota combiner lock */ @@ -118,12 +118,13 @@ struct grpc_resource_user { #endif gpr_mu mu; - /* Total allocated memory outstanding by this resource user; + /* Total allocated memory outstanding by this resource user in bytes; always positive */ int64_t allocated; - /* The amount of memory this user has cached for its own use: to avoid quota - contention, each resource user can keep some memory aside from the quota, - and the quota can pull it back under memory pressure. + /* The amount of memory (in bytes) this user has cached for its own use: to + avoid quota contention, each resource user can keep some memory in + addition to what it is immediately using (e.g., for caching), and the quota + can pull it back under memory pressure. This value can become negative if more memory has been requested than existed in the free pool, at which point the quota is consulted to bring this value non-negative (asynchronously). */ @@ -148,7 +149,8 @@ struct grpc_resource_user { resource user */ grpc_closure destroy_closure; /* User supplied closure to call once the user has finished shutting down AND - all outstanding allocations have been freed */ + all outstanding allocations have been freed. Real type is grpc_closure*, + but it's stored as an atomic to avoid a mutex on some fast paths. */ gpr_atm on_done_destroy_closure; /* Links in the various grpc_rulist lists */ @@ -167,7 +169,7 @@ void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx, void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user); -/* Allocate from the resource user (and it's quota). +/* Allocate from the resource user (and its quota). If optional_on_done is NULL, then allocate immediately. This may push the quota over-limit, at which point reclamation will kick in. If optional_on_done is non-NULL, it will be scheduled when the allocation has @@ -191,20 +193,28 @@ void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx, /* Helper to allocate slices from a resource user */ typedef struct grpc_resource_user_slice_allocator { + /* Closure for when a resource user allocation completes */ grpc_closure on_allocated; + /* Closure to call when slices have been allocated */ grpc_closure on_done; + /* Length of slices to allocate on the current request */ size_t length; + /* Number of slices to allocate on the current request */ size_t count; + /* Destination for slices to allocate on the current request */ gpr_slice_buffer *dest; + /* Parent resource user */ grpc_resource_user *resource_user; } grpc_resource_user_slice_allocator; -/* Initialize a slice allocator */ +/* Initialize a slice allocator. + When an allocation is completed, calls \a cb with arg \p. */ void grpc_resource_user_slice_allocator_init( grpc_resource_user_slice_allocator *slice_allocator, grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p); -/* Allocate \a count slices of length \a length into \a dest. */ +/* Allocate \a count slices of length \a length into \a dest. Only one request + can be outstanding at a time. */ void grpc_resource_user_alloc_slices( grpc_exec_ctx *exec_ctx, grpc_resource_user_slice_allocator *slice_allocator, size_t length, |