aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr/alarm.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr/alarm.c')
-rw-r--r--src/core/iomgr/alarm.c65
1 files changed, 22 insertions, 43 deletions
diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c
index 7b67fe3b1d..0ba5361606 100644
--- a/src/core/iomgr/alarm.c
+++ b/src/core/iomgr/alarm.c
@@ -44,7 +44,6 @@
#define LOG2_NUM_SHARDS 5
#define NUM_SHARDS (1 << LOG2_NUM_SHARDS)
-#define MAX_ALARMS_PER_CHECK 128
#define ADD_DEADLINE_SCALE 0.33
#define MIN_QUEUE_WINDOW_DURATION 0.01
#define MAX_QUEUE_WINDOW_DURATION 1
@@ -73,7 +72,7 @@ static shard_type g_shards[NUM_SHARDS];
/* Protected by g_mu */
static shard_type *g_shard_queue[NUM_SHARDS];
-static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now,
+static int run_some_expired_alarms(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next, int success);
static gpr_timespec compute_min_deadline(shard_type *shard) {
@@ -103,10 +102,9 @@ void grpc_alarm_list_init(gpr_timespec now) {
}
}
-void grpc_alarm_list_shutdown(void) {
+void grpc_alarm_list_shutdown(grpc_exec_ctx *exec_ctx) {
int i;
- while (run_some_expired_alarms(NULL, gpr_inf_future(g_clock_type), NULL, 0))
- ;
+ run_some_expired_alarms(exec_ctx, gpr_inf_future(g_clock_type), NULL, 0);
for (i = 0; i < NUM_SHARDS; i++) {
shard_type *shard = &g_shards[i];
gpr_mu_destroy(&shard->mu);
@@ -172,15 +170,14 @@ static void note_deadline_change(shard_type *shard) {
}
}
-void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
- grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg,
- gpr_timespec now) {
+void grpc_alarm_init(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm,
+ gpr_timespec deadline, grpc_iomgr_cb_func alarm_cb,
+ void *alarm_cb_arg, gpr_timespec now) {
int is_first_alarm = 0;
shard_type *shard = &g_shards[shard_idx(alarm)];
GPR_ASSERT(deadline.clock_type == g_clock_type);
GPR_ASSERT(now.clock_type == g_clock_type);
- alarm->cb = alarm_cb;
- alarm->cb_arg = alarm_cb_arg;
+ grpc_closure_init(&alarm->closure, alarm_cb, alarm_cb_arg);
alarm->deadline = deadline;
alarm->triggered = 0;
@@ -223,12 +220,11 @@ void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
}
}
-void grpc_alarm_cancel(grpc_alarm *alarm) {
+void grpc_alarm_cancel(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm) {
shard_type *shard = &g_shards[shard_idx(alarm)];
- int triggered = 0;
gpr_mu_lock(&shard->mu);
if (!alarm->triggered) {
- triggered = 1;
+ grpc_exec_ctx_enqueue(exec_ctx, &alarm->closure, 0);
alarm->triggered = 1;
if (alarm->heap_index == INVALID_HEAP_INDEX) {
list_remove(alarm);
@@ -237,10 +233,6 @@ void grpc_alarm_cancel(grpc_alarm *alarm) {
}
}
gpr_mu_unlock(&shard->mu);
-
- if (triggered) {
- alarm->cb(alarm->cb_arg, 0);
- }
}
/* This is called when the queue is empty and "now" has reached the
@@ -291,40 +283,38 @@ static grpc_alarm *pop_one(shard_type *shard, gpr_timespec now) {
}
/* REQUIRES: shard->mu unlocked */
-static size_t pop_alarms(shard_type *shard, gpr_timespec now,
- grpc_alarm **alarms, size_t max_alarms,
- gpr_timespec *new_min_deadline) {
+static size_t pop_alarms(grpc_exec_ctx *exec_ctx, shard_type *shard,
+ gpr_timespec now, gpr_timespec *new_min_deadline,
+ int success) {
size_t n = 0;
grpc_alarm *alarm;
gpr_mu_lock(&shard->mu);
- while (n < max_alarms && (alarm = pop_one(shard, now))) {
- alarms[n++] = alarm;
+ while ((alarm = pop_one(shard, now))) {
+ grpc_exec_ctx_enqueue(exec_ctx, &alarm->closure, success);
+ n++;
}
*new_min_deadline = compute_min_deadline(shard);
gpr_mu_unlock(&shard->mu);
return n;
}
-static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now,
+static int run_some_expired_alarms(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next, int success) {
size_t n = 0;
- size_t i;
- grpc_alarm *alarms[MAX_ALARMS_PER_CHECK];
/* TODO(ctiller): verify that there are any alarms (atomically) here */
if (gpr_mu_trylock(&g_checker_mu)) {
gpr_mu_lock(&g_mu);
- while (n < MAX_ALARMS_PER_CHECK &&
- gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) {
+ while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) {
gpr_timespec new_min_deadline;
/* For efficiency, we pop as many available alarms as we can from the
shard. This may violate perfect alarm deadline ordering, but that
shouldn't be a big deal because we don't make ordering guarantees. */
- n += pop_alarms(g_shard_queue[0], now, alarms + n,
- MAX_ALARMS_PER_CHECK - n, &new_min_deadline);
+ n += pop_alarms(exec_ctx, g_shard_queue[0], now, &new_min_deadline,
+ success);
/* An grpc_alarm_init() on the shard could intervene here, adding a new
alarm that is earlier than new_min_deadline. However,
@@ -343,25 +333,14 @@ static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now,
gpr_mu_unlock(&g_checker_mu);
}
- if (n && drop_mu) {
- gpr_mu_unlock(drop_mu);
- }
-
- for (i = 0; i < n; i++) {
- alarms[i]->cb(alarms[i]->cb_arg, success);
- }
-
- if (n && drop_mu) {
- gpr_mu_lock(drop_mu);
- }
-
return (int)n;
}
-int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) {
+int grpc_alarm_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
+ gpr_timespec *next) {
GPR_ASSERT(now.clock_type == g_clock_type);
return run_some_expired_alarms(
- drop_mu, now, next,
+ exec_ctx, now, next,
gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0);
}