diff options
author | Mark D. Roth <roth@google.com> | 2016-09-16 13:24:05 -0700 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2016-09-16 13:24:05 -0700 |
commit | 85102a9eb64ff71814dbace9a179146f8656e21b (patch) | |
tree | 847d7f20ed84d8da9943486db208febfb75b11a2 /test/core | |
parent | 5c28096ce3b6e5fe0564d836ba69347f5f97dc02 (diff) | |
parent | ee43d7bbb11a36c91c6f1ebff6bf3da70b7546b2 (diff) |
Merge remote-tracking branch 'upstream/master' into http_connect
Diffstat (limited to 'test/core')
-rw-r--r-- | test/core/iomgr/combiner_test.c | 164 | ||||
-rw-r--r-- | test/core/support/mpscq_test.c | 206 | ||||
-rw-r--r-- | test/core/surface/lame_client_test.c | 21 |
3 files changed, 380 insertions, 11 deletions
diff --git a/test/core/iomgr/combiner_test.c b/test/core/iomgr/combiner_test.c new file mode 100644 index 0000000000..197998c1e5 --- /dev/null +++ b/test/core/iomgr/combiner_test.c @@ -0,0 +1,164 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/combiner.h" + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/thd.h> +#include <grpc/support/useful.h> + +#include "test/core/util/test_config.h" + +static void test_no_op(void) { + gpr_log(GPR_DEBUG, "test_no_op"); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_combiner_destroy(&exec_ctx, grpc_combiner_create(NULL)); + grpc_exec_ctx_finish(&exec_ctx); +} + +static void set_bool_to_true(grpc_exec_ctx *exec_ctx, void *value, + grpc_error *error) { + *(bool *)value = true; +} + +static void test_execute_one(void) { + gpr_log(GPR_DEBUG, "test_execute_one"); + + grpc_combiner *lock = grpc_combiner_create(NULL); + bool done = false; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_combiner_execute(&exec_ctx, lock, + grpc_closure_create(set_bool_to_true, &done), + GRPC_ERROR_NONE); + grpc_exec_ctx_flush(&exec_ctx); + GPR_ASSERT(done); + grpc_combiner_destroy(&exec_ctx, lock); + grpc_exec_ctx_finish(&exec_ctx); +} + +typedef struct { + size_t ctr; + grpc_combiner *lock; +} thd_args; + +typedef struct { + size_t *ctr; + size_t value; +} ex_args; + +static void check_one(grpc_exec_ctx *exec_ctx, void *a, grpc_error *error) { + ex_args *args = a; + GPR_ASSERT(*args->ctr == args->value - 1); + *args->ctr = args->value; + gpr_free(a); +} + +static void execute_many_loop(void *a) { + thd_args *args = a; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + size_t n = 1; + for (size_t i = 0; i < 10; i++) { + for (size_t j = 0; j < 10000; j++) { + ex_args *c = gpr_malloc(sizeof(*c)); + c->ctr = &args->ctr; + c->value = n++; + grpc_combiner_execute(&exec_ctx, args->lock, + grpc_closure_create(check_one, c), GRPC_ERROR_NONE); + grpc_exec_ctx_flush(&exec_ctx); + } + // sleep for a little bit, to test a combiner draining and another thread + // picking it up + gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100)); + } + grpc_exec_ctx_finish(&exec_ctx); +} + +static void test_execute_many(void) { + gpr_log(GPR_DEBUG, "test_execute_many"); + + grpc_combiner *lock = grpc_combiner_create(NULL); + gpr_thd_id thds[100]; + thd_args ta[GPR_ARRAY_SIZE(thds)]; + for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + ta[i].ctr = 0; + ta[i].lock = lock; + GPR_ASSERT(gpr_thd_new(&thds[i], execute_many_loop, &ta[i], &options)); + } + for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { + gpr_thd_join(thds[i]); + } + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_combiner_destroy(&exec_ctx, lock); + grpc_exec_ctx_finish(&exec_ctx); +} + +static bool got_in_finally = false; + +static void in_finally(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + got_in_finally = true; +} + +static void add_finally(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + grpc_combiner_execute_finally(exec_ctx, arg, + grpc_closure_create(in_finally, NULL), + GRPC_ERROR_NONE, false); +} + +static void test_execute_finally(void) { + gpr_log(GPR_DEBUG, "test_execute_finally"); + + grpc_combiner *lock = grpc_combiner_create(NULL); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_combiner_execute(&exec_ctx, lock, grpc_closure_create(add_finally, lock), + GRPC_ERROR_NONE); + grpc_exec_ctx_flush(&exec_ctx); + GPR_ASSERT(got_in_finally); + grpc_combiner_destroy(&exec_ctx, lock); + grpc_exec_ctx_finish(&exec_ctx); +} + +int main(int argc, char **argv) { + grpc_test_init(argc, argv); + grpc_init(); + test_no_op(); + test_execute_one(); + test_execute_finally(); + test_execute_many(); + grpc_shutdown(); + + return 0; +} diff --git a/test/core/support/mpscq_test.c b/test/core/support/mpscq_test.c new file mode 100644 index 0000000000..491eb9148b --- /dev/null +++ b/test/core/support/mpscq_test.c @@ -0,0 +1,206 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/support/mpscq.h" + +#include <stdlib.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> +#include <grpc/support/useful.h> +#include "test/core/util/test_config.h" + +typedef struct test_node { + gpr_mpscq_node node; + size_t i; + size_t *ctr; +} test_node; + +static test_node *new_node(size_t i, size_t *ctr) { + test_node *n = gpr_malloc(sizeof(test_node)); + n->i = i; + n->ctr = ctr; + return n; +} + +static void test_serial(void) { + gpr_log(GPR_DEBUG, "test_serial"); + gpr_mpscq q; + gpr_mpscq_init(&q); + for (size_t i = 0; i < 10000000; i++) { + gpr_mpscq_push(&q, &new_node(i, NULL)->node); + } + for (size_t i = 0; i < 10000000; i++) { + test_node *n = (test_node *)gpr_mpscq_pop(&q); + GPR_ASSERT(n); + GPR_ASSERT(n->i == i); + gpr_free(n); + } +} + +typedef struct { + size_t ctr; + gpr_mpscq *q; + gpr_event *start; +} thd_args; + +#define THREAD_ITERATIONS 100000 + +static void test_thread(void *args) { + thd_args *a = args; + gpr_event_wait(a->start, gpr_inf_future(GPR_CLOCK_REALTIME)); + for (size_t i = 1; i <= THREAD_ITERATIONS; i++) { + gpr_mpscq_push(a->q, &new_node(i, &a->ctr)->node); + } +} + +static void test_mt(void) { + gpr_log(GPR_DEBUG, "test_mt"); + gpr_event start; + gpr_event_init(&start); + gpr_thd_id thds[100]; + thd_args ta[GPR_ARRAY_SIZE(thds)]; + gpr_mpscq q; + gpr_mpscq_init(&q); + for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + ta[i].ctr = 0; + ta[i].q = &q; + ta[i].start = &start; + GPR_ASSERT(gpr_thd_new(&thds[i], test_thread, &ta[i], &options)); + } + size_t num_done = 0; + size_t spins = 0; + gpr_event_set(&start, (void *)1); + while (num_done != GPR_ARRAY_SIZE(thds)) { + gpr_mpscq_node *n; + while ((n = gpr_mpscq_pop(&q)) == NULL) { + spins++; + } + test_node *tn = (test_node *)n; + GPR_ASSERT(*tn->ctr == tn->i - 1); + *tn->ctr = tn->i; + if (tn->i == THREAD_ITERATIONS) num_done++; + gpr_free(tn); + } + gpr_log(GPR_DEBUG, "spins: %" PRIdPTR, spins); + for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { + gpr_thd_join(thds[i]); + } + gpr_mpscq_destroy(&q); +} + +typedef struct { + thd_args *ta; + size_t num_thds; + gpr_mu mu; + size_t num_done; + size_t spins; + gpr_mpscq *q; + gpr_event *start; +} pull_args; + +static void pull_thread(void *arg) { + pull_args *pa = arg; + gpr_event_wait(pa->start, gpr_inf_future(GPR_CLOCK_REALTIME)); + + for (;;) { + gpr_mu_lock(&pa->mu); + if (pa->num_done == pa->num_thds) { + gpr_mu_unlock(&pa->mu); + return; + } + gpr_mpscq_node *n; + while ((n = gpr_mpscq_pop(pa->q)) == NULL) { + pa->spins++; + } + test_node *tn = (test_node *)n; + GPR_ASSERT(*tn->ctr == tn->i - 1); + *tn->ctr = tn->i; + if (tn->i == THREAD_ITERATIONS) pa->num_done++; + gpr_free(tn); + gpr_mu_unlock(&pa->mu); + } +} + +static void test_mt_multipop(void) { + gpr_log(GPR_DEBUG, "test_mt_multipop"); + gpr_event start; + gpr_event_init(&start); + gpr_thd_id thds[100]; + gpr_thd_id pull_thds[100]; + thd_args ta[GPR_ARRAY_SIZE(thds)]; + gpr_mpscq q; + gpr_mpscq_init(&q); + for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + ta[i].ctr = 0; + ta[i].q = &q; + ta[i].start = &start; + GPR_ASSERT(gpr_thd_new(&thds[i], test_thread, &ta[i], &options)); + } + pull_args pa; + pa.ta = ta; + pa.num_thds = GPR_ARRAY_SIZE(thds); + pa.spins = 0; + pa.num_done = 0; + pa.q = &q; + pa.start = &start; + gpr_mu_init(&pa.mu); + for (size_t i = 0; i < GPR_ARRAY_SIZE(pull_thds); i++) { + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + GPR_ASSERT(gpr_thd_new(&pull_thds[i], pull_thread, &pa, &options)); + } + gpr_event_set(&start, (void *)1); + for (size_t i = 0; i < GPR_ARRAY_SIZE(pull_thds); i++) { + gpr_thd_join(pull_thds[i]); + } + gpr_log(GPR_DEBUG, "spins: %" PRIdPTR, pa.spins); + for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { + gpr_thd_join(thds[i]); + } + gpr_mpscq_destroy(&q); +} + +int main(int argc, char **argv) { + grpc_test_init(argc, argv); + test_serial(); + test_mt(); + test_mt_multipop(); + return 0; +} diff --git a/test/core/surface/lame_client_test.c b/test/core/surface/lame_client_test.c index 2894b0c66f..6afcefca92 100644 --- a/test/core/surface/lame_client_test.c +++ b/test/core/surface/lame_client_test.c @@ -49,32 +49,31 @@ static void *tag(intptr_t x) { return (void *)x; } void verify_connectivity(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_transport_op *op = arg; - GPR_ASSERT(GRPC_CHANNEL_SHUTDOWN == *op->connectivity_state); + grpc_connectivity_state *state = arg; + GPR_ASSERT(GRPC_CHANNEL_SHUTDOWN == *state); GPR_ASSERT(error == GRPC_ERROR_NONE); } void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} void test_transport_op(grpc_channel *channel) { - grpc_transport_op op; + grpc_transport_op *op; grpc_channel_element *elem; grpc_connectivity_state state = GRPC_CHANNEL_IDLE; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - memset(&op, 0, sizeof(op)); - grpc_closure_init(&transport_op_cb, verify_connectivity, &op); + grpc_closure_init(&transport_op_cb, verify_connectivity, &state); - op.on_connectivity_state_change = &transport_op_cb; - op.connectivity_state = &state; + op = grpc_make_transport_op(NULL); + op->on_connectivity_state_change = &transport_op_cb; + op->connectivity_state = &state; elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); - elem->filter->start_transport_op(&exec_ctx, elem, &op); + elem->filter->start_transport_op(&exec_ctx, elem, op); grpc_exec_ctx_finish(&exec_ctx); - memset(&op, 0, sizeof(op)); grpc_closure_init(&transport_op_cb, do_nothing, NULL); - op.on_consumed = &transport_op_cb; - elem->filter->start_transport_op(&exec_ctx, elem, &op); + op = grpc_make_transport_op(&transport_op_cb); + elem->filter->start_transport_op(&exec_ctx, elem, op); grpc_exec_ctx_finish(&exec_ctx); } |