diff options
Diffstat (limited to 'src/core/iomgr/alarm.c')
-rw-r--r-- | src/core/iomgr/alarm.c | 65 |
1 files changed, 22 insertions, 43 deletions
diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c index 7b67fe3b1d..0ba5361606 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/alarm.c @@ -44,7 +44,6 @@ #define LOG2_NUM_SHARDS 5 #define NUM_SHARDS (1 << LOG2_NUM_SHARDS) -#define MAX_ALARMS_PER_CHECK 128 #define ADD_DEADLINE_SCALE 0.33 #define MIN_QUEUE_WINDOW_DURATION 0.01 #define MAX_QUEUE_WINDOW_DURATION 1 @@ -73,7 +72,7 @@ static shard_type g_shards[NUM_SHARDS]; /* Protected by g_mu */ static shard_type *g_shard_queue[NUM_SHARDS]; -static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, +static int run_some_expired_alarms(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_timespec *next, int success); static gpr_timespec compute_min_deadline(shard_type *shard) { @@ -103,10 +102,9 @@ void grpc_alarm_list_init(gpr_timespec now) { } } -void grpc_alarm_list_shutdown(void) { +void grpc_alarm_list_shutdown(grpc_exec_ctx *exec_ctx) { int i; - while (run_some_expired_alarms(NULL, gpr_inf_future(g_clock_type), NULL, 0)) - ; + run_some_expired_alarms(exec_ctx, gpr_inf_future(g_clock_type), NULL, 0); for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; gpr_mu_destroy(&shard->mu); @@ -172,15 +170,14 @@ static void note_deadline_change(shard_type *shard) { } } -void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline, - grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg, - gpr_timespec now) { +void grpc_alarm_init(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm, + gpr_timespec deadline, grpc_iomgr_cb_func alarm_cb, + void *alarm_cb_arg, gpr_timespec now) { int is_first_alarm = 0; shard_type *shard = &g_shards[shard_idx(alarm)]; GPR_ASSERT(deadline.clock_type == g_clock_type); GPR_ASSERT(now.clock_type == g_clock_type); - alarm->cb = alarm_cb; - alarm->cb_arg = alarm_cb_arg; + grpc_closure_init(&alarm->closure, alarm_cb, alarm_cb_arg); alarm->deadline = deadline; alarm->triggered = 0; @@ -223,12 +220,11 @@ void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline, } } -void grpc_alarm_cancel(grpc_alarm *alarm) { +void grpc_alarm_cancel(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm) { shard_type *shard = &g_shards[shard_idx(alarm)]; - int triggered = 0; gpr_mu_lock(&shard->mu); if (!alarm->triggered) { - triggered = 1; + grpc_exec_ctx_enqueue(exec_ctx, &alarm->closure, 0); alarm->triggered = 1; if (alarm->heap_index == INVALID_HEAP_INDEX) { list_remove(alarm); @@ -237,10 +233,6 @@ void grpc_alarm_cancel(grpc_alarm *alarm) { } } gpr_mu_unlock(&shard->mu); - - if (triggered) { - alarm->cb(alarm->cb_arg, 0); - } } /* This is called when the queue is empty and "now" has reached the @@ -291,40 +283,38 @@ static grpc_alarm *pop_one(shard_type *shard, gpr_timespec now) { } /* REQUIRES: shard->mu unlocked */ -static size_t pop_alarms(shard_type *shard, gpr_timespec now, - grpc_alarm **alarms, size_t max_alarms, - gpr_timespec *new_min_deadline) { +static size_t pop_alarms(grpc_exec_ctx *exec_ctx, shard_type *shard, + gpr_timespec now, gpr_timespec *new_min_deadline, + int success) { size_t n = 0; grpc_alarm *alarm; gpr_mu_lock(&shard->mu); - while (n < max_alarms && (alarm = pop_one(shard, now))) { - alarms[n++] = alarm; + while ((alarm = pop_one(shard, now))) { + grpc_exec_ctx_enqueue(exec_ctx, &alarm->closure, success); + n++; } *new_min_deadline = compute_min_deadline(shard); gpr_mu_unlock(&shard->mu); return n; } -static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, +static int run_some_expired_alarms(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_timespec *next, int success) { size_t n = 0; - size_t i; - grpc_alarm *alarms[MAX_ALARMS_PER_CHECK]; /* TODO(ctiller): verify that there are any alarms (atomically) here */ if (gpr_mu_trylock(&g_checker_mu)) { gpr_mu_lock(&g_mu); - while (n < MAX_ALARMS_PER_CHECK && - gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) { + while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) { gpr_timespec new_min_deadline; /* For efficiency, we pop as many available alarms as we can from the shard. This may violate perfect alarm deadline ordering, but that shouldn't be a big deal because we don't make ordering guarantees. */ - n += pop_alarms(g_shard_queue[0], now, alarms + n, - MAX_ALARMS_PER_CHECK - n, &new_min_deadline); + n += pop_alarms(exec_ctx, g_shard_queue[0], now, &new_min_deadline, + success); /* An grpc_alarm_init() on the shard could intervene here, adding a new alarm that is earlier than new_min_deadline. However, @@ -343,25 +333,14 @@ static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, gpr_mu_unlock(&g_checker_mu); } - if (n && drop_mu) { - gpr_mu_unlock(drop_mu); - } - - for (i = 0; i < n; i++) { - alarms[i]->cb(alarms[i]->cb_arg, success); - } - - if (n && drop_mu) { - gpr_mu_lock(drop_mu); - } - return (int)n; } -int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) { +int grpc_alarm_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, + gpr_timespec *next) { GPR_ASSERT(now.clock_type == g_clock_type); return run_some_expired_alarms( - drop_mu, now, next, + exec_ctx, now, next, gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0); } |