diff options
author | Craig Tiller <ctiller@google.com> | 2016-04-21 17:07:36 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-04-21 17:07:36 -0700 |
commit | 7beea147e506b57a9656976001a1360e755a2e9b (patch) | |
tree | 57d098e53d213dfe21e1bf40b7693ff2936901d4 /test/core/surface/concurrent_connectivity_test.c | |
parent | 788a25365d022d9797de444799c57842a6fa0603 (diff) | |
parent | 5a56891eb6b1591e97d36072ac067d762aecf07e (diff) |
Merge github.com:grpc/grpc into strong-includes
Diffstat (limited to 'test/core/surface/concurrent_connectivity_test.c')
-rw-r--r-- | test/core/surface/concurrent_connectivity_test.c | 148 |
1 files changed, 141 insertions, 7 deletions
diff --git a/test/core/surface/concurrent_connectivity_test.c b/test/core/surface/concurrent_connectivity_test.c index 96761b0502..a2fdf73596 100644 --- a/test/core/surface/concurrent_connectivity_test.c +++ b/test/core/surface/concurrent_connectivity_test.c @@ -31,12 +31,21 @@ * */ +#include <memory.h> #include <stdio.h> #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/thd.h> + +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/iomgr.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/tcp_server.h" + +#include "test/core/util/port.h" #include "test/core/util/test_config.h" #define NUM_THREADS 100 @@ -45,10 +54,13 @@ #define DELAY_MILLIS 10 #define POLL_MILLIS 3000 -void create_loop_destroy(void* unused) { +static void *tag(int n) { return (void *)(uintptr_t)n; } +static int detag(void *p) { return (int)(uintptr_t)p; } + +void create_loop_destroy(void *addr) { for (int i = 0; i < NUM_OUTER_LOOPS; ++i) { - grpc_completion_queue* cq = grpc_completion_queue_create(NULL); - grpc_channel* chan = grpc_insecure_channel_create("localhost", NULL, NULL); + grpc_completion_queue *cq = grpc_completion_queue_create(NULL); + grpc_channel *chan = grpc_insecure_channel_create((char *)addr, NULL, NULL); for (int j = 0; j < NUM_INNER_LOOPS; ++j) { gpr_timespec later_time = GRPC_TIMEOUT_MILLIS_TO_DEADLINE(DELAY_MILLIS); @@ -64,18 +76,140 @@ void create_loop_destroy(void* unused) { } } -int main(int argc, char** argv) { +struct server_thread_args { + char *addr; + grpc_server *server; + grpc_completion_queue *cq; + grpc_pollset *pollset; + gpr_mu *mu; + gpr_event ready; + gpr_atm stop; +}; + +void server_thread(void *vargs) { + struct server_thread_args *args = (struct server_thread_args *)vargs; + grpc_event ev; + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + ev = grpc_completion_queue_next(args->cq, deadline, NULL); + GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); + GPR_ASSERT(detag(ev.tag) == 0xd1e); +} + +static void on_connect(grpc_exec_ctx *exec_ctx, void *vargs, grpc_endpoint *tcp, + grpc_tcp_server_acceptor *acceptor) { + struct server_thread_args *args = (struct server_thread_args *)vargs; + (void)acceptor; + grpc_endpoint_shutdown(exec_ctx, tcp); + grpc_endpoint_destroy(exec_ctx, tcp); + grpc_pollset_kick(args->pollset, NULL); +} + +void bad_server_thread(void *vargs) { + struct server_thread_args *args = (struct server_thread_args *)vargs; + + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + struct sockaddr_storage addr; + socklen_t addr_len = sizeof(addr); + int port; + grpc_tcp_server *s = grpc_tcp_server_create(NULL); + memset(&addr, 0, sizeof(addr)); + addr.ss_family = AF_INET; + port = grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len); + GPR_ASSERT(port > 0); + gpr_asprintf(&args->addr, "localhost:%d", port); + + grpc_tcp_server_start(&exec_ctx, s, &args->pollset, 1, on_connect, args); + gpr_event_set(&args->ready, (void *)1); + + gpr_mu_lock(args->mu); + while (gpr_atm_acq_load(&args->stop) == 0) { + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec deadline = + gpr_time_add(now, gpr_time_from_millis(100, GPR_TIMESPAN)); + + grpc_pollset_worker *worker = NULL; + grpc_pollset_work(&exec_ctx, args->pollset, &worker, now, deadline); + gpr_mu_unlock(args->mu); + grpc_exec_ctx_finish(&exec_ctx); + gpr_mu_lock(args->mu); + } + gpr_mu_unlock(args->mu); + + grpc_tcp_server_unref(&exec_ctx, s); + + grpc_exec_ctx_finish(&exec_ctx); + + gpr_free(args->addr); +} + +int main(int argc, char **argv) { + struct server_thread_args args; + memset(&args, 0, sizeof(args)); + grpc_test_init(argc, argv); grpc_init(); + gpr_thd_id threads[NUM_THREADS]; + gpr_thd_id server; + + char *localhost = gpr_strdup("localhost:54321"); + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + + /* First round, no server */ + gpr_log(GPR_DEBUG, "Wave 1"); + for (size_t i = 0; i < NUM_THREADS; ++i) { + gpr_thd_new(&threads[i], create_loop_destroy, localhost, &options); + } + for (size_t i = 0; i < NUM_THREADS; ++i) { + gpr_thd_join(threads[i]); + } + gpr_free(localhost); + + /* Second round, actual grpc server */ + gpr_log(GPR_DEBUG, "Wave 2"); + int port = grpc_pick_unused_port_or_die(); + gpr_asprintf(&args.addr, "localhost:%d", port); + args.server = grpc_server_create(NULL, NULL); + grpc_server_add_insecure_http2_port(args.server, args.addr); + args.cq = grpc_completion_queue_create(NULL); + grpc_server_register_completion_queue(args.server, args.cq, NULL); + grpc_server_start(args.server); + gpr_thd_new(&server, server_thread, &args, &options); + for (size_t i = 0; i < NUM_THREADS; ++i) { - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); - gpr_thd_new(&threads[i], create_loop_destroy, NULL, &options); + gpr_thd_new(&threads[i], create_loop_destroy, args.addr, &options); } for (size_t i = 0; i < NUM_THREADS; ++i) { gpr_thd_join(threads[i]); } + grpc_server_shutdown_and_notify(args.server, args.cq, tag(0xd1e)); + + gpr_thd_join(server); + grpc_server_destroy(args.server); + grpc_completion_queue_destroy(args.cq); + gpr_free(args.addr); + + /* Third round, bogus tcp server */ + gpr_log(GPR_DEBUG, "Wave 3"); + args.pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(args.pollset, &args.mu); + gpr_event_init(&args.ready); + gpr_thd_new(&server, bad_server_thread, &args, &options); + gpr_event_wait(&args.ready, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + + for (size_t i = 0; i < NUM_THREADS; ++i) { + gpr_thd_new(&threads[i], create_loop_destroy, args.addr, &options); + } + for (size_t i = 0; i < NUM_THREADS; ++i) { + gpr_thd_join(threads[i]); + } + + gpr_atm_rel_store(&args.stop, 1); + gpr_thd_join(server); + grpc_pollset_destroy(args.pollset); + gpr_free(args.pollset); + grpc_shutdown(); return 0; } |