diff options
author | ctiller <ctiller@google.com> | 2014-12-19 16:21:57 -0800 |
---|---|---|
committer | Jan Tattermusch <jtattermusch@google.com> | 2014-12-29 17:05:35 -0800 |
commit | 3bf466fb6c8cbbd4334d70be9c251feb71a7c78a (patch) | |
tree | ed26d5d417f67245175a9ad4883aa9c22691a04d /test | |
parent | 1a809c0ebbf77aedf7f6322ef7d6373962c80264 (diff) |
Port [] alarm management to GRPC.
This change implements a platform independent alarm manager in alarm.c.
It's integrated with iomgr, and some tests are cleaned up.
The alarm implementation itself is a fairly direct port of LazyAlarmList from eventmanager.
SpinLock has been replaced for now with gpr_mu, and other atomic operations have been dropped (again, for now).
A majority of tests have been ported.
Change on 2014/12/19 by ctiller <ctiller@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=82551363
Diffstat (limited to 'test')
-rw-r--r-- | test/core/end2end/no_server_test.c | 2 | ||||
-rw-r--r-- | test/core/iomgr/alarm_heap_test.c | 277 | ||||
-rw-r--r-- | test/core/iomgr/alarm_list_test.c | 144 | ||||
-rw-r--r-- | test/core/iomgr/alarm_test.c | 29 |
4 files changed, 438 insertions, 14 deletions
diff --git a/test/core/end2end/no_server_test.c b/test/core/end2end/no_server_test.c index f506b8bcf7..ba6349c109 100644 --- a/test/core/end2end/no_server_test.c +++ b/test/core/end2end/no_server_test.c @@ -41,7 +41,7 @@ static void *tag(gpr_intptr i) { return (void *)i; } int main(int argc, char **argv) { grpc_channel *chan; grpc_call *call; - gpr_timespec timeout = gpr_time_from_micros(4000000); + gpr_timespec timeout = gpr_time_from_seconds(4); gpr_timespec deadline = gpr_time_add(gpr_now(), timeout); grpc_completion_queue *cq; cq_verifier *cqv; diff --git a/test/core/iomgr/alarm_heap_test.c b/test/core/iomgr/alarm_heap_test.c new file mode 100644 index 0000000000..eaaaf156f4 --- /dev/null +++ b/test/core/iomgr/alarm_heap_test.c @@ -0,0 +1,277 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/alarm_heap.h" + +#include <stdlib.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include "test/core/util/test_config.h" + +static gpr_timespec random_deadline() { + gpr_timespec ts; + ts.tv_sec = rand(); + ts.tv_nsec = rand(); + return ts; +} + +static grpc_alarm *create_test_elements(int num_elements) { + grpc_alarm *elems = gpr_malloc(num_elements * sizeof(grpc_alarm)); + int i; + for (i = 0; i < num_elements; i++) { + elems[i].deadline = random_deadline(); + } + return elems; +} + +static int cmp_elem(const void *a, const void *b) { + int i = *(const int *)a; + int j = *(const int *)b; + return i - j; +} + +static int *all_top(grpc_alarm_heap *pq, int *n) { + int *vec = NULL; + int *need_to_check_children; + int num_need_to_check_children = 0; + + *n = 0; + if (pq->alarm_count == 0) return vec; + need_to_check_children = gpr_malloc(pq->alarm_count * sizeof(int)); + need_to_check_children[num_need_to_check_children++] = 0; + vec = gpr_malloc(pq->alarm_count * sizeof(int)); + while (num_need_to_check_children > 0) { + int ind = need_to_check_children[0]; + int leftchild, rightchild; + num_need_to_check_children--; + memmove(need_to_check_children, need_to_check_children + 1, + num_need_to_check_children * sizeof(int)); + vec[(*n)++] = ind; + leftchild = 1 + 2 * ind; + if (leftchild < pq->alarm_count) { + if (gpr_time_cmp(pq->alarms[leftchild]->deadline, + pq->alarms[ind]->deadline) >= 0) { + need_to_check_children[num_need_to_check_children++] = leftchild; + } + rightchild = leftchild + 1; + if (rightchild < pq->alarm_count && + gpr_time_cmp(pq->alarms[rightchild]->deadline, + pq->alarms[ind]->deadline) >= 0) { + need_to_check_children[num_need_to_check_children++] = rightchild; + } + } + } + + gpr_free(need_to_check_children); + + return vec; +} + +static void check_pq_top(grpc_alarm *elements, grpc_alarm_heap *pq, + gpr_uint8 *inpq, int num_elements) { + gpr_timespec max_deadline = gpr_inf_past; + int *max_deadline_indices = gpr_malloc(num_elements * sizeof(int)); + int *top_elements; + int num_max_deadline_indices = 0; + int num_top_elements; + int i; + for (i = 0; i < num_elements; ++i) { + if (inpq[i] && gpr_time_cmp(elements[i].deadline, max_deadline) >= 0) { + if (gpr_time_cmp(elements[i].deadline, max_deadline) > 0) { + num_max_deadline_indices = 0; + max_deadline = elements[i].deadline; + } + max_deadline_indices[num_max_deadline_indices++] = elements[i].heap_index; + } + } + qsort(max_deadline_indices, num_max_deadline_indices, sizeof(int), cmp_elem); + top_elements = all_top(pq, &num_top_elements); + GPR_ASSERT(num_top_elements == num_max_deadline_indices); + for (i = 0; i < num_top_elements; i++) { + GPR_ASSERT(max_deadline_indices[i] == top_elements[i]); + } + gpr_free(max_deadline_indices); + gpr_free(top_elements); +} + +static int contains(grpc_alarm_heap *pq, grpc_alarm *el) { + int i; + for (i = 0; i < pq->alarm_count; i++) { + if (pq->alarms[i] == el) return 1; + } + return 0; +} + +static void check_valid(grpc_alarm_heap *pq) { + int i; + for (i = 0; i < pq->alarm_count; ++i) { + int left_child = 1 + 2 * i; + int right_child = left_child + 1; + if (left_child < pq->alarm_count) { + GPR_ASSERT(gpr_time_cmp(pq->alarms[i]->deadline, + pq->alarms[left_child]->deadline) >= 0); + } + if (right_child < pq->alarm_count) { + GPR_ASSERT(gpr_time_cmp(pq->alarms[i]->deadline, + pq->alarms[right_child]->deadline) >= 0); + } + } +} + +static void test1() { + grpc_alarm_heap pq; + const int num_test_elements = 200; + const int num_test_operations = 10000; + int i; + grpc_alarm *test_elements = create_test_elements(num_test_elements); + gpr_uint8 *inpq = gpr_malloc(num_test_elements); + + grpc_alarm_heap_init(&pq); + memset(inpq, 0, num_test_elements); + GPR_ASSERT(grpc_alarm_heap_is_empty(&pq)); + check_valid(&pq); + for (i = 0; i < num_test_elements; ++i) { + GPR_ASSERT(!contains(&pq, &test_elements[i])); + grpc_alarm_heap_add(&pq, &test_elements[i]); + check_valid(&pq); + GPR_ASSERT(contains(&pq, &test_elements[i])); + inpq[i] = 1; + check_pq_top(test_elements, &pq, inpq, num_test_elements); + } + for (i = 0; i < num_test_elements; ++i) { + /* Test that check still succeeds even for element that wasn't just + inserted. */ + GPR_ASSERT(contains(&pq, &test_elements[i])); + } + + GPR_ASSERT(pq.alarm_count == num_test_elements); + + check_pq_top(test_elements, &pq, inpq, num_test_elements); + + for (i = 0; i < num_test_operations; ++i) { + int elem_num = rand() % num_test_elements; + grpc_alarm *el = &test_elements[elem_num]; + if (!inpq[elem_num]) { /* not in pq */ + GPR_ASSERT(!contains(&pq, el)); + el->deadline = random_deadline(); + grpc_alarm_heap_add(&pq, el); + GPR_ASSERT(contains(&pq, el)); + inpq[elem_num] = 1; + check_pq_top(test_elements, &pq, inpq, num_test_elements); + check_valid(&pq); + } else { + GPR_ASSERT(contains(&pq, el)); + grpc_alarm_heap_remove(&pq, el); + GPR_ASSERT(!contains(&pq, el)); + inpq[elem_num] = 0; + check_pq_top(test_elements, &pq, inpq, num_test_elements); + check_valid(&pq); + } + } + + grpc_alarm_heap_destroy(&pq); + gpr_free(test_elements); + gpr_free(inpq); +} + +static void shrink_test() { + grpc_alarm_heap pq; + int i; + int expected_size; + + /* A large random number to allow for multiple shrinkages, at least 512. */ + const int num_elements = rand() % 2000 + 512; + + grpc_alarm_heap_init(&pq); + + /* Create a priority queue with many elements. Make sure the Size() is + correct. */ + for (i = 0; i < num_elements; ++i) { + GPR_ASSERT(i == pq.alarm_count); + grpc_alarm_heap_add(&pq, create_test_elements(1)); + } + GPR_ASSERT(num_elements == pq.alarm_count); + + /* Remove elements until the Size is 1/4 the original size. */ + while (pq.alarm_count > num_elements / 4) { + grpc_alarm *const te = pq.alarms[pq.alarm_count - 1]; + grpc_alarm_heap_remove(&pq, te); + gpr_free(te); + } + GPR_ASSERT(num_elements / 4 == pq.alarm_count); + + /* Expect that Capacity is in the right range: + Size * 2 <= Capacity <= Size * 4 */ + GPR_ASSERT(pq.alarm_count * 2 <= pq.alarm_capacity); + GPR_ASSERT(pq.alarm_capacity <= pq.alarm_count * 4); + check_valid(&pq); + + /* Remove the rest of the elements. Check that the Capacity is not more than + 4 times the Size and not less than 2 times, but never goes below 16. */ + expected_size = pq.alarm_count; + while (pq.alarm_count > 0) { + const int which = rand() % pq.alarm_count; + grpc_alarm *te = pq.alarms[which]; + grpc_alarm_heap_remove(&pq, te); + gpr_free(te); + expected_size--; + GPR_ASSERT(expected_size == pq.alarm_count); + GPR_ASSERT(pq.alarm_count * 2 <= pq.alarm_capacity); + if (pq.alarm_count >= 8) { + GPR_ASSERT(pq.alarm_capacity <= pq.alarm_count * 4); + } else { + GPR_ASSERT(16 <= pq.alarm_capacity); + } + check_valid(&pq); + } + + GPR_ASSERT(0 == pq.alarm_count); + GPR_ASSERT(pq.alarm_capacity >= 16 && pq.alarm_capacity < 32); + + grpc_alarm_heap_destroy(&pq); +} + +int main(int argc, char **argv) { + int i; + + grpc_test_init(argc, argv); + + for (i = 0; i < 5; i++) { + test1(); + shrink_test(); + } + + return 0; +} diff --git a/test/core/iomgr/alarm_list_test.c b/test/core/iomgr/alarm_list_test.c new file mode 100644 index 0000000000..a8aa6126e6 --- /dev/null +++ b/test/core/iomgr/alarm_list_test.c @@ -0,0 +1,144 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/alarm.h" + +#include <string.h> + +#include "src/core/iomgr/alarm_internal.h" +#include <grpc/support/log.h> +#include "test/core/util/test_config.h" + +#define MAX_CB 30 + +static int cb_called[MAX_CB][GRPC_CALLBACK_DO_NOT_USE]; +static int kicks; + +void grpc_kick_poller() { ++kicks; } + +static void cb(void *arg, grpc_iomgr_cb_status status) { + cb_called[(gpr_intptr)arg][status]++; +} + +static void add_test() { + gpr_timespec start = gpr_now(); + int i; + grpc_alarm alarms[20]; + + grpc_alarm_list_init(start); + memset(cb_called, 0, sizeof(cb_called)); + + /* 10 ms alarms. will expire in the current epoch */ + for (i = 0; i < 10; i++) { + grpc_alarm_init(&alarms[i], gpr_time_add(start, gpr_time_from_millis(10)), + cb, (void *)(gpr_intptr)i, start); + } + + /* 1010 ms alarms. will expire in the next epoch */ + for (i = 10; i < 20; i++) { + grpc_alarm_init(&alarms[i], gpr_time_add(start, gpr_time_from_millis(1010)), + cb, (void *)(gpr_intptr)i, start); + } + + /* collect alarms. Only the first batch should be ready. */ + GPR_ASSERT(10 == + grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(500)))); + for (i = 0; i < 20; i++) { + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 10)); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0); + } + + GPR_ASSERT(0 == + grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(600)))); + for (i = 0; i < 30; i++) { + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 10)); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0); + } + + /* collect the rest of the alarms */ + GPR_ASSERT(10 == + grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(1500)))); + for (i = 0; i < 30; i++) { + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 20)); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0); + } + + GPR_ASSERT(0 == + grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(1600)))); + for (i = 0; i < 30; i++) { + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_SUCCESS] == (i < 20)); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_CANCELLED] == 0); + GPR_ASSERT(cb_called[i][GRPC_CALLBACK_TIMED_OUT] == 0); + } + + grpc_alarm_list_shutdown(); +} + +/* Cleaning up a list with pending alarms. */ +void destruction_test() { + grpc_alarm alarms[5]; + + grpc_alarm_list_init(gpr_time_0); + memset(cb_called, 0, sizeof(cb_called)); + + grpc_alarm_init(&alarms[0], gpr_time_from_millis(100), cb, + (void *)(gpr_intptr)0, gpr_time_0); + grpc_alarm_init(&alarms[1], gpr_time_from_millis(3), cb, + (void *)(gpr_intptr)1, gpr_time_0); + grpc_alarm_init(&alarms[2], gpr_time_from_millis(100), cb, + (void *)(gpr_intptr)2, gpr_time_0); + grpc_alarm_init(&alarms[3], gpr_time_from_millis(3), cb, + (void *)(gpr_intptr)3, gpr_time_0); + grpc_alarm_init(&alarms[4], gpr_time_from_millis(1), cb, + (void *)(gpr_intptr)4, gpr_time_0); + GPR_ASSERT(1 == grpc_alarm_check(gpr_time_from_millis(2))); + GPR_ASSERT(1 == cb_called[4][GRPC_CALLBACK_SUCCESS]); + grpc_alarm_cancel(&alarms[0]); + grpc_alarm_cancel(&alarms[3]); + GPR_ASSERT(1 == cb_called[0][GRPC_CALLBACK_CANCELLED]); + GPR_ASSERT(1 == cb_called[3][GRPC_CALLBACK_CANCELLED]); + + grpc_alarm_list_shutdown(); + GPR_ASSERT(1 == cb_called[1][GRPC_CALLBACK_CANCELLED]); + GPR_ASSERT(1 == cb_called[2][GRPC_CALLBACK_CANCELLED]); +} + +int main(int argc, char **argv) { + grpc_test_init(argc, argv); + add_test(); + destruction_test(); + return 0; +} diff --git a/test/core/iomgr/alarm_test.c b/test/core/iomgr/alarm_test.c index 801e87a6f7..271c42d57e 100644 --- a/test/core/iomgr/alarm_test.c +++ b/test/core/iomgr/alarm_test.c @@ -92,12 +92,9 @@ static void alarm_cb(void *arg /* alarm_arg */, grpc_iomgr_cb_status status) { static void test_grpc_alarm() { grpc_alarm alarm; grpc_alarm alarm_to_cancel; - gpr_timespec tv0 = {0, 1}; /* Timeout on the alarm cond. var, so make big enough to absorb time deviations. Otherwise, operations after wait will not be properly ordered */ - gpr_timespec tv1 = gpr_time_from_micros(200000); - gpr_timespec tv2 = {0, 1}; gpr_timespec alarm_deadline; gpr_timespec followup_deadline; @@ -116,17 +113,20 @@ static void test_grpc_alarm() { gpr_cv_init(&arg.cv); gpr_event_init(&arg.fcb_arg); - grpc_alarm_init(&alarm, gpr_time_add(tv0, gpr_now()), alarm_cb, &arg, - gpr_now()); + grpc_alarm_init(&alarm, gpr_time_add(gpr_time_from_millis(100), gpr_now()), + alarm_cb, &arg, gpr_now()); - alarm_deadline = gpr_time_add(gpr_now(), tv1); + alarm_deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(1)); gpr_mu_lock(&arg.mu); while (arg.done == 0) { - gpr_cv_wait(&arg.cv, &arg.mu, alarm_deadline); + if (gpr_cv_wait(&arg.cv, &arg.mu, alarm_deadline)) { + gpr_log(GPR_ERROR, "alarm deadline exceeded"); + break; + } } gpr_mu_unlock(&arg.mu); - followup_deadline = gpr_time_add(gpr_now(), tv1); + followup_deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(5)); fdone = gpr_event_wait(&arg.fcb_arg, followup_deadline); if (arg.counter != 1) { @@ -162,18 +162,21 @@ static void test_grpc_alarm() { gpr_cv_init(&arg2.cv); gpr_event_init(&arg2.fcb_arg); - grpc_alarm_init(&alarm_to_cancel, gpr_time_add(tv2, gpr_now()), alarm_cb, + grpc_alarm_init(&alarm_to_cancel, + gpr_time_add(gpr_time_from_millis(100), gpr_now()), alarm_cb, &arg2, gpr_now()); grpc_alarm_cancel(&alarm_to_cancel); - alarm_deadline = gpr_time_add(gpr_now(), tv1); + alarm_deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(1)); gpr_mu_lock(&arg2.mu); while (arg2.done == 0) { gpr_cv_wait(&arg2.cv, &arg2.mu, alarm_deadline); } gpr_mu_unlock(&arg2.mu); - followup_deadline = gpr_time_add(gpr_now(), tv1); + gpr_log(GPR_INFO, "alarm done = %d", arg2.done); + + followup_deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(5)); fdone = gpr_event_wait(&arg2.fcb_arg, followup_deadline); if (arg2.counter != arg2.done_success_ctr) { @@ -191,11 +194,11 @@ static void test_grpc_alarm() { } else if (arg2.done_success_ctr) { gpr_log(GPR_INFO, "Alarm callback executed before cancel"); gpr_log(GPR_INFO, "Current value of triggered is %d\n", - (int)alarm_to_cancel.triggered); + alarm_to_cancel.triggered); } else if (arg2.done_cancel_ctr) { gpr_log(GPR_INFO, "Alarm callback canceled"); gpr_log(GPR_INFO, "Current value of triggered is %d\n", - (int)alarm_to_cancel.triggered); + alarm_to_cancel.triggered); } else { gpr_log(GPR_ERROR, "Alarm cancel test should not be here"); GPR_ASSERT(0); |