aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr
diff options
context:
space:
mode:
authorGravatar ctiller <ctiller@google.com>2014-12-19 16:21:57 -0800
committerGravatar Jan Tattermusch <jtattermusch@google.com>2014-12-29 17:05:35 -0800
commit3bf466fb6c8cbbd4334d70be9c251feb71a7c78a (patch)
treeed26d5d417f67245175a9ad4883aa9c22691a04d /src/core/iomgr
parent1a809c0ebbf77aedf7f6322ef7d6373962c80264 (diff)
Port [] alarm management to GRPC.
This change implements a platform independent alarm manager in alarm.c. It's integrated with iomgr, and some tests are cleaned up. The alarm implementation itself is a fairly direct port of LazyAlarmList from eventmanager. SpinLock has been replaced for now with gpr_mu, and other atomic operations have been dropped (again, for now). A majority of tests have been ported. Change on 2014/12/19 by ctiller <ctiller@google.com> ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=82551363
Diffstat (limited to 'src/core/iomgr')
-rw-r--r--src/core/iomgr/alarm.c353
-rw-r--r--src/core/iomgr/alarm.h32
-rw-r--r--src/core/iomgr/alarm_heap.c148
-rw-r--r--src/core/iomgr/alarm_heap.h57
-rw-r--r--src/core/iomgr/alarm_internal.h50
-rw-r--r--src/core/iomgr/iomgr_libevent.c115
-rw-r--r--src/core/iomgr/iomgr_libevent.h8
7 files changed, 660 insertions, 103 deletions
diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c
new file mode 100644
index 0000000000..b7238f716a
--- /dev/null
+++ b/src/core/iomgr/alarm.c
@@ -0,0 +1,353 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/iomgr/alarm.h"
+
+#include "src/core/iomgr/alarm_heap.h"
+#include "src/core/iomgr/alarm_internal.h"
+#include "src/core/iomgr/time_averaged_stats.h"
+#include <grpc/support/sync.h>
+#include <grpc/support/useful.h>
+
+#define INVALID_HEAP_INDEX 0xffffffffu
+
+#define LOG2_NUM_SHARDS 5
+#define NUM_SHARDS (1 << LOG2_NUM_SHARDS)
+#define MAX_ALARMS_PER_CHECK 128
+#define ADD_DEADLINE_SCALE 0.33
+#define MIN_QUEUE_WINDOW_DURATION 0.01
+#define MAX_QUEUE_WINDOW_DURATION 1
+
+typedef struct {
+ gpr_mu mu;
+ grpc_time_averaged_stats stats;
+ /* All and only alarms with deadlines <= this will be in the heap. */
+ gpr_timespec queue_deadline_cap;
+ gpr_timespec min_deadline;
+ /* Index in the g_shard_queue */
+ gpr_uint32 shard_queue_index;
+ /* This holds all alarms with deadlines < queue_deadline_cap. Alarms in this
+ list have the top bit of their deadline set to 0. */
+ grpc_alarm_heap heap;
+ /* This holds alarms whose deadline is >= queue_deadline_cap. */
+ grpc_alarm list;
+} shard_type;
+
+/* Protects g_shard_queue */
+static gpr_mu g_mu;
+/* Allow only one run_some_expired_alarms at once */
+static gpr_mu g_checker_mu;
+static shard_type g_shards[NUM_SHARDS];
+/* Protected by g_mu */
+static shard_type *g_shard_queue[NUM_SHARDS];
+
+static int run_some_expired_alarms(gpr_timespec now,
+ grpc_iomgr_cb_status status);
+
+static gpr_timespec compute_min_deadline(shard_type *shard) {
+ return grpc_alarm_heap_is_empty(&shard->heap)
+ ? shard->queue_deadline_cap
+ : grpc_alarm_heap_top(&shard->heap)->deadline;
+}
+
+void grpc_alarm_list_init(gpr_timespec now) {
+ int i;
+
+ gpr_mu_init(&g_mu);
+ gpr_mu_init(&g_checker_mu);
+
+ for (i = 0; i < NUM_SHARDS; i++) {
+ shard_type *shard = &g_shards[i];
+ gpr_mu_init(&shard->mu);
+ grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
+ 0.5);
+ shard->queue_deadline_cap = now;
+ shard->shard_queue_index = i;
+ grpc_alarm_heap_init(&shard->heap);
+ shard->list.next = shard->list.prev = &shard->list;
+ shard->min_deadline = compute_min_deadline(shard);
+ g_shard_queue[i] = shard;
+ }
+}
+
+void grpc_alarm_list_shutdown() {
+ int i;
+ while (run_some_expired_alarms(gpr_inf_future, GRPC_CALLBACK_CANCELLED))
+ ;
+ for (i = 0; i < NUM_SHARDS; i++) {
+ shard_type *shard = &g_shards[i];
+ gpr_mu_destroy(&shard->mu);
+ grpc_alarm_heap_destroy(&shard->heap);
+ }
+ gpr_mu_destroy(&g_mu);
+ gpr_mu_destroy(&g_checker_mu);
+}
+
+/* This is a cheap, but good enough, pointer hash for sharding the tasks: */
+static size_t shard_idx(const grpc_alarm *info) {
+ size_t x = (size_t)info;
+ return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (NUM_SHARDS - 1);
+}
+
+static double ts_to_dbl(gpr_timespec ts) {
+ return ts.tv_sec + 1e-9 * ts.tv_nsec;
+}
+
+static gpr_timespec dbl_to_ts(double d) {
+ gpr_timespec ts;
+ ts.tv_sec = d;
+ ts.tv_nsec = 1e9 * (d - ts.tv_sec);
+ return ts;
+}
+
+static void list_join(grpc_alarm *head, grpc_alarm *alarm) {
+ alarm->next = head;
+ alarm->prev = head->prev;
+ alarm->next->prev = alarm->prev->next = alarm;
+}
+
+static void list_remove(grpc_alarm *alarm) {
+ alarm->next->prev = alarm->prev;
+ alarm->prev->next = alarm->next;
+}
+
+static void swap_adjacent_shards_in_queue(size_t first_shard_queue_index) {
+ shard_type *temp;
+ temp = g_shard_queue[first_shard_queue_index];
+ g_shard_queue[first_shard_queue_index] =
+ g_shard_queue[first_shard_queue_index + 1];
+ g_shard_queue[first_shard_queue_index + 1] = temp;
+ g_shard_queue[first_shard_queue_index]->shard_queue_index =
+ first_shard_queue_index;
+ g_shard_queue[first_shard_queue_index + 1]->shard_queue_index =
+ first_shard_queue_index + 1;
+}
+
+static void note_deadline_change(shard_type *shard) {
+ while (shard->shard_queue_index > 0 &&
+ gpr_time_cmp(
+ shard->min_deadline,
+ g_shard_queue[shard->shard_queue_index - 1]->min_deadline) < 0) {
+ swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
+ }
+ while (shard->shard_queue_index < NUM_SHARDS - 1 &&
+ gpr_time_cmp(
+ shard->min_deadline,
+ g_shard_queue[shard->shard_queue_index + 1]->min_deadline) > 0) {
+ swap_adjacent_shards_in_queue(shard->shard_queue_index);
+ }
+}
+
+void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
+ grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg,
+ gpr_timespec now) {
+ int is_first_alarm = 0;
+ shard_type *shard = &g_shards[shard_idx(alarm)];
+ alarm->cb = alarm_cb;
+ alarm->cb_arg = alarm_cb_arg;
+ alarm->deadline = deadline;
+ alarm->triggered = 0;
+
+ /* TODO(ctiller): check deadline expired */
+
+ gpr_mu_lock(&shard->mu);
+ grpc_time_averaged_stats_add_sample(&shard->stats,
+ ts_to_dbl(gpr_time_sub(deadline, now)));
+ if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) {
+ is_first_alarm = grpc_alarm_heap_add(&shard->heap, alarm);
+ } else {
+ alarm->heap_index = INVALID_HEAP_INDEX;
+ list_join(&shard->list, alarm);
+ }
+ gpr_mu_unlock(&shard->mu);
+
+ /* Deadline may have decreased, we need to adjust the master queue. Note
+ that there is a potential racy unlocked region here. There could be a
+ reordering of multiple grpc_alarm_init calls, at this point, but the < test
+ below should ensure that we err on the side of caution. There could
+ also be a race with grpc_alarm_check, which might beat us to the lock. In
+ that case, it is possible that the alarm that we added will have already
+ run by the time we hold the lock, but that too is a safe error.
+ Finally, it's possible that the grpc_alarm_check that intervened failed to
+ trigger the new alarm because the min_deadline hadn't yet been reduced.
+ In that case, the alarm will simply have to wait for the next
+ grpc_alarm_check. */
+ if (is_first_alarm) {
+ gpr_mu_lock(&g_mu);
+ if (gpr_time_cmp(deadline, shard->min_deadline) < 0) {
+ gpr_timespec old_min_deadline = g_shard_queue[0]->min_deadline;
+ shard->min_deadline = deadline;
+ note_deadline_change(shard);
+ if (shard->shard_queue_index == 0 &&
+ gpr_time_cmp(deadline, old_min_deadline) < 0) {
+ grpc_kick_poller();
+ }
+ }
+ gpr_mu_unlock(&g_mu);
+ }
+}
+
+void grpc_alarm_cancel(grpc_alarm *alarm) {
+ shard_type *shard = &g_shards[shard_idx(alarm)];
+ int triggered = 0;
+ gpr_mu_lock(&shard->mu);
+ if (!alarm->triggered) {
+ triggered = 1;
+ alarm->triggered = 1;
+ if (alarm->heap_index == INVALID_HEAP_INDEX) {
+ list_remove(alarm);
+ } else {
+ grpc_alarm_heap_remove(&shard->heap, alarm);
+ }
+ }
+ gpr_mu_unlock(&shard->mu);
+
+ if (triggered) {
+ alarm->cb(alarm->cb_arg, GRPC_CALLBACK_CANCELLED);
+ }
+}
+
+/* This is called when the queue is empty and "now" has reached the
+ queue_deadline_cap. We compute a new queue deadline and then scan the map
+ for alarms that fall at or under it. Returns true if the queue is no
+ longer empty.
+ REQUIRES: shard->mu locked */
+static int refill_queue(shard_type *shard, gpr_timespec now) {
+ /* Compute the new queue window width and bound by the limits: */
+ double computed_deadline_delta =
+ grpc_time_averaged_stats_update_average(&shard->stats) *
+ ADD_DEADLINE_SCALE;
+ double deadline_delta =
+ GPR_CLAMP(computed_deadline_delta, MIN_QUEUE_WINDOW_DURATION,
+ MAX_QUEUE_WINDOW_DURATION);
+ grpc_alarm *alarm, *next;
+
+ /* Compute the new cap and put all alarms under it into the queue: */
+ shard->queue_deadline_cap = gpr_time_add(
+ gpr_time_max(now, shard->queue_deadline_cap), dbl_to_ts(deadline_delta));
+ for (alarm = shard->list.next; alarm != &shard->list; alarm = next) {
+ next = alarm->next;
+
+ if (gpr_time_cmp(alarm->deadline, shard->queue_deadline_cap) < 0) {
+ list_remove(alarm);
+ grpc_alarm_heap_add(&shard->heap, alarm);
+ }
+ }
+ return !grpc_alarm_heap_is_empty(&shard->heap);
+}
+
+/* This pops the next non-cancelled alarm with deadline <= now from the queue,
+ or returns NULL if there isn't one.
+ REQUIRES: shard->mu locked */
+static grpc_alarm *pop_one(shard_type *shard, gpr_timespec now) {
+ grpc_alarm *alarm;
+ for (;;) {
+ if (grpc_alarm_heap_is_empty(&shard->heap)) {
+ if (gpr_time_cmp(now, shard->queue_deadline_cap) < 0) return NULL;
+ if (!refill_queue(shard, now)) return NULL;
+ }
+ alarm = grpc_alarm_heap_top(&shard->heap);
+ if (gpr_time_cmp(alarm->deadline, now) > 0) return NULL;
+ alarm->triggered = 1;
+ grpc_alarm_heap_pop(&shard->heap);
+ return alarm;
+ }
+}
+
+/* REQUIRES: shard->mu unlocked */
+static size_t pop_alarms(shard_type *shard, gpr_timespec now,
+ grpc_alarm **alarms, size_t max_alarms,
+ gpr_timespec *new_min_deadline) {
+ size_t n = 0;
+ grpc_alarm *alarm;
+ gpr_mu_lock(&shard->mu);
+ while (n < max_alarms && (alarm = pop_one(shard, now))) {
+ alarms[n++] = alarm;
+ }
+ *new_min_deadline = compute_min_deadline(shard);
+ gpr_mu_unlock(&shard->mu);
+ return n;
+}
+
+static int run_some_expired_alarms(gpr_timespec now,
+ grpc_iomgr_cb_status status) {
+ size_t n = 0;
+ size_t i;
+ grpc_alarm *alarms[MAX_ALARMS_PER_CHECK];
+
+ /* TODO(ctiller): verify that there are any alarms (atomically) here */
+
+ if (gpr_mu_trylock(&g_checker_mu)) {
+ gpr_mu_lock(&g_mu);
+
+ while (n < MAX_ALARMS_PER_CHECK &&
+ gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) {
+ gpr_timespec new_min_deadline;
+
+ /* For efficiency, we pop as many available alarms as we can from the
+ shard. This may violate perfect alarm deadline ordering, but that
+ shouldn't be a big deal because we don't make ordering guarantees. */
+ n += pop_alarms(g_shard_queue[0], now, alarms + n,
+ MAX_ALARMS_PER_CHECK - n, &new_min_deadline);
+
+ /* An grpc_alarm_init() on the shard could intervene here, adding a new
+ alarm that is earlier than new_min_deadline. However,
+ grpc_alarm_init() will block on the master_lock before it can call
+ set_min_deadline, so this one will complete first and then the AddAlarm
+ will reduce the min_deadline (perhaps unnecessarily). */
+ g_shard_queue[0]->min_deadline = new_min_deadline;
+ note_deadline_change(g_shard_queue[0]);
+ }
+
+ gpr_mu_unlock(&g_mu);
+ gpr_mu_unlock(&g_checker_mu);
+ }
+
+ for (i = 0; i < n; i++) {
+ alarms[i]->cb(alarms[i]->cb_arg, status);
+ }
+
+ return n;
+}
+
+int grpc_alarm_check(gpr_timespec now) {
+ return run_some_expired_alarms(now, GRPC_CALLBACK_SUCCESS);
+}
+
+gpr_timespec grpc_alarm_list_next_timeout() {
+ gpr_timespec out;
+ gpr_mu_lock(&g_mu);
+ out = g_shard_queue[0]->min_deadline;
+ gpr_mu_unlock(&g_mu);
+ return out;
+}
diff --git a/src/core/iomgr/alarm.h b/src/core/iomgr/alarm.h
index 2bb5bf7022..f94dcec6e9 100644
--- a/src/core/iomgr/alarm.h
+++ b/src/core/iomgr/alarm.h
@@ -38,23 +38,25 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
-typedef struct grpc_alarm grpc_alarm;
-
-/* One of the following headers should provide struct grpc_alarm */
-#ifdef GPR_LIBEVENT
-#include "src/core/iomgr/iomgr_libevent.h"
-#endif
+typedef struct grpc_alarm {
+ gpr_timespec deadline;
+ gpr_uint32 heap_index; /* INVALID_HEAP_INDEX if not in heap */
+ struct grpc_alarm *next;
+ struct grpc_alarm *prev;
+ int triggered;
+ grpc_iomgr_cb_func cb;
+ void *cb_arg;
+} grpc_alarm;
/* Initialize *alarm. When expired or canceled, alarm_cb will be called with
*alarm_cb_arg and status to indicate if it expired (SUCCESS) or was
canceled (CANCELLED). alarm_cb is guaranteed to be called exactly once,
and application code should check the status to determine how it was
invoked. The application callback is also responsible for maintaining
- information about when to free up any user-level state.
- Returns 1 on success, 0 on failure. */
-int grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
- grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg,
- gpr_timespec now);
+ information about when to free up any user-level state. */
+void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
+ grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg,
+ gpr_timespec now);
/* Note that there is no alarm destroy function. This is because the
alarm is a one-time occurrence with a guarantee that the callback will
@@ -75,7 +77,13 @@ int grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
exactly once from either the cancellation (with status CANCELLED)
or from the activation (with status SUCCESS)
+ Note carefully that the callback function MAY occur in the same callstack
+ as grpc_alarm_cancel. It's expected that most alarms will be cancelled (their
+ primary use is to implement deadlines), and so this code is optimized such
+ that cancellation costs as little as possible. Making callbacks run inline
+ matches this aim.
+
Requires: cancel() must happen after add() on a given alarm */
-int grpc_alarm_cancel(grpc_alarm *alarm);
+void grpc_alarm_cancel(grpc_alarm *alarm);
#endif /* __GRPC_INTERNAL_IOMGR_ALARM_H__ */
diff --git a/src/core/iomgr/alarm_heap.c b/src/core/iomgr/alarm_heap.c
new file mode 100644
index 0000000000..693d26d03f
--- /dev/null
+++ b/src/core/iomgr/alarm_heap.c
@@ -0,0 +1,148 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/iomgr/alarm_heap.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/useful.h>
+
+/* Adjusts a heap so as to move a hole at position i closer to the root,
+ until a suitable position is found for element t. Then, copies t into that
+ position. This functor is called each time immediately after modifying a
+ value in the underlying container, with the offset of the modified element as
+ its argument. */
+static void adjust_upwards(grpc_alarm **first, int i, grpc_alarm *t) {
+ while (i > 0) {
+ int parent = (i - 1) / 2;
+ if (gpr_time_cmp(first[parent]->deadline, t->deadline) >= 0) break;
+ first[i] = first[parent];
+ first[i]->heap_index = i;
+ i = parent;
+ }
+ first[i] = t;
+ t->heap_index = i;
+}
+
+/* Adjusts a heap so as to move a hole at position i farther away from the root,
+ until a suitable position is found for element t. Then, copies t into that
+ position. */
+static void adjust_downwards(grpc_alarm **first, int i, int length,
+ grpc_alarm *t) {
+ for (;;) {
+ int left_child = 1 + 2 * i;
+ int right_child;
+ int next_i;
+ if (left_child >= length) break;
+ right_child = left_child + 1;
+ next_i = right_child < length &&
+ gpr_time_cmp(first[left_child]->deadline,
+ first[right_child]->deadline) < 0
+ ? right_child
+ : left_child;
+ if (gpr_time_cmp(t->deadline, first[next_i]->deadline) >= 0) break;
+ first[i] = first[next_i];
+ first[i]->heap_index = i;
+ i = next_i;
+ }
+ first[i] = t;
+ t->heap_index = i;
+}
+
+#define SHRINK_MIN_ELEMS 8
+#define SHRINK_FULLNESS_FACTOR 2
+
+static void maybe_shrink(grpc_alarm_heap *heap) {
+ if (heap->alarm_count >= 8 &&
+ heap->alarm_count <= heap->alarm_capacity / SHRINK_FULLNESS_FACTOR / 2) {
+ heap->alarm_capacity = heap->alarm_count * SHRINK_FULLNESS_FACTOR;
+ heap->alarms =
+ gpr_realloc(heap->alarms, heap->alarm_capacity * sizeof(grpc_alarm *));
+ }
+}
+
+static void note_changed_priority(grpc_alarm_heap *heap, grpc_alarm *alarm) {
+ int i = alarm->heap_index;
+ int parent = (i - 1) / 2;
+ if (gpr_time_cmp(heap->alarms[parent]->deadline, alarm->deadline) < 0) {
+ adjust_upwards(heap->alarms, i, alarm);
+ } else {
+ adjust_downwards(heap->alarms, i, heap->alarm_count, alarm);
+ }
+}
+
+void grpc_alarm_heap_init(grpc_alarm_heap *heap) {
+ memset(heap, 0, sizeof(*heap));
+}
+
+void grpc_alarm_heap_destroy(grpc_alarm_heap *heap) { gpr_free(heap->alarms); }
+
+int grpc_alarm_heap_add(grpc_alarm_heap *heap, grpc_alarm *alarm) {
+ if (heap->alarm_count == heap->alarm_capacity) {
+ heap->alarm_capacity =
+ GPR_MAX(heap->alarm_capacity + 1, heap->alarm_capacity * 3 / 2);
+ heap->alarms =
+ gpr_realloc(heap->alarms, heap->alarm_capacity * sizeof(grpc_alarm *));
+ }
+ alarm->heap_index = heap->alarm_count;
+ adjust_upwards(heap->alarms, heap->alarm_count, alarm);
+ heap->alarm_count++;
+ return alarm->heap_index == 0;
+}
+
+void grpc_alarm_heap_remove(grpc_alarm_heap *heap, grpc_alarm *alarm) {
+ int i = alarm->heap_index;
+ if (i == heap->alarm_count - 1) {
+ heap->alarm_count--;
+ maybe_shrink(heap);
+ return;
+ }
+ heap->alarms[i] = heap->alarms[heap->alarm_count - 1];
+ heap->alarms[i]->heap_index = i;
+ heap->alarm_count--;
+ maybe_shrink(heap);
+ note_changed_priority(heap, heap->alarms[i]);
+}
+
+int grpc_alarm_heap_is_empty(grpc_alarm_heap *heap) {
+ return heap->alarm_count == 0;
+}
+
+grpc_alarm *grpc_alarm_heap_top(grpc_alarm_heap *heap) {
+ return heap->alarms[0];
+}
+
+void grpc_alarm_heap_pop(grpc_alarm_heap *heap) {
+ grpc_alarm_heap_remove(heap, grpc_alarm_heap_top(heap));
+}
diff --git a/src/core/iomgr/alarm_heap.h b/src/core/iomgr/alarm_heap.h
new file mode 100644
index 0000000000..e51f96dd44
--- /dev/null
+++ b/src/core/iomgr/alarm_heap.h
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_IOMGR_ALARM_HEAP_H_
+#define __GRPC_INTERNAL_IOMGR_ALARM_HEAP_H_
+
+#include "src/core/iomgr/alarm.h"
+
+typedef struct {
+ grpc_alarm **alarms;
+ int alarm_count;
+ int alarm_capacity;
+} grpc_alarm_heap;
+
+/* return 1 if the new alarm is the first alarm in the heap */
+int grpc_alarm_heap_add(grpc_alarm_heap *heap, grpc_alarm *alarm);
+
+void grpc_alarm_heap_init(grpc_alarm_heap *heap);
+void grpc_alarm_heap_destroy(grpc_alarm_heap *heap);
+
+void grpc_alarm_heap_remove(grpc_alarm_heap *heap, grpc_alarm *alarm);
+grpc_alarm *grpc_alarm_heap_top(grpc_alarm_heap *heap);
+void grpc_alarm_heap_pop(grpc_alarm_heap *heap);
+
+int grpc_alarm_heap_is_empty(grpc_alarm_heap *heap);
+
+#endif /* __GRPC_INTERNAL_IOMGR_ALARM_HEAP_H_ */
diff --git a/src/core/iomgr/alarm_internal.h b/src/core/iomgr/alarm_internal.h
new file mode 100644
index 0000000000..e605ff84f9
--- /dev/null
+++ b/src/core/iomgr/alarm_internal.h
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_
+#define __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_
+
+/* iomgr internal api for dealing with alarms */
+
+int grpc_alarm_check(gpr_timespec now);
+
+void grpc_alarm_list_init(gpr_timespec now);
+void grpc_alarm_list_shutdown();
+
+gpr_timespec grpc_alarm_list_next_timeout();
+
+/* the following must be implemented by each iomgr implementation */
+
+void grpc_kick_poller();
+
+#endif /* __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_ */
diff --git a/src/core/iomgr/iomgr_libevent.c b/src/core/iomgr/iomgr_libevent.c
index 3c94d35a38..a00df72661 100644
--- a/src/core/iomgr/iomgr_libevent.c
+++ b/src/core/iomgr/iomgr_libevent.c
@@ -37,6 +37,7 @@
#include <fcntl.h>
#include "src/core/iomgr/alarm.h"
+#include "src/core/iomgr/alarm_internal.h"
#include <grpc/support/atm.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -131,6 +132,10 @@ static void maybe_free_fds() {
}
}
+/* TODO(ctiller): this is racy. In non-libevent implementations, use a pipe
+ or eventfd */
+void grpc_kick_poller() { event_base_loopbreak(g_event_base); }
+
/* Spend some time doing polling and libevent maintenance work if no other
thread is. This includes both polling for events and destroying/closing file
descriptor objects.
@@ -162,8 +167,20 @@ static int maybe_do_polling_work(struct timeval delay) {
return 1;
}
+static int maybe_do_alarm_work(gpr_timespec now, gpr_timespec next) {
+ int r = 0;
+ if (gpr_time_cmp(next, now) < 0) {
+ gpr_mu_unlock(&grpc_iomgr_mu);
+ r = grpc_alarm_check(now);
+ gpr_mu_lock(&grpc_iomgr_mu);
+ }
+ return r;
+}
+
int grpc_iomgr_work(gpr_timespec deadline) {
- gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
+ gpr_timespec now = gpr_now();
+ gpr_timespec next = grpc_alarm_list_next_timeout();
+ gpr_timespec delay_timespec = gpr_time_sub(deadline, now);
/* poll for no longer than one second */
gpr_timespec max_delay = {1, 0};
struct timeval delay;
@@ -178,7 +195,8 @@ int grpc_iomgr_work(gpr_timespec deadline) {
delay = gpr_timeval_from_timespec(delay_timespec);
- if (maybe_do_queue_work() || maybe_do_polling_work(delay)) {
+ if (maybe_do_queue_work() || maybe_do_alarm_work(now, next) ||
+ maybe_do_polling_work(delay)) {
g_last_poll_completed = gpr_now();
return 1;
}
@@ -189,7 +207,7 @@ int grpc_iomgr_work(gpr_timespec deadline) {
static void backup_poller_thread(void *p) {
int backup_poller_engaged = 0;
/* allow no pollers for 100 milliseconds, then engage backup polling */
- gpr_timespec allow_no_pollers = gpr_time_from_micros(100 * 1000);
+ gpr_timespec allow_no_pollers = gpr_time_from_millis(100);
gpr_mu_lock(&grpc_iomgr_mu);
while (!g_shutdown_backup_poller) {
@@ -203,8 +221,13 @@ static void backup_poller_thread(void *p) {
backup_poller_engaged = 1;
}
if (!maybe_do_queue_work()) {
- struct timeval tv = {1, 0};
- maybe_do_polling_work(tv);
+ gpr_timespec next = grpc_alarm_list_next_timeout();
+ if (!maybe_do_alarm_work(now, next)) {
+ gpr_timespec deadline =
+ gpr_time_min(next, gpr_time_add(now, gpr_time_from_seconds(1)));
+ maybe_do_polling_work(
+ gpr_timeval_from_timespec(gpr_time_sub(deadline, now)));
+ }
}
} else {
if (backup_poller_engaged) {
@@ -236,6 +259,8 @@ void grpc_iomgr_init() {
abort();
}
+ grpc_alarm_list_init(gpr_now());
+
gpr_mu_init(&grpc_iomgr_mu);
gpr_cv_init(&grpc_iomgr_cv);
g_activation_queue = NULL;
@@ -295,6 +320,8 @@ void grpc_iomgr_shutdown() {
gpr_event_wait(&g_backup_poller_done, gpr_inf_future);
+ grpc_alarm_list_shutdown();
+
/* drain pending work */
gpr_mu_lock(&grpc_iomgr_mu);
while (maybe_do_queue_work())
@@ -331,84 +358,6 @@ static void add_task(grpc_libevent_activation_data *adata) {
gpr_mu_unlock(&grpc_iomgr_mu);
}
-/* ===============grpc_alarm implementation==================== */
-
-/* The following function frees up the alarm's libevent structure and
- should always be invoked just before calling the alarm's callback */
-static void alarm_ev_destroy(grpc_alarm *alarm) {
- grpc_libevent_activation_data *adata =
- &alarm->task.activation[GRPC_EM_TA_ONLY];
- if (adata->ev != NULL) {
- /* TODO(klempner): Is this safe to do when we're cancelling? */
- event_free(adata->ev);
- adata->ev = NULL;
- }
-}
-/* Proxy callback triggered by alarm->ev to call alarm->cb */
-static void libevent_alarm_cb(int fd, short what, void *arg /*=alarm*/) {
- grpc_alarm *alarm = arg;
- grpc_libevent_activation_data *adata =
- &alarm->task.activation[GRPC_EM_TA_ONLY];
- int trigger_old;
-
- /* First check if this alarm has been canceled, atomically */
- trigger_old =
- gpr_atm_full_fetch_add(&alarm->triggered, ALARM_TRIGGER_INCREMENT);
- if (trigger_old == ALARM_TRIGGER_INIT) {
- /* Before invoking user callback, destroy the libevent structure */
- alarm_ev_destroy(alarm);
- adata->status = GRPC_CALLBACK_SUCCESS;
- add_task(adata);
- }
-}
-
-int grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
- grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg,
- gpr_timespec now) {
- grpc_libevent_activation_data *adata =
- &alarm->task.activation[GRPC_EM_TA_ONLY];
- gpr_timespec delay_timespec = gpr_time_sub(deadline, now);
- struct timeval delay = gpr_timeval_from_timespec(delay_timespec);
- alarm->task.type = GRPC_EM_TASK_ALARM;
- gpr_atm_rel_store(&alarm->triggered, ALARM_TRIGGER_INIT);
- adata->cb = alarm_cb;
- adata->arg = alarm_cb_arg;
- adata->prev = NULL;
- adata->next = NULL;
- adata->ev = evtimer_new(g_event_base, libevent_alarm_cb, alarm);
- /* Set the trigger field to untriggered. Do this as the last store since
- it is a release of previous stores. */
- gpr_atm_rel_store(&alarm->triggered, ALARM_TRIGGER_INIT);
-
- return adata->ev != NULL && evtimer_add(adata->ev, &delay) == 0;
-}
-
-int grpc_alarm_cancel(grpc_alarm *alarm) {
- grpc_libevent_activation_data *adata =
- &alarm->task.activation[GRPC_EM_TA_ONLY];
- int trigger_old;
-
- /* First check if this alarm has been triggered, atomically */
- trigger_old =
- gpr_atm_full_fetch_add(&alarm->triggered, ALARM_TRIGGER_INCREMENT);
- if (trigger_old == ALARM_TRIGGER_INIT) {
- /* We need to make sure that we only invoke the callback if it hasn't
- already been invoked */
- /* First remove this event from libevent. This returns success even if the
- event has gone active or invoked its callback. */
- if (evtimer_del(adata->ev) != 0) {
- /* The delete was unsuccessful for some reason. */
- gpr_log(GPR_ERROR, "Attempt to delete alarm event was unsuccessful");
- return 0;
- }
- /* Free up the event structure before invoking callback */
- alarm_ev_destroy(alarm);
- adata->status = GRPC_CALLBACK_CANCELLED;
- add_task(adata);
- }
- return 1;
-}
-
static void grpc_fd_impl_destroy(grpc_fd *impl) {
grpc_em_task_activity_type type;
grpc_libevent_activation_data *adata;
diff --git a/src/core/iomgr/iomgr_libevent.h b/src/core/iomgr/iomgr_libevent.h
index e5564da4c4..5c088006a0 100644
--- a/src/core/iomgr/iomgr_libevent.h
+++ b/src/core/iomgr/iomgr_libevent.h
@@ -201,14 +201,6 @@ struct grpc_fd {
void *on_done_user_data;
};
-/* gRPC alarm handle.
- The handle is used to add an alarm which expires after specified timeout. */
-struct grpc_alarm {
- grpc_libevent_task task; /* Include the base class */
-
- gpr_atm triggered; /* To be used atomically if alarm triggered */
-};
-
void grpc_iomgr_ref_address_resolution(int delta);
#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ */