From 91031dacb1ff5c57500307044b18c9f929134462 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 28 Dec 2016 15:44:25 -0800 Subject: Changes to exec_ctx/closure/combiner/workqueue interfaces - make closures know where they should be executed (eg, on a workqueue, or a combiner, or on an exec_ctx) - this allows removal of a large number of trampoline functions that were appearing whenever we used combiners, and should allow for a much easier interface to combiner locks --- src/core/lib/iomgr/resource_quota.c | 101 +++++++++++++++++++----------------- 1 file changed, 54 insertions(+), 47 deletions(-) (limited to 'src/core/lib/iomgr/resource_quota.c') diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 213d29600c..8db539edfb 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -265,9 +265,8 @@ static void rq_step_sched(grpc_exec_ctx *exec_ctx, if (resource_quota->step_scheduled) return; resource_quota->step_scheduled = true; grpc_resource_quota_internal_ref(resource_quota); - grpc_combiner_execute_finally(exec_ctx, resource_quota->combiner, - &resource_quota->rq_step_closure, - GRPC_ERROR_NONE, false); + grpc_closure_sched(exec_ctx, &resource_quota->rq_step_closure, + GRPC_ERROR_NONE); } /* returns true if all allocations are completed */ @@ -294,7 +293,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx, } if (resource_user->free_pool >= 0) { resource_user->allocating = false; - grpc_exec_ctx_enqueue_list(exec_ctx, &resource_user->on_allocated, NULL); + grpc_closure_list_sched(exec_ctx, &resource_user->on_allocated); gpr_mu_unlock(&resource_user->mu); } else { rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION); @@ -439,7 +438,7 @@ static bool ru_post_reclaimer(grpc_exec_ctx *exec_ctx, resource_user->new_reclaimers[destructive] = NULL; GPR_ASSERT(resource_user->reclaimers[destructive] == NULL); if (gpr_atm_acq_load(&resource_user->shutdown) > 0) { - grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL); + grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED); return false; } resource_user->reclaimers[destructive] = closure; @@ -480,10 +479,10 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { grpc_resource_user *resource_user = ru; - grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[0], - GRPC_ERROR_CANCELLED, NULL); - grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[1], - GRPC_ERROR_CANCELLED, NULL); + grpc_closure_sched(exec_ctx, resource_user->reclaimers[0], + GRPC_ERROR_CANCELLED); + grpc_closure_sched(exec_ctx, resource_user->reclaimers[1], + GRPC_ERROR_CANCELLED); resource_user->reclaimers[0] = NULL; resource_user->reclaimers[1] = NULL; rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN); @@ -496,10 +495,10 @@ static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { for (int i = 0; i < GRPC_RULIST_COUNT; i++) { rulist_remove(resource_user, (grpc_rulist)i); } - grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[0], - GRPC_ERROR_CANCELLED, NULL); - grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[1], - GRPC_ERROR_CANCELLED, NULL); + grpc_closure_sched(exec_ctx, resource_user->reclaimers[0], + GRPC_ERROR_CANCELLED); + grpc_closure_sched(exec_ctx, resource_user->reclaimers[1], + GRPC_ERROR_CANCELLED); if (resource_user->free_pool != 0) { resource_user->resource_quota->free_pool += resource_user->free_pool; rq_step_sched(exec_ctx, resource_user->resource_quota); @@ -571,9 +570,12 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) { gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR, (intptr_t)resource_quota); } - grpc_closure_init(&resource_quota->rq_step_closure, rq_step, resource_quota); + grpc_closure_init( + &resource_quota->rq_step_closure, rq_step, resource_quota, + grpc_combiner_finally_scheduler(resource_quota->combiner, true)); grpc_closure_init(&resource_quota->rq_reclamation_done_closure, - rq_reclamation_done, resource_quota); + rq_reclamation_done, resource_quota, + grpc_combiner_scheduler(resource_quota->combiner, false)); for (int i = 0; i < GRPC_RULIST_COUNT; i++) { resource_quota->roots[i] = NULL; } @@ -614,9 +616,8 @@ void grpc_resource_quota_resize(grpc_resource_quota *resource_quota, rq_resize_args *a = gpr_malloc(sizeof(*a)); a->resource_quota = grpc_resource_quota_internal_ref(resource_quota); a->size = (int64_t)size; - grpc_closure_init(&a->closure, rq_resize, a); - grpc_combiner_execute(&exec_ctx, resource_quota->combiner, &a->closure, - GRPC_ERROR_NONE, false); + grpc_closure_init(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx); + grpc_closure_sched(&exec_ctx, &a->closure, GRPC_ERROR_NONE); grpc_exec_ctx_finish(&exec_ctx); } @@ -663,15 +664,19 @@ grpc_resource_user *grpc_resource_user_create( resource_user->resource_quota = grpc_resource_quota_internal_ref(resource_quota); grpc_closure_init(&resource_user->allocate_closure, &ru_allocate, - resource_user); + resource_user, + grpc_combiner_scheduler(resource_quota->combiner, false)); grpc_closure_init(&resource_user->add_to_free_pool_closure, - &ru_add_to_free_pool, resource_user); + &ru_add_to_free_pool, resource_user, + grpc_combiner_scheduler(resource_quota->combiner, false)); grpc_closure_init(&resource_user->post_reclaimer_closure[0], - &ru_post_benign_reclaimer, resource_user); + &ru_post_benign_reclaimer, resource_user, + grpc_combiner_scheduler(resource_quota->combiner, false)); grpc_closure_init(&resource_user->post_reclaimer_closure[1], - &ru_post_destructive_reclaimer, resource_user); - grpc_closure_init(&resource_user->destroy_closure, &ru_destroy, - resource_user); + &ru_post_destructive_reclaimer, resource_user, + grpc_combiner_scheduler(resource_quota->combiner, false)); + grpc_closure_init(&resource_user->destroy_closure, &ru_destroy, resource_user, + grpc_combiner_scheduler(resource_quota->combiner, false)); gpr_mu_init(&resource_user->mu); gpr_atm_rel_store(&resource_user->refs, 1); gpr_atm_rel_store(&resource_user->shutdown, 0); @@ -706,9 +711,8 @@ static void ru_unref_by(grpc_exec_ctx *exec_ctx, gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount); GPR_ASSERT(old >= amount); if (old == amount) { - grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, - &resource_user->destroy_closure, GRPC_ERROR_NONE, - false); + grpc_closure_sched(exec_ctx, &resource_user->destroy_closure, + GRPC_ERROR_NONE); } } @@ -724,9 +728,12 @@ void grpc_resource_user_unref(grpc_exec_ctx *exec_ctx, void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user) { if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) { - grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, - grpc_closure_create(ru_shutdown, resource_user), - GRPC_ERROR_NONE, false); + grpc_closure_sched(exec_ctx, + grpc_closure_create( + ru_shutdown, resource_user, + grpc_combiner_scheduler( + resource_user->resource_quota->combiner, false)), + GRPC_ERROR_NONE); } } @@ -746,12 +753,11 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE); if (!resource_user->allocating) { resource_user->allocating = true; - grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, - &resource_user->allocate_closure, GRPC_ERROR_NONE, - false); + grpc_closure_sched(exec_ctx, &resource_user->allocate_closure, + GRPC_ERROR_NONE); } } else { - grpc_exec_ctx_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE); } gpr_mu_unlock(&resource_user->mu); } @@ -770,9 +776,8 @@ void grpc_resource_user_free(grpc_exec_ctx *exec_ctx, if (is_bigger_than_zero && was_zero_or_negative && !resource_user->added_to_free_pool) { resource_user->added_to_free_pool = true; - grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, - &resource_user->add_to_free_pool_closure, - GRPC_ERROR_NONE, false); + grpc_closure_sched(exec_ctx, &resource_user->add_to_free_pool_closure, + GRPC_ERROR_NONE); } gpr_mu_unlock(&resource_user->mu); ru_unref_by(exec_ctx, resource_user, (gpr_atm)size); @@ -784,9 +789,9 @@ void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx, grpc_closure *closure) { GPR_ASSERT(resource_user->new_reclaimers[destructive] == NULL); resource_user->new_reclaimers[destructive] = closure; - grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, - &resource_user->post_reclaimer_closure[destructive], - GRPC_ERROR_NONE, false); + grpc_closure_sched(exec_ctx, + &resource_user->post_reclaimer_closure[destructive], + GRPC_ERROR_NONE); } void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx, @@ -795,18 +800,20 @@ void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx, gpr_log(GPR_DEBUG, "RQ %s %s: reclamation complete", resource_user->resource_quota->name, resource_user->name); } - grpc_combiner_execute( - exec_ctx, resource_user->resource_quota->combiner, - &resource_user->resource_quota->rq_reclamation_done_closure, - GRPC_ERROR_NONE, false); + grpc_closure_sched( + exec_ctx, &resource_user->resource_quota->rq_reclamation_done_closure, + GRPC_ERROR_NONE); } void grpc_resource_user_slice_allocator_init( grpc_resource_user_slice_allocator *slice_allocator, grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p) { - grpc_closure_init(&slice_allocator->on_allocated, ru_allocated_slices, - slice_allocator); - grpc_closure_init(&slice_allocator->on_done, cb, p); + grpc_closure_init( + &slice_allocator->on_allocated, ru_allocated_slices, slice_allocator, + grpc_combiner_scheduler(resource_user->resource_quota->combiner, false)); + grpc_closure_init( + &slice_allocator->on_done, cb, p, + grpc_combiner_scheduler(resource_user->resource_quota->combiner, false)); slice_allocator->resource_user = resource_user; } -- cgit v1.2.3