diff options
author | 2018-03-02 22:09:52 -0800 | |
---|---|---|
committer | 2018-03-02 22:09:52 -0800 | |
commit | 94dad67f4386ca1d46d96fd255cf4a9df5c087b2 (patch) | |
tree | 43e29b584e7d8c062d91eee5d0fc50639c1643e4 /test/core/surface | |
parent | c1284576f81efc4b2d9c33c4337d8c9fbafc29de (diff) | |
parent | 6eae794c9f8f305a51274a79400124093ddc9354 (diff) |
Merge branch 'master' of https://github.com/grpc/grpc into channel-tracing
Diffstat (limited to 'test/core/surface')
5 files changed, 100 insertions, 91 deletions
diff --git a/test/core/surface/byte_buffer_reader_test.cc b/test/core/surface/byte_buffer_reader_test.cc index 861ed5d1a8..cff05caec1 100644 --- a/test/core/surface/byte_buffer_reader_test.cc +++ b/test/core/surface/byte_buffer_reader_test.cc @@ -26,7 +26,7 @@ #include <grpc/support/time.h> #include "src/core/lib/compression/message_compress.h" -#include "src/core/lib/gpr/thd.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "test/core/util/test_config.h" diff --git a/test/core/surface/completion_queue_threading_test.cc b/test/core/surface/completion_queue_threading_test.cc index 81319f4df4..9c8d8d8395 100644 --- a/test/core/surface/completion_queue_threading_test.cc +++ b/test/core/surface/completion_queue_threading_test.cc @@ -22,8 +22,8 @@ #include <grpc/support/log.h> #include <grpc/support/time.h> -#include "src/core/lib/gpr/thd.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/iomgr.h" #include "test/core/util/test_config.h" @@ -78,16 +78,14 @@ static void test_too_many_plucks(void) { grpc_completion_queue* cc; void* tags[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)]; - gpr_thd_id thread_ids[GPR_ARRAY_SIZE(tags)]; + grpc_core::Thread threads[GPR_ARRAY_SIZE(tags)]; struct thread_state thread_states[GPR_ARRAY_SIZE(tags)]; - gpr_thd_options thread_options = gpr_thd_options_default(); grpc_core::ExecCtx exec_ctx; unsigned i, j; LOG_TEST("test_too_many_plucks"); cc = grpc_completion_queue_create_for_pluck(nullptr); - gpr_thd_options_set_joinable(&thread_options); for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { tags[i] = create_test_tag(); @@ -96,8 +94,9 @@ static void test_too_many_plucks(void) { } thread_states[i].cc = cc; thread_states[i].tag = tags[i]; - gpr_thd_new(thread_ids + i, "grpc_pluck_test", pluck_one, thread_states + i, - &thread_options); + threads[i] = + grpc_core::Thread("grpc_pluck_test", pluck_one, thread_states + i); + threads[i].Start(); } /* wait until all other threads are plucking */ @@ -113,8 +112,8 @@ static void test_too_many_plucks(void) { nullptr, &completions[i]); } - for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { - gpr_thd_join(thread_ids[i]); + for (auto& th : threads) { + th.Join(); } shutdown_and_destroy(cc); @@ -220,8 +219,9 @@ static void test_threading(size_t producers, size_t consumers) { "test_threading", producers, consumers); /* start all threads: they will wait for phase1 */ + grpc_core::Thread* threads = reinterpret_cast<grpc_core::Thread*>( + gpr_malloc(sizeof(*threads) * (producers + consumers))); for (i = 0; i < producers + consumers; i++) { - gpr_thd_id id; gpr_event_init(&options[i].on_started); gpr_event_init(&options[i].on_phase1_done); gpr_event_init(&options[i].on_finished); @@ -230,10 +230,13 @@ static void test_threading(size_t producers, size_t consumers) { options[i].events_triggered = 0; options[i].cc = cc; options[i].id = optid++; - GPR_ASSERT(gpr_thd_new(&id, - i < producers ? "grpc_producer" : "grpc_consumer", - i < producers ? producer_thread : consumer_thread, - options + i, nullptr)); + + bool ok; + threads[i] = grpc_core::Thread( + i < producers ? "grpc_producer" : "grpc_consumer", + i < producers ? producer_thread : consumer_thread, options + i, &ok); + GPR_ASSERT(ok); + threads[i].Start(); gpr_event_wait(&options[i].on_started, ten_seconds_time()); } @@ -266,6 +269,11 @@ static void test_threading(size_t producers, size_t consumers) { /* destroy the completion channel */ grpc_completion_queue_destroy(cc); + for (i = 0; i < producers + consumers; i++) { + threads[i].Join(); + } + gpr_free(threads); + /* verify that everything was produced and consumed */ for (i = 0; i < producers + consumers; i++) { if (i < producers) { diff --git a/test/core/surface/concurrent_connectivity_test.cc b/test/core/surface/concurrent_connectivity_test.cc index 95af4abd48..c1298b6636 100644 --- a/test/core/surface/concurrent_connectivity_test.cc +++ b/test/core/surface/concurrent_connectivity_test.cc @@ -30,7 +30,7 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> -#include "src/core/lib/gpr/thd.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/resolve_address.h" @@ -172,73 +172,77 @@ int run_concurrent_connectivity_test() { grpc_init(); - gpr_thd_id threads[NUM_THREADS]; - gpr_thd_id server; - - char* localhost = gpr_strdup("localhost:54321"); - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); - /* First round, no server */ - gpr_log(GPR_DEBUG, "Wave 1"); - for (size_t i = 0; i < NUM_THREADS; ++i) { - gpr_thd_new(&threads[i], "grpc_wave_1", create_loop_destroy, localhost, - &options); - } - for (size_t i = 0; i < NUM_THREADS; ++i) { - gpr_thd_join(threads[i]); + { + gpr_log(GPR_DEBUG, "Wave 1"); + char* localhost = gpr_strdup("localhost:54321"); + grpc_core::Thread threads[NUM_THREADS]; + for (auto& th : threads) { + th = grpc_core::Thread("grpc_wave_1", create_loop_destroy, localhost); + th.Start(); + } + for (auto& th : threads) { + th.Join(); + } + gpr_free(localhost); } - gpr_free(localhost); - /* Second round, actual grpc server */ - gpr_log(GPR_DEBUG, "Wave 2"); - int port = grpc_pick_unused_port_or_die(); - gpr_asprintf(&args.addr, "localhost:%d", port); - args.server = grpc_server_create(nullptr, nullptr); - grpc_server_add_insecure_http2_port(args.server, args.addr); - args.cq = grpc_completion_queue_create_for_next(nullptr); - grpc_server_register_completion_queue(args.server, args.cq, nullptr); - grpc_server_start(args.server); - gpr_thd_new(&server, "grpc_wave_2_server", server_thread, &args, &options); - - for (size_t i = 0; i < NUM_THREADS; ++i) { - gpr_thd_new(&threads[i], "grpc_wave_2", create_loop_destroy, args.addr, - &options); - } - for (size_t i = 0; i < NUM_THREADS; ++i) { - gpr_thd_join(threads[i]); - } - grpc_server_shutdown_and_notify(args.server, args.cq, tag(0xd1e)); - - gpr_thd_join(server); - grpc_server_destroy(args.server); - grpc_completion_queue_destroy(args.cq); - gpr_free(args.addr); - - /* Third round, bogus tcp server */ - gpr_log(GPR_DEBUG, "Wave 3"); - args.pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size())); - grpc_pollset_init(args.pollset, &args.mu); - gpr_event_init(&args.ready); - gpr_thd_new(&server, "grpc_wave_3_server", bad_server_thread, &args, - &options); - gpr_event_wait(&args.ready, gpr_inf_future(GPR_CLOCK_MONOTONIC)); - - for (size_t i = 0; i < NUM_THREADS; ++i) { - gpr_thd_new(&threads[i], "grpc_wave_3", create_loop_destroy, args.addr, - &options); - } - for (size_t i = 0; i < NUM_THREADS; ++i) { - gpr_thd_join(threads[i]); + { + /* Second round, actual grpc server */ + gpr_log(GPR_DEBUG, "Wave 2"); + int port = grpc_pick_unused_port_or_die(); + gpr_asprintf(&args.addr, "localhost:%d", port); + args.server = grpc_server_create(nullptr, nullptr); + grpc_server_add_insecure_http2_port(args.server, args.addr); + args.cq = grpc_completion_queue_create_for_next(nullptr); + grpc_server_register_completion_queue(args.server, args.cq, nullptr); + grpc_server_start(args.server); + grpc_core::Thread server2("grpc_wave_2_server", server_thread, &args); + server2.Start(); + + grpc_core::Thread threads[NUM_THREADS]; + for (auto& th : threads) { + th = grpc_core::Thread("grpc_wave_2", create_loop_destroy, args.addr); + th.Start(); + } + for (auto& th : threads) { + th.Join(); + } + grpc_server_shutdown_and_notify(args.server, args.cq, tag(0xd1e)); + + server2.Join(); + grpc_server_destroy(args.server); + grpc_completion_queue_destroy(args.cq); + gpr_free(args.addr); } - gpr_atm_rel_store(&args.stop, 1); - gpr_thd_join(server); { - grpc_core::ExecCtx exec_ctx; - grpc_pollset_shutdown( - args.pollset, GRPC_CLOSURE_CREATE(done_pollset_shutdown, args.pollset, - grpc_schedule_on_exec_ctx)); + /* Third round, bogus tcp server */ + gpr_log(GPR_DEBUG, "Wave 3"); + args.pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size())); + grpc_pollset_init(args.pollset, &args.mu); + gpr_event_init(&args.ready); + grpc_core::Thread server3("grpc_wave_3_server", bad_server_thread, &args); + server3.Start(); + gpr_event_wait(&args.ready, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + + grpc_core::Thread threads[NUM_THREADS]; + for (auto& th : threads) { + th = grpc_core::Thread("grpc_wave_3", create_loop_destroy, args.addr); + th.Start(); + } + for (auto& th : threads) { + th.Join(); + } + + gpr_atm_rel_store(&args.stop, 1); + server3.Join(); + { + grpc_core::ExecCtx exec_ctx; + grpc_pollset_shutdown( + args.pollset, GRPC_CLOSURE_CREATE(done_pollset_shutdown, args.pollset, + grpc_schedule_on_exec_ctx)); + } } grpc_shutdown(); @@ -278,18 +282,17 @@ void watches_with_short_timeouts(void* addr) { int run_concurrent_watches_with_short_timeouts_test() { grpc_init(); - gpr_thd_id threads[NUM_THREADS]; + grpc_core::Thread threads[NUM_THREADS]; char* localhost = gpr_strdup("localhost:54321"); - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); - for (size_t i = 0; i < NUM_THREADS; ++i) { - gpr_thd_new(&threads[i], "grpc_short_watches", watches_with_short_timeouts, - localhost, &options); + for (auto& th : threads) { + th = grpc_core::Thread("grpc_short_watches", watches_with_short_timeouts, + localhost); + th.Start(); } - for (size_t i = 0; i < NUM_THREADS; ++i) { - gpr_thd_join(threads[i]); + for (auto& th : threads) { + th.Join(); } gpr_free(localhost); diff --git a/test/core/surface/num_external_connectivity_watchers_test.cc b/test/core/surface/num_external_connectivity_watchers_test.cc index 49d28ad1c7..467deeeaec 100644 --- a/test/core/surface/num_external_connectivity_watchers_test.cc +++ b/test/core/surface/num_external_connectivity_watchers_test.cc @@ -23,7 +23,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/host_port.h" -#include "src/core/lib/gpr/thd.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "test/core/end2end/data/ssl_test_data.h" #include "test/core/util/port.h" diff --git a/test/core/surface/sequential_connectivity_test.cc b/test/core/surface/sequential_connectivity_test.cc index 428d17ff1b..9aba4c499e 100644 --- a/test/core/surface/sequential_connectivity_test.cc +++ b/test/core/surface/sequential_connectivity_test.cc @@ -23,7 +23,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/host_port.h" -#include "src/core/lib/gpr/thd.h" +#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "test/core/end2end/data/ssl_test_data.h" #include "test/core/util/port.h" @@ -67,10 +67,8 @@ static void run_test(const test_fixture* fixture) { grpc_server_start(server); server_thread_args sta = {server, server_cq}; - gpr_thd_id server_thread; - gpr_thd_options thdopt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&thdopt); - gpr_thd_new(&server_thread, "grpc_server", server_thread_func, &sta, &thdopt); + grpc_core::Thread server_thread("grpc_server", server_thread_func, &sta); + server_thread.Start(); grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); grpc_channel* channels[NUM_CONNECTIONS]; @@ -95,7 +93,7 @@ static void run_test(const test_fixture* fixture) { } grpc_server_shutdown_and_notify(server, server_cq, nullptr); - gpr_thd_join(server_thread); + server_thread.Join(); grpc_completion_queue_shutdown(server_cq); grpc_completion_queue_shutdown(cq); |