diff options
author | ctiller <ctiller@google.com> | 2014-12-19 16:21:57 -0800 |
---|---|---|
committer | Jan Tattermusch <jtattermusch@google.com> | 2014-12-29 17:05:35 -0800 |
commit | 3bf466fb6c8cbbd4334d70be9c251feb71a7c78a (patch) | |
tree | ed26d5d417f67245175a9ad4883aa9c22691a04d /src/core | |
parent | 1a809c0ebbf77aedf7f6322ef7d6373962c80264 (diff) |
Port [] alarm management to GRPC.
This change implements a platform independent alarm manager in alarm.c.
It's integrated with iomgr, and some tests are cleaned up.
The alarm implementation itself is a fairly direct port of LazyAlarmList from eventmanager.
SpinLock has been replaced for now with gpr_mu, and other atomic operations have been dropped (again, for now).
A majority of tests have been ported.
Change on 2014/12/19 by ctiller <ctiller@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=82551363
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/client_setup.c | 6 | ||||
-rw-r--r-- | src/core/iomgr/alarm.c | 353 | ||||
-rw-r--r-- | src/core/iomgr/alarm.h | 32 | ||||
-rw-r--r-- | src/core/iomgr/alarm_heap.c | 148 | ||||
-rw-r--r-- | src/core/iomgr/alarm_heap.h | 57 | ||||
-rw-r--r-- | src/core/iomgr/alarm_internal.h | 50 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_libevent.c | 115 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_libevent.h | 8 | ||||
-rw-r--r-- | src/core/support/time.c | 8 |
9 files changed, 673 insertions, 104 deletions
diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c index fa946b0f1b..b1194e278d 100644 --- a/src/core/channel/client_setup.c +++ b/src/core/channel/client_setup.c @@ -108,6 +108,7 @@ static void setup_initiate(grpc_transport_setup *sp) { not to initiate again) */ static void setup_cancel(grpc_transport_setup *sp) { grpc_client_setup *s = (grpc_client_setup *)sp; + int cancel_alarm = 0; gpr_mu_lock(&s->mu); @@ -115,7 +116,7 @@ static void setup_cancel(grpc_transport_setup *sp) { /* effectively cancels the current request (if any) */ s->active_request = NULL; if (s->in_alarm) { - grpc_alarm_cancel(&s->backoff_alarm); + cancel_alarm = 1; } if (--s->refs == 0) { gpr_mu_unlock(&s->mu); @@ -123,6 +124,9 @@ static void setup_cancel(grpc_transport_setup *sp) { } else { gpr_mu_unlock(&s->mu); } + if (cancel_alarm) { + grpc_alarm_cancel(&s->backoff_alarm); + } } /* vtable for transport setup */ diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c new file mode 100644 index 0000000000..b7238f716a --- /dev/null +++ b/src/core/iomgr/alarm.c @@ -0,0 +1,353 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/alarm.h" + +#include "src/core/iomgr/alarm_heap.h" +#include "src/core/iomgr/alarm_internal.h" +#include "src/core/iomgr/time_averaged_stats.h" +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> + +#define INVALID_HEAP_INDEX 0xffffffffu + +#define LOG2_NUM_SHARDS 5 +#define NUM_SHARDS (1 << LOG2_NUM_SHARDS) +#define MAX_ALARMS_PER_CHECK 128 +#define ADD_DEADLINE_SCALE 0.33 +#define MIN_QUEUE_WINDOW_DURATION 0.01 +#define MAX_QUEUE_WINDOW_DURATION 1 + +typedef struct { + gpr_mu mu; + grpc_time_averaged_stats stats; + /* All and only alarms with deadlines <= this will be in the heap. */ + gpr_timespec queue_deadline_cap; + gpr_timespec min_deadline; + /* Index in the g_shard_queue */ + gpr_uint32 shard_queue_index; + /* This holds all alarms with deadlines < queue_deadline_cap. Alarms in this + list have the top bit of their deadline set to 0. */ + grpc_alarm_heap heap; + /* This holds alarms whose deadline is >= queue_deadline_cap. */ + grpc_alarm list; +} shard_type; + +/* Protects g_shard_queue */ +static gpr_mu g_mu; +/* Allow only one run_some_expired_alarms at once */ +static gpr_mu g_checker_mu; +static shard_type g_shards[NUM_SHARDS]; +/* Protected by g_mu */ +static shard_type *g_shard_queue[NUM_SHARDS]; + +static int run_some_expired_alarms(gpr_timespec now, + grpc_iomgr_cb_status status); + +static gpr_timespec compute_min_deadline(shard_type *shard) { + return grpc_alarm_heap_is_empty(&shard->heap) + ? shard->queue_deadline_cap + : grpc_alarm_heap_top(&shard->heap)->deadline; +} + +void grpc_alarm_list_init(gpr_timespec now) { + int i; + + gpr_mu_init(&g_mu); + gpr_mu_init(&g_checker_mu); + + for (i = 0; i < NUM_SHARDS; i++) { + shard_type *shard = &g_shards[i]; + gpr_mu_init(&shard->mu); + grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1, + 0.5); + shard->queue_deadline_cap = now; + shard->shard_queue_index = i; + grpc_alarm_heap_init(&shard->heap); + shard->list.next = shard->list.prev = &shard->list; + shard->min_deadline = compute_min_deadline(shard); + g_shard_queue[i] = shard; + } +} + +void grpc_alarm_list_shutdown() { + int i; + while (run_some_expired_alarms(gpr_inf_future, GRPC_CALLBACK_CANCELLED)) + ; + for (i = 0; i < NUM_SHARDS; i++) { + shard_type *shard = &g_shards[i]; + gpr_mu_destroy(&shard->mu); + grpc_alarm_heap_destroy(&shard->heap); + } + gpr_mu_destroy(&g_mu); + gpr_mu_destroy(&g_checker_mu); +} + +/* This is a cheap, but good enough, pointer hash for sharding the tasks: */ +static size_t shard_idx(const grpc_alarm *info) { + size_t x = (size_t)info; + return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (NUM_SHARDS - 1); +} + +static double ts_to_dbl(gpr_timespec ts) { + return ts.tv_sec + 1e-9 * ts.tv_nsec; +} + +static gpr_timespec dbl_to_ts(double d) { + gpr_timespec ts; + ts.tv_sec = d; + ts.tv_nsec = 1e9 * (d - ts.tv_sec); + return ts; +} + +static void list_join(grpc_alarm *head, grpc_alarm *alarm) { + alarm->next = head; + alarm->prev = head->prev; + alarm->next->prev = alarm->prev->next = alarm; +} + +static void list_remove(grpc_alarm *alarm) { + alarm->next->prev = alarm->prev; + alarm->prev->next = alarm->next; +} + +static void swap_adjacent_shards_in_queue(size_t first_shard_queue_index) { + shard_type *temp; + temp = g_shard_queue[first_shard_queue_index]; + g_shard_queue[first_shard_queue_index] = + g_shard_queue[first_shard_queue_index + 1]; + g_shard_queue[first_shard_queue_index + 1] = temp; + g_shard_queue[first_shard_queue_index]->shard_queue_index = + first_shard_queue_index; + g_shard_queue[first_shard_queue_index + 1]->shard_queue_index = + first_shard_queue_index + 1; +} + +static void note_deadline_change(shard_type *shard) { + while (shard->shard_queue_index > 0 && + gpr_time_cmp( + shard->min_deadline, + g_shard_queue[shard->shard_queue_index - 1]->min_deadline) < 0) { + swap_adjacent_shards_in_queue(shard->shard_queue_index - 1); + } + while (shard->shard_queue_index < NUM_SHARDS - 1 && + gpr_time_cmp( + shard->min_deadline, + g_shard_queue[shard->shard_queue_index + 1]->min_deadline) > 0) { + swap_adjacent_shards_in_queue(shard->shard_queue_index); + } +} + +void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline, + grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg, + gpr_timespec now) { + int is_first_alarm = 0; + shard_type *shard = &g_shards[shard_idx(alarm)]; + alarm->cb = alarm_cb; + alarm->cb_arg = alarm_cb_arg; + alarm->deadline = deadline; + alarm->triggered = 0; + + /* TODO(ctiller): check deadline expired */ + + gpr_mu_lock(&shard->mu); + grpc_time_averaged_stats_add_sample(&shard->stats, + ts_to_dbl(gpr_time_sub(deadline, now))); + if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) { + is_first_alarm = grpc_alarm_heap_add(&shard->heap, alarm); + } else { + alarm->heap_index = INVALID_HEAP_INDEX; + list_join(&shard->list, alarm); + } + gpr_mu_unlock(&shard->mu); + + /* Deadline may have decreased, we need to adjust the master queue. Note + that there is a potential racy unlocked region here. There could be a + reordering of multiple grpc_alarm_init calls, at this point, but the < test + below should ensure that we err on the side of caution. There could + also be a race with grpc_alarm_check, which might beat us to the lock. In + that case, it is possible that the alarm that we added will have already + run by the time we hold the lock, but that too is a safe error. + Finally, it's possible that the grpc_alarm_check that intervened failed to + trigger the new alarm because the min_deadline hadn't yet been reduced. + In that case, the alarm will simply have to wait for the next + grpc_alarm_check. */ + if (is_first_alarm) { + gpr_mu_lock(&g_mu); + if (gpr_time_cmp(deadline, shard->min_deadline) < 0) { + gpr_timespec old_min_deadline = g_shard_queue[0]->min_deadline; + shard->min_deadline = deadline; + note_deadline_change(shard); + if (shard->shard_queue_index == 0 && + gpr_time_cmp(deadline, old_min_deadline) < 0) { + grpc_kick_poller(); + } + } + gpr_mu_unlock(&g_mu); + } +} + +void grpc_alarm_cancel(grpc_alarm *alarm) { + shard_type *shard = &g_shards[shard_idx(alarm)]; + int triggered = 0; + gpr_mu_lock(&shard->mu); + if (!alarm->triggered) { + triggered = 1; + alarm->triggered = 1; + if (alarm->heap_index == INVALID_HEAP_INDEX) { + list_remove(alarm); + } else { + grpc_alarm_heap_remove(&shard->heap, alarm); + } + } + gpr_mu_unlock(&shard->mu); + + if (triggered) { + alarm->cb(alarm->cb_arg, GRPC_CALLBACK_CANCELLED); + } +} + +/* This is called when the queue is empty and "now" has reached the + queue_deadline_cap. We compute a new queue deadline and then scan the map + for alarms that fall at or under it. Returns true if the queue is no + longer empty. + REQUIRES: shard->mu locked */ +static int refill_queue(shard_type *shard, gpr_timespec now) { + /* Compute the new queue window width and bound by the limits: */ + double computed_deadline_delta = + grpc_time_averaged_stats_update_average(&shard->stats) * + ADD_DEADLINE_SCALE; + double deadline_delta = + GPR_CLAMP(computed_deadline_delta, MIN_QUEUE_WINDOW_DURATION, + MAX_QUEUE_WINDOW_DURATION); + grpc_alarm *alarm, *next; + + /* Compute the new cap and put all alarms under it into the queue: */ + shard->queue_deadline_cap = gpr_time_add( + gpr_time_max(now, shard->queue_deadline_cap), dbl_to_ts(deadline_delta)); + for (alarm = shard->list.next; alarm != &shard->list; alarm = next) { + next = alarm->next; + + if (gpr_time_cmp(alarm->deadline, shard->queue_deadline_cap) < 0) { + list_remove(alarm); + grpc_alarm_heap_add(&shard->heap, alarm); + } + } + return !grpc_alarm_heap_is_empty(&shard->heap); +} + +/* This pops the next non-cancelled alarm with deadline <= now from the queue, + or returns NULL if there isn't one. + REQUIRES: shard->mu locked */ +static grpc_alarm *pop_one(shard_type *shard, gpr_timespec now) { + grpc_alarm *alarm; + for (;;) { + if (grpc_alarm_heap_is_empty(&shard->heap)) { + if (gpr_time_cmp(now, shard->queue_deadline_cap) < 0) return NULL; + if (!refill_queue(shard, now)) return NULL; + } + alarm = grpc_alarm_heap_top(&shard->heap); + if (gpr_time_cmp(alarm->deadline, now) > 0) return NULL; + alarm->triggered = 1; + grpc_alarm_heap_pop(&shard->heap); + return alarm; + } +} + +/* REQUIRES: shard->mu unlocked */ +static size_t pop_alarms(shard_type *shard, gpr_timespec now, + grpc_alarm **alarms, size_t max_alarms, + gpr_timespec *new_min_deadline) { + size_t n = 0; + grpc_alarm *alarm; + gpr_mu_lock(&shard->mu); + while (n < max_alarms && (alarm = pop_one(shard, now))) { + alarms[n++] = alarm; + } + *new_min_deadline = compute_min_deadline(shard); + gpr_mu_unlock(&shard->mu); + return n; +} + +static int run_some_expired_alarms(gpr_timespec now, + grpc_iomgr_cb_status status) { + size_t n = 0; + size_t i; + grpc_alarm *alarms[MAX_ALARMS_PER_CHECK]; + + /* TODO(ctiller): verify that there are any alarms (atomically) here */ + + if (gpr_mu_trylock(&g_checker_mu)) { + gpr_mu_lock(&g_mu); + + while (n < MAX_ALARMS_PER_CHECK && + gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) { + gpr_timespec new_min_deadline; + + /* For efficiency, we pop as many available alarms as we can from the + shard. This may violate perfect alarm deadline ordering, but that + shouldn't be a big deal because we don't make ordering guarantees. */ + n += pop_alarms(g_shard_queue[0], now, alarms + n, + MAX_ALARMS_PER_CHECK - n, &new_min_deadline); + + /* An grpc_alarm_init() on the shard could intervene here, adding a new + alarm that is earlier than new_min_deadline. However, + grpc_alarm_init() will block on the master_lock before it can call + set_min_deadline, so this one will complete first and then the AddAlarm + will reduce the min_deadline (perhaps unnecessarily). */ + g_shard_queue[0]->min_deadline = new_min_deadline; + note_deadline_change(g_shard_queue[0]); + } + + gpr_mu_unlock(&g_mu); + gpr_mu_unlock(&g_checker_mu); + } + + for (i = 0; i < n; i++) { + alarms[i]->cb(alarms[i]->cb_arg, status); + } + + return n; +} + +int grpc_alarm_check(gpr_timespec now) { + return run_some_expired_alarms(now, GRPC_CALLBACK_SUCCESS); +} + +gpr_timespec grpc_alarm_list_next_timeout() { + gpr_timespec out; + gpr_mu_lock(&g_mu); + out = g_shard_queue[0]->min_deadline; + gpr_mu_unlock(&g_mu); + return out; +} diff --git a/src/core/iomgr/alarm.h b/src/core/iomgr/alarm.h index 2bb5bf7022..f94dcec6e9 100644 --- a/src/core/iomgr/alarm.h +++ b/src/core/iomgr/alarm.h @@ -38,23 +38,25 @@ #include <grpc/support/port_platform.h> #include <grpc/support/time.h> -typedef struct grpc_alarm grpc_alarm; - -/* One of the following headers should provide struct grpc_alarm */ -#ifdef GPR_LIBEVENT -#include "src/core/iomgr/iomgr_libevent.h" -#endif +typedef struct grpc_alarm { + gpr_timespec deadline; + gpr_uint32 heap_index; /* INVALID_HEAP_INDEX if not in heap */ + struct grpc_alarm *next; + struct grpc_alarm *prev; + int triggered; + grpc_iomgr_cb_func cb; + void *cb_arg; +} grpc_alarm; /* Initialize *alarm. When expired or canceled, alarm_cb will be called with *alarm_cb_arg and status to indicate if it expired (SUCCESS) or was canceled (CANCELLED). alarm_cb is guaranteed to be called exactly once, and application code should check the status to determine how it was invoked. The application callback is also responsible for maintaining - information about when to free up any user-level state. - Returns 1 on success, 0 on failure. */ -int grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline, - grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg, - gpr_timespec now); + information about when to free up any user-level state. */ +void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline, + grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg, + gpr_timespec now); /* Note that there is no alarm destroy function. This is because the alarm is a one-time occurrence with a guarantee that the callback will @@ -75,7 +77,13 @@ int grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline, exactly once from either the cancellation (with status CANCELLED) or from the activation (with status SUCCESS) + Note carefully that the callback function MAY occur in the same callstack + as grpc_alarm_cancel. It's expected that most alarms will be cancelled (their + primary use is to implement deadlines), and so this code is optimized such + that cancellation costs as little as possible. Making callbacks run inline + matches this aim. + Requires: cancel() must happen after add() on a given alarm */ -int grpc_alarm_cancel(grpc_alarm *alarm); +void grpc_alarm_cancel(grpc_alarm *alarm); #endif /* __GRPC_INTERNAL_IOMGR_ALARM_H__ */ diff --git a/src/core/iomgr/alarm_heap.c b/src/core/iomgr/alarm_heap.c new file mode 100644 index 0000000000..693d26d03f --- /dev/null +++ b/src/core/iomgr/alarm_heap.c @@ -0,0 +1,148 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/alarm_heap.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/useful.h> + +/* Adjusts a heap so as to move a hole at position i closer to the root, + until a suitable position is found for element t. Then, copies t into that + position. This functor is called each time immediately after modifying a + value in the underlying container, with the offset of the modified element as + its argument. */ +static void adjust_upwards(grpc_alarm **first, int i, grpc_alarm *t) { + while (i > 0) { + int parent = (i - 1) / 2; + if (gpr_time_cmp(first[parent]->deadline, t->deadline) >= 0) break; + first[i] = first[parent]; + first[i]->heap_index = i; + i = parent; + } + first[i] = t; + t->heap_index = i; +} + +/* Adjusts a heap so as to move a hole at position i farther away from the root, + until a suitable position is found for element t. Then, copies t into that + position. */ +static void adjust_downwards(grpc_alarm **first, int i, int length, + grpc_alarm *t) { + for (;;) { + int left_child = 1 + 2 * i; + int right_child; + int next_i; + if (left_child >= length) break; + right_child = left_child + 1; + next_i = right_child < length && + gpr_time_cmp(first[left_child]->deadline, + first[right_child]->deadline) < 0 + ? right_child + : left_child; + if (gpr_time_cmp(t->deadline, first[next_i]->deadline) >= 0) break; + first[i] = first[next_i]; + first[i]->heap_index = i; + i = next_i; + } + first[i] = t; + t->heap_index = i; +} + +#define SHRINK_MIN_ELEMS 8 +#define SHRINK_FULLNESS_FACTOR 2 + +static void maybe_shrink(grpc_alarm_heap *heap) { + if (heap->alarm_count >= 8 && + heap->alarm_count <= heap->alarm_capacity / SHRINK_FULLNESS_FACTOR / 2) { + heap->alarm_capacity = heap->alarm_count * SHRINK_FULLNESS_FACTOR; + heap->alarms = + gpr_realloc(heap->alarms, heap->alarm_capacity * sizeof(grpc_alarm *)); + } +} + +static void note_changed_priority(grpc_alarm_heap *heap, grpc_alarm *alarm) { + int i = alarm->heap_index; + int parent = (i - 1) / 2; + if (gpr_time_cmp(heap->alarms[parent]->deadline, alarm->deadline) < 0) { + adjust_upwards(heap->alarms, i, alarm); + } else { + adjust_downwards(heap->alarms, i, heap->alarm_count, alarm); + } +} + +void grpc_alarm_heap_init(grpc_alarm_heap *heap) { + memset(heap, 0, sizeof(*heap)); +} + +void grpc_alarm_heap_destroy(grpc_alarm_heap *heap) { gpr_free(heap->alarms); } + +int grpc_alarm_heap_add(grpc_alarm_heap *heap, grpc_alarm *alarm) { + if (heap->alarm_count == heap->alarm_capacity) { + heap->alarm_capacity = + GPR_MAX(heap->alarm_capacity + 1, heap->alarm_capacity * 3 / 2); + heap->alarms = + gpr_realloc(heap->alarms, heap->alarm_capacity * sizeof(grpc_alarm *)); + } + alarm->heap_index = heap->alarm_count; + adjust_upwards(heap->alarms, heap->alarm_count, alarm); + heap->alarm_count++; + return alarm->heap_index == 0; +} + +void grpc_alarm_heap_remove(grpc_alarm_heap *heap, grpc_alarm *alarm) { + int i = alarm->heap_index; + if (i == heap->alarm_count - 1) { + heap->alarm_count--; + maybe_shrink(heap); + return; + } + heap->alarms[i] = heap->alarms[heap->alarm_count - 1]; + heap->alarms[i]->heap_index = i; + heap->alarm_count--; + maybe_shrink(heap); + note_changed_priority(heap, heap->alarms[i]); +} + +int grpc_alarm_heap_is_empty(grpc_alarm_heap *heap) { + return heap->alarm_count == 0; +} + +grpc_alarm *grpc_alarm_heap_top(grpc_alarm_heap *heap) { + return heap->alarms[0]; +} + +void grpc_alarm_heap_pop(grpc_alarm_heap *heap) { + grpc_alarm_heap_remove(heap, grpc_alarm_heap_top(heap)); +} diff --git a/src/core/iomgr/alarm_heap.h b/src/core/iomgr/alarm_heap.h new file mode 100644 index 0000000000..e51f96dd44 --- /dev/null +++ b/src/core/iomgr/alarm_heap.h @@ -0,0 +1,57 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_ALARM_HEAP_H_ +#define __GRPC_INTERNAL_IOMGR_ALARM_HEAP_H_ + +#include "src/core/iomgr/alarm.h" + +typedef struct { + grpc_alarm **alarms; + int alarm_count; + int alarm_capacity; +} grpc_alarm_heap; + +/* return 1 if the new alarm is the first alarm in the heap */ +int grpc_alarm_heap_add(grpc_alarm_heap *heap, grpc_alarm *alarm); + +void grpc_alarm_heap_init(grpc_alarm_heap *heap); +void grpc_alarm_heap_destroy(grpc_alarm_heap *heap); + +void grpc_alarm_heap_remove(grpc_alarm_heap *heap, grpc_alarm *alarm); +grpc_alarm *grpc_alarm_heap_top(grpc_alarm_heap *heap); +void grpc_alarm_heap_pop(grpc_alarm_heap *heap); + +int grpc_alarm_heap_is_empty(grpc_alarm_heap *heap); + +#endif /* __GRPC_INTERNAL_IOMGR_ALARM_HEAP_H_ */ diff --git a/src/core/iomgr/alarm_internal.h b/src/core/iomgr/alarm_internal.h new file mode 100644 index 0000000000..e605ff84f9 --- /dev/null +++ b/src/core/iomgr/alarm_internal.h @@ -0,0 +1,50 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_ +#define __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_ + +/* iomgr internal api for dealing with alarms */ + +int grpc_alarm_check(gpr_timespec now); + +void grpc_alarm_list_init(gpr_timespec now); +void grpc_alarm_list_shutdown(); + +gpr_timespec grpc_alarm_list_next_timeout(); + +/* the following must be implemented by each iomgr implementation */ + +void grpc_kick_poller(); + +#endif /* __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_ */ diff --git a/src/core/iomgr/iomgr_libevent.c b/src/core/iomgr/iomgr_libevent.c index 3c94d35a38..a00df72661 100644 --- a/src/core/iomgr/iomgr_libevent.c +++ b/src/core/iomgr/iomgr_libevent.c @@ -37,6 +37,7 @@ #include <fcntl.h> #include "src/core/iomgr/alarm.h" +#include "src/core/iomgr/alarm_internal.h" #include <grpc/support/atm.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -131,6 +132,10 @@ static void maybe_free_fds() { } } +/* TODO(ctiller): this is racy. In non-libevent implementations, use a pipe + or eventfd */ +void grpc_kick_poller() { event_base_loopbreak(g_event_base); } + /* Spend some time doing polling and libevent maintenance work if no other thread is. This includes both polling for events and destroying/closing file descriptor objects. @@ -162,8 +167,20 @@ static int maybe_do_polling_work(struct timeval delay) { return 1; } +static int maybe_do_alarm_work(gpr_timespec now, gpr_timespec next) { + int r = 0; + if (gpr_time_cmp(next, now) < 0) { + gpr_mu_unlock(&grpc_iomgr_mu); + r = grpc_alarm_check(now); + gpr_mu_lock(&grpc_iomgr_mu); + } + return r; +} + int grpc_iomgr_work(gpr_timespec deadline) { - gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); + gpr_timespec now = gpr_now(); + gpr_timespec next = grpc_alarm_list_next_timeout(); + gpr_timespec delay_timespec = gpr_time_sub(deadline, now); /* poll for no longer than one second */ gpr_timespec max_delay = {1, 0}; struct timeval delay; @@ -178,7 +195,8 @@ int grpc_iomgr_work(gpr_timespec deadline) { delay = gpr_timeval_from_timespec(delay_timespec); - if (maybe_do_queue_work() || maybe_do_polling_work(delay)) { + if (maybe_do_queue_work() || maybe_do_alarm_work(now, next) || + maybe_do_polling_work(delay)) { g_last_poll_completed = gpr_now(); return 1; } @@ -189,7 +207,7 @@ int grpc_iomgr_work(gpr_timespec deadline) { static void backup_poller_thread(void *p) { int backup_poller_engaged = 0; /* allow no pollers for 100 milliseconds, then engage backup polling */ - gpr_timespec allow_no_pollers = gpr_time_from_micros(100 * 1000); + gpr_timespec allow_no_pollers = gpr_time_from_millis(100); gpr_mu_lock(&grpc_iomgr_mu); while (!g_shutdown_backup_poller) { @@ -203,8 +221,13 @@ static void backup_poller_thread(void *p) { backup_poller_engaged = 1; } if (!maybe_do_queue_work()) { - struct timeval tv = {1, 0}; - maybe_do_polling_work(tv); + gpr_timespec next = grpc_alarm_list_next_timeout(); + if (!maybe_do_alarm_work(now, next)) { + gpr_timespec deadline = + gpr_time_min(next, gpr_time_add(now, gpr_time_from_seconds(1))); + maybe_do_polling_work( + gpr_timeval_from_timespec(gpr_time_sub(deadline, now))); + } } } else { if (backup_poller_engaged) { @@ -236,6 +259,8 @@ void grpc_iomgr_init() { abort(); } + grpc_alarm_list_init(gpr_now()); + gpr_mu_init(&grpc_iomgr_mu); gpr_cv_init(&grpc_iomgr_cv); g_activation_queue = NULL; @@ -295,6 +320,8 @@ void grpc_iomgr_shutdown() { gpr_event_wait(&g_backup_poller_done, gpr_inf_future); + grpc_alarm_list_shutdown(); + /* drain pending work */ gpr_mu_lock(&grpc_iomgr_mu); while (maybe_do_queue_work()) @@ -331,84 +358,6 @@ static void add_task(grpc_libevent_activation_data *adata) { gpr_mu_unlock(&grpc_iomgr_mu); } -/* ===============grpc_alarm implementation==================== */ - -/* The following function frees up the alarm's libevent structure and - should always be invoked just before calling the alarm's callback */ -static void alarm_ev_destroy(grpc_alarm *alarm) { - grpc_libevent_activation_data *adata = - &alarm->task.activation[GRPC_EM_TA_ONLY]; - if (adata->ev != NULL) { - /* TODO(klempner): Is this safe to do when we're cancelling? */ - event_free(adata->ev); - adata->ev = NULL; - } -} -/* Proxy callback triggered by alarm->ev to call alarm->cb */ -static void libevent_alarm_cb(int fd, short what, void *arg /*=alarm*/) { - grpc_alarm *alarm = arg; - grpc_libevent_activation_data *adata = - &alarm->task.activation[GRPC_EM_TA_ONLY]; - int trigger_old; - - /* First check if this alarm has been canceled, atomically */ - trigger_old = - gpr_atm_full_fetch_add(&alarm->triggered, ALARM_TRIGGER_INCREMENT); - if (trigger_old == ALARM_TRIGGER_INIT) { - /* Before invoking user callback, destroy the libevent structure */ - alarm_ev_destroy(alarm); - adata->status = GRPC_CALLBACK_SUCCESS; - add_task(adata); - } -} - -int grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline, - grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg, - gpr_timespec now) { - grpc_libevent_activation_data *adata = - &alarm->task.activation[GRPC_EM_TA_ONLY]; - gpr_timespec delay_timespec = gpr_time_sub(deadline, now); - struct timeval delay = gpr_timeval_from_timespec(delay_timespec); - alarm->task.type = GRPC_EM_TASK_ALARM; - gpr_atm_rel_store(&alarm->triggered, ALARM_TRIGGER_INIT); - adata->cb = alarm_cb; - adata->arg = alarm_cb_arg; - adata->prev = NULL; - adata->next = NULL; - adata->ev = evtimer_new(g_event_base, libevent_alarm_cb, alarm); - /* Set the trigger field to untriggered. Do this as the last store since - it is a release of previous stores. */ - gpr_atm_rel_store(&alarm->triggered, ALARM_TRIGGER_INIT); - - return adata->ev != NULL && evtimer_add(adata->ev, &delay) == 0; -} - -int grpc_alarm_cancel(grpc_alarm *alarm) { - grpc_libevent_activation_data *adata = - &alarm->task.activation[GRPC_EM_TA_ONLY]; - int trigger_old; - - /* First check if this alarm has been triggered, atomically */ - trigger_old = - gpr_atm_full_fetch_add(&alarm->triggered, ALARM_TRIGGER_INCREMENT); - if (trigger_old == ALARM_TRIGGER_INIT) { - /* We need to make sure that we only invoke the callback if it hasn't - already been invoked */ - /* First remove this event from libevent. This returns success even if the - event has gone active or invoked its callback. */ - if (evtimer_del(adata->ev) != 0) { - /* The delete was unsuccessful for some reason. */ - gpr_log(GPR_ERROR, "Attempt to delete alarm event was unsuccessful"); - return 0; - } - /* Free up the event structure before invoking callback */ - alarm_ev_destroy(alarm); - adata->status = GRPC_CALLBACK_CANCELLED; - add_task(adata); - } - return 1; -} - static void grpc_fd_impl_destroy(grpc_fd *impl) { grpc_em_task_activity_type type; grpc_libevent_activation_data *adata; diff --git a/src/core/iomgr/iomgr_libevent.h b/src/core/iomgr/iomgr_libevent.h index e5564da4c4..5c088006a0 100644 --- a/src/core/iomgr/iomgr_libevent.h +++ b/src/core/iomgr/iomgr_libevent.h @@ -201,14 +201,6 @@ struct grpc_fd { void *on_done_user_data; }; -/* gRPC alarm handle. - The handle is used to add an alarm which expires after specified timeout. */ -struct grpc_alarm { - grpc_libevent_task task; /* Include the base class */ - - gpr_atm triggered; /* To be used atomically if alarm triggered */ -}; - void grpc_iomgr_ref_address_resolution(int delta); #endif /* __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ */ diff --git a/src/core/support/time.c b/src/core/support/time.c index 1d8765f8cc..712bdf441c 100644 --- a/src/core/support/time.c +++ b/src/core/support/time.c @@ -47,6 +47,14 @@ int gpr_time_cmp(gpr_timespec a, gpr_timespec b) { return cmp; } +gpr_timespec gpr_time_min(gpr_timespec a, gpr_timespec b) { + return gpr_time_cmp(a, b) < 0 ? a : b; +} + +gpr_timespec gpr_time_max(gpr_timespec a, gpr_timespec b) { + return gpr_time_cmp(a, b) > 0 ? a : b; +} + /* There's no standard TIME_T_MIN and TIME_T_MAX, so we construct them. The following assumes that signed types are two's-complement and that bytes are 8 bits. */ |