From ad4d2dde0052efbbf49d64b0843c45f0381cfeb3 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Wed, 6 Dec 2017 09:05:05 -0800 Subject: Revert "All instances of exec_ctx being passed around in src/core removed" --- test/core/iomgr/combiner_test.cc | 51 ++-- test/core/iomgr/endpoint_pair_test.cc | 29 ++- test/core/iomgr/endpoint_tests.cc | 99 ++++--- test/core/iomgr/ev_epollsig_linux_test.cc | 109 ++++---- test/core/iomgr/fd_conservation_posix_test.cc | 37 ++- test/core/iomgr/fd_posix_test.cc | 156 ++++++------ test/core/iomgr/load_file_test.cc | 3 - test/core/iomgr/pollset_set_test.cc | 245 +++++++++--------- test/core/iomgr/resolve_address_posix_test.cc | 65 ++--- test/core/iomgr/resolve_address_test.cc | 153 +++++------ test/core/iomgr/resource_quota_test.cc | 354 ++++++++++++++------------ test/core/iomgr/tcp_client_posix_test.cc | 83 +++--- test/core/iomgr/tcp_client_uv_test.cc | 62 +++-- test/core/iomgr/tcp_posix_test.cc | 153 ++++++----- test/core/iomgr/tcp_server_posix_test.cc | 183 ++++++------- test/core/iomgr/tcp_server_uv_test.cc | 90 ++++--- test/core/iomgr/timer_list_test.cc | 75 +++--- test/core/iomgr/udp_server_test.cc | 105 ++++---- 18 files changed, 1104 insertions(+), 948 deletions(-) (limited to 'test/core/iomgr') diff --git a/test/core/iomgr/combiner_test.cc b/test/core/iomgr/combiner_test.cc index 891008c774..33d892fa06 100644 --- a/test/core/iomgr/combiner_test.cc +++ b/test/core/iomgr/combiner_test.cc @@ -28,11 +28,13 @@ static void test_no_op(void) { gpr_log(GPR_DEBUG, "test_no_op"); - grpc_core::ExecCtx exec_ctx; - GRPC_COMBINER_UNREF(grpc_combiner_create(), "test_no_op"); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_COMBINER_UNREF(&exec_ctx, grpc_combiner_create(), "test_no_op"); + grpc_exec_ctx_finish(&exec_ctx); } -static void set_event_to_true(void* value, grpc_error* error) { +static void set_event_to_true(grpc_exec_ctx* exec_ctx, void* value, + grpc_error* error) { gpr_event_set(static_cast(value), (void*)1); } @@ -42,14 +44,16 @@ static void test_execute_one(void) { grpc_combiner* lock = grpc_combiner_create(); gpr_event done; gpr_event_init(&done); - grpc_core::ExecCtx exec_ctx; - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(set_event_to_true, &done, + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_CLOSURE_SCHED(&exec_ctx, + GRPC_CLOSURE_CREATE(set_event_to_true, &done, grpc_combiner_scheduler(lock)), GRPC_ERROR_NONE); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); GPR_ASSERT(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) != nullptr); - GRPC_COMBINER_UNREF(lock, "test_execute_one"); + GRPC_COMBINER_UNREF(&exec_ctx, lock, "test_execute_one"); + grpc_exec_ctx_finish(&exec_ctx); } typedef struct { @@ -63,7 +67,7 @@ typedef struct { size_t value; } ex_args; -static void check_one(void* a, grpc_error* error) { +static void check_one(grpc_exec_ctx* exec_ctx, void* a, grpc_error* error) { ex_args* args = static_cast(a); GPR_ASSERT(*args->ctr == args->value - 1); *args->ctr = args->value; @@ -72,25 +76,28 @@ static void check_one(void* a, grpc_error* error) { static void execute_many_loop(void* a) { thd_args* args = static_cast(a); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; size_t n = 1; for (size_t i = 0; i < 10; i++) { for (size_t j = 0; j < 10000; j++) { ex_args* c = static_cast(gpr_malloc(sizeof(*c))); c->ctr = &args->ctr; c->value = n++; - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE( + GRPC_CLOSURE_SCHED(&exec_ctx, + GRPC_CLOSURE_CREATE( check_one, c, grpc_combiner_scheduler(args->lock)), GRPC_ERROR_NONE); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); } // sleep for a little bit, to test a combiner draining and another thread // picking it up gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100)); } - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(set_event_to_true, &args->done, + GRPC_CLOSURE_SCHED(&exec_ctx, + GRPC_CLOSURE_CREATE(set_event_to_true, &args->done, grpc_combiner_scheduler(args->lock)), GRPC_ERROR_NONE); + grpc_exec_ctx_finish(&exec_ctx); } static void test_execute_many(void) { @@ -113,18 +120,20 @@ static void test_execute_many(void) { gpr_inf_future(GPR_CLOCK_REALTIME)) != nullptr); gpr_thd_join(thds[i]); } - grpc_core::ExecCtx exec_ctx; - GRPC_COMBINER_UNREF(lock, "test_execute_many"); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_COMBINER_UNREF(&exec_ctx, lock, "test_execute_many"); + grpc_exec_ctx_finish(&exec_ctx); } static gpr_event got_in_finally; -static void in_finally(void* arg, grpc_error* error) { +static void in_finally(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { gpr_event_set(&got_in_finally, (void*)1); } -static void add_finally(void* arg, grpc_error* error) { - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(in_finally, arg, +static void add_finally(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + GRPC_CLOSURE_SCHED(exec_ctx, + GRPC_CLOSURE_CREATE(in_finally, arg, grpc_combiner_finally_scheduler( static_cast(arg))), GRPC_ERROR_NONE); @@ -134,15 +143,17 @@ static void test_execute_finally(void) { gpr_log(GPR_DEBUG, "test_execute_finally"); grpc_combiner* lock = grpc_combiner_create(); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_event_init(&got_in_finally); GRPC_CLOSURE_SCHED( + &exec_ctx, GRPC_CLOSURE_CREATE(add_finally, lock, grpc_combiner_scheduler(lock)), GRPC_ERROR_NONE); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); GPR_ASSERT(gpr_event_wait(&got_in_finally, grpc_timeout_seconds_to_deadline(5)) != nullptr); - GRPC_COMBINER_UNREF(lock, "test_execute_finally"); + GRPC_COMBINER_UNREF(&exec_ctx, lock, "test_execute_finally"); + grpc_exec_ctx_finish(&exec_ctx); } int main(int argc, char** argv) { diff --git a/test/core/iomgr/endpoint_pair_test.cc b/test/core/iomgr/endpoint_pair_test.cc index 90dd40d9c4..30a0cb5924 100644 --- a/test/core/iomgr/endpoint_pair_test.cc +++ b/test/core/iomgr/endpoint_pair_test.cc @@ -32,7 +32,7 @@ static void clean_up(void) {} static grpc_endpoint_test_fixture create_fixture_endpoint_pair( size_t slice_size) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_endpoint_test_fixture f; grpc_arg a[1]; a[0].key = const_cast(GRPC_ARG_TCP_READ_CHUNK_SIZE); @@ -43,8 +43,9 @@ static grpc_endpoint_test_fixture create_fixture_endpoint_pair( f.client_ep = p.client; f.server_ep = p.server; - grpc_endpoint_add_to_pollset(f.client_ep, g_pollset); - grpc_endpoint_add_to_pollset(f.server_ep, g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, g_pollset); + grpc_exec_ctx_finish(&exec_ctx); return f; } @@ -53,23 +54,23 @@ static grpc_endpoint_test_config configs[] = { {"tcp/tcp_socketpair", create_fixture_endpoint_pair, clean_up}, }; -static void destroy_pollset(void* p, grpc_error* error) { - grpc_pollset_destroy(static_cast(p)); +static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p, + grpc_error* error) { + grpc_pollset_destroy(exec_ctx, static_cast(p)); } int main(int argc, char** argv) { grpc_closure destroyed; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - { - grpc_core::ExecCtx exec_ctx; - g_pollset = static_cast(gpr_zalloc(grpc_pollset_size())); - grpc_pollset_init(g_pollset, &g_mu); - grpc_endpoint_tests(configs[0], g_pollset, g_mu); - GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, - grpc_schedule_on_exec_ctx); - grpc_pollset_shutdown(g_pollset, &destroyed); - } + g_pollset = static_cast(gpr_zalloc(grpc_pollset_size())); + grpc_pollset_init(g_pollset, &g_mu); + grpc_endpoint_tests(configs[0], g_pollset, g_mu); + GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, + grpc_schedule_on_exec_ctx); + grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); + grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); gpr_free(g_pollset); diff --git a/test/core/iomgr/endpoint_tests.cc b/test/core/iomgr/endpoint_tests.cc index 8ccae52067..026e34105d 100644 --- a/test/core/iomgr/endpoint_tests.cc +++ b/test/core/iomgr/endpoint_tests.cc @@ -115,7 +115,8 @@ struct read_and_write_test_state { grpc_closure done_write; }; -static void read_and_write_test_read_handler(void* data, grpc_error* error) { +static void read_and_write_test_read_handler(grpc_exec_ctx* exec_ctx, + void* data, grpc_error* error) { struct read_and_write_test_state* state = (struct read_and_write_test_state*)data; @@ -125,14 +126,17 @@ static void read_and_write_test_read_handler(void* data, grpc_error* error) { gpr_log(GPR_INFO, "Read handler done"); gpr_mu_lock(g_mu); state->read_done = 1 + (error == GRPC_ERROR_NONE); - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)); + GRPC_LOG_IF_ERROR("pollset_kick", + grpc_pollset_kick(exec_ctx, g_pollset, nullptr)); gpr_mu_unlock(g_mu); } else if (error == GRPC_ERROR_NONE) { - grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read); + grpc_endpoint_read(exec_ctx, state->read_ep, &state->incoming, + &state->done_read); } } -static void read_and_write_test_write_handler(void* data, grpc_error* error) { +static void read_and_write_test_write_handler(grpc_exec_ctx* exec_ctx, + void* data, grpc_error* error) { struct read_and_write_test_state* state = (struct read_and_write_test_state*)data; grpc_slice* slices = nullptr; @@ -149,7 +153,7 @@ static void read_and_write_test_write_handler(void* data, grpc_error* error) { &state->current_write_data); grpc_slice_buffer_reset_and_unref(&state->outgoing); grpc_slice_buffer_addn(&state->outgoing, slices, nslices); - grpc_endpoint_write(state->write_ep, &state->outgoing, + grpc_endpoint_write(exec_ctx, state->write_ep, &state->outgoing, &state->done_write); gpr_free(slices); return; @@ -159,7 +163,8 @@ static void read_and_write_test_write_handler(void* data, grpc_error* error) { gpr_log(GPR_INFO, "Write handler done"); gpr_mu_lock(g_mu); state->write_done = 1 + (error == GRPC_ERROR_NONE); - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)); + GRPC_LOG_IF_ERROR("pollset_kick", + grpc_pollset_kick(exec_ctx, g_pollset, nullptr)); gpr_mu_unlock(g_mu); } @@ -173,7 +178,7 @@ static void read_and_write_test(grpc_endpoint_test_config config, struct read_and_write_test_state state; grpc_endpoint_test_fixture f = begin_test(config, "read_and_write_test", slice_size); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_millis deadline = grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20)); gpr_log(GPR_DEBUG, @@ -212,57 +217,66 @@ static void read_and_write_test(grpc_endpoint_test_config config, for the first iteration as for later iterations. It does the right thing even when bytes_written is unsigned. */ state.bytes_written -= state.current_write_size; - read_and_write_test_write_handler(&state, GRPC_ERROR_NONE); - grpc_core::ExecCtx::Get()->Flush(); + read_and_write_test_write_handler(&exec_ctx, &state, GRPC_ERROR_NONE); + grpc_exec_ctx_flush(&exec_ctx); - grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read); + grpc_endpoint_read(&exec_ctx, state.read_ep, &state.incoming, + &state.done_read); if (shutdown) { gpr_log(GPR_DEBUG, "shutdown read"); grpc_endpoint_shutdown( - state.read_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown")); + &exec_ctx, state.read_ep, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown")); gpr_log(GPR_DEBUG, "shutdown write"); grpc_endpoint_shutdown( - state.write_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown")); + &exec_ctx, state.write_ep, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown")); } - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(g_mu); while (!state.read_done || !state.write_done) { grpc_pollset_worker* worker = nullptr; - GPR_ASSERT(grpc_core::ExecCtx::Get()->Now() < deadline); + GPR_ASSERT(grpc_exec_ctx_now(&exec_ctx) < deadline); GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline))); } gpr_mu_unlock(g_mu); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); end_test(config); - grpc_slice_buffer_destroy_internal(&state.outgoing); - grpc_slice_buffer_destroy_internal(&state.incoming); - grpc_endpoint_destroy(state.read_ep); - grpc_endpoint_destroy(state.write_ep); + grpc_slice_buffer_destroy_internal(&exec_ctx, &state.outgoing); + grpc_slice_buffer_destroy_internal(&exec_ctx, &state.incoming); + grpc_endpoint_destroy(&exec_ctx, state.read_ep); + grpc_endpoint_destroy(&exec_ctx, state.write_ep); + grpc_exec_ctx_finish(&exec_ctx); } -static void inc_on_failure(void* arg, grpc_error* error) { +static void inc_on_failure(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { gpr_mu_lock(g_mu); *(int*)arg += (error != GRPC_ERROR_NONE); - GPR_ASSERT(GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr))); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); gpr_mu_unlock(g_mu); } -static void wait_for_fail_count(int* fail_count, int want_fail_count) { - grpc_core::ExecCtx::Get()->Flush(); +static void wait_for_fail_count(grpc_exec_ctx* exec_ctx, int* fail_count, + int want_fail_count) { + grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(g_mu); grpc_millis deadline = grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10)); - while (grpc_core::ExecCtx::Get()->Now() < deadline && + while (grpc_exec_ctx_now(exec_ctx) < deadline && *fail_count < want_fail_count) { grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); + "pollset_work", + grpc_pollset_work(exec_ctx, g_pollset, &worker, deadline))); gpr_mu_unlock(g_mu); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(g_mu); } GPR_ASSERT(*fail_count == want_fail_count); @@ -277,32 +291,33 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) { grpc_slice_buffer slice_buffer; grpc_slice_buffer_init(&slice_buffer); - grpc_core::ExecCtx exec_ctx; - grpc_endpoint_add_to_pollset(f.client_ep, g_pollset); - grpc_endpoint_read(f.client_ep, &slice_buffer, + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset); + grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count, grpc_schedule_on_exec_ctx)); - wait_for_fail_count(&fail_count, 0); - grpc_endpoint_shutdown(f.client_ep, + wait_for_fail_count(&exec_ctx, &fail_count, 0); + grpc_endpoint_shutdown(&exec_ctx, f.client_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown")); - wait_for_fail_count(&fail_count, 1); - grpc_endpoint_read(f.client_ep, &slice_buffer, + wait_for_fail_count(&exec_ctx, &fail_count, 1); + grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count, grpc_schedule_on_exec_ctx)); - wait_for_fail_count(&fail_count, 2); + wait_for_fail_count(&exec_ctx, &fail_count, 2); grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a")); - grpc_endpoint_write(f.client_ep, &slice_buffer, + grpc_endpoint_write(&exec_ctx, f.client_ep, &slice_buffer, GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count, grpc_schedule_on_exec_ctx)); - wait_for_fail_count(&fail_count, 3); - grpc_endpoint_shutdown(f.client_ep, + wait_for_fail_count(&exec_ctx, &fail_count, 3); + grpc_endpoint_shutdown(&exec_ctx, f.client_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown")); - wait_for_fail_count(&fail_count, 3); + wait_for_fail_count(&exec_ctx, &fail_count, 3); - grpc_slice_buffer_destroy_internal(&slice_buffer); + grpc_slice_buffer_destroy_internal(&exec_ctx, &slice_buffer); - grpc_endpoint_destroy(f.client_ep); - grpc_endpoint_destroy(f.server_ep); + grpc_endpoint_destroy(&exec_ctx, f.client_ep); + grpc_endpoint_destroy(&exec_ctx, f.server_ep); + grpc_exec_ctx_finish(&exec_ctx); } void grpc_endpoint_tests(grpc_endpoint_test_config config, diff --git a/test/core/iomgr/ev_epollsig_linux_test.cc b/test/core/iomgr/ev_epollsig_linux_test.cc index e767e01f21..94f387164a 100644 --- a/test/core/iomgr/ev_epollsig_linux_test.cc +++ b/test/core/iomgr/ev_epollsig_linux_test.cc @@ -70,18 +70,19 @@ static void test_fd_init(test_fd* tfds, int* fds, int num_fds) { } } -static void test_fd_cleanup(test_fd* tfds, int num_fds) { +static void test_fd_cleanup(grpc_exec_ctx* exec_ctx, test_fd* tfds, + int num_fds) { int release_fd; int i; for (i = 0; i < num_fds; i++) { - grpc_fd_shutdown(tfds[i].fd, + grpc_fd_shutdown(exec_ctx, tfds[i].fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("test_fd_cleanup")); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(exec_ctx); - grpc_fd_orphan(tfds[i].fd, nullptr, &release_fd, false /* already_closed */, - "test_fd_cleanup"); - grpc_core::ExecCtx::Get()->Flush(); + grpc_fd_orphan(exec_ctx, tfds[i].fd, nullptr, &release_fd, + false /* already_closed */, "test_fd_cleanup"); + grpc_exec_ctx_flush(exec_ctx); GPR_ASSERT(release_fd == tfds[i].inner_fd); close(tfds[i].inner_fd); @@ -97,20 +98,22 @@ static void test_pollset_init(test_pollset* pollsets, int num_pollsets) { } } -static void destroy_pollset(void* p, grpc_error* error) { - grpc_pollset_destroy((grpc_pollset*)p); +static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p, + grpc_error* error) { + grpc_pollset_destroy(exec_ctx, (grpc_pollset*)p); } -static void test_pollset_cleanup(test_pollset* pollsets, int num_pollsets) { +static void test_pollset_cleanup(grpc_exec_ctx* exec_ctx, + test_pollset* pollsets, int num_pollsets) { grpc_closure destroyed; int i; for (i = 0; i < num_pollsets; i++) { GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, pollsets[i].pollset, grpc_schedule_on_exec_ctx); - grpc_pollset_shutdown(pollsets[i].pollset, &destroyed); + grpc_pollset_shutdown(exec_ctx, pollsets[i].pollset, &destroyed); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(exec_ctx); gpr_free(pollsets[i].pollset); } } @@ -130,7 +133,7 @@ static void test_pollset_cleanup(test_pollset* pollsets, int num_pollsets) { #define NUM_POLLSETS 4 static void test_add_fd_to_pollset() { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; test_fd tfds[NUM_FDS]; int fds[NUM_FDS]; test_pollset pollsets[NUM_POLLSETS]; @@ -167,33 +170,33 @@ static void test_add_fd_to_pollset() { /* == Step 1 == */ for (i = 0; i <= 2; i++) { - grpc_pollset_add_fd(pollsets[0].pollset, tfds[i].fd); - grpc_core::ExecCtx::Get()->Flush(); + grpc_pollset_add_fd(&exec_ctx, pollsets[0].pollset, tfds[i].fd); + grpc_exec_ctx_flush(&exec_ctx); } for (i = 3; i <= 4; i++) { - grpc_pollset_add_fd(pollsets[1].pollset, tfds[i].fd); - grpc_core::ExecCtx::Get()->Flush(); + grpc_pollset_add_fd(&exec_ctx, pollsets[1].pollset, tfds[i].fd); + grpc_exec_ctx_flush(&exec_ctx); } for (i = 5; i <= 7; i++) { - grpc_pollset_add_fd(pollsets[2].pollset, tfds[i].fd); - grpc_core::ExecCtx::Get()->Flush(); + grpc_pollset_add_fd(&exec_ctx, pollsets[2].pollset, tfds[i].fd); + grpc_exec_ctx_flush(&exec_ctx); } /* == Step 2 == */ for (i = 0; i <= 1; i++) { - grpc_pollset_add_fd(pollsets[3].pollset, tfds[i].fd); - grpc_core::ExecCtx::Get()->Flush(); + grpc_pollset_add_fd(&exec_ctx, pollsets[3].pollset, tfds[i].fd); + grpc_exec_ctx_flush(&exec_ctx); } /* == Step 3 == */ - grpc_pollset_add_fd(pollsets[1].pollset, tfds[0].fd); - grpc_core::ExecCtx::Get()->Flush(); + grpc_pollset_add_fd(&exec_ctx, pollsets[1].pollset, tfds[0].fd); + grpc_exec_ctx_flush(&exec_ctx); /* == Step 4 == */ - grpc_pollset_add_fd(pollsets[2].pollset, tfds[3].fd); - grpc_core::ExecCtx::Get()->Flush(); + grpc_pollset_add_fd(&exec_ctx, pollsets[2].pollset, tfds[3].fd); + grpc_exec_ctx_flush(&exec_ctx); /* All polling islands are merged at this point */ @@ -210,8 +213,9 @@ static void test_add_fd_to_pollset() { expected_pi, grpc_pollset_get_polling_island(pollsets[i].pollset))); } - test_fd_cleanup(tfds, NUM_FDS); - test_pollset_cleanup(pollsets, NUM_POLLSETS); + test_fd_cleanup(&exec_ctx, tfds, NUM_FDS); + test_pollset_cleanup(&exec_ctx, pollsets, NUM_POLLSETS); + grpc_exec_ctx_finish(&exec_ctx); } #undef NUM_FDS @@ -231,24 +235,26 @@ static __thread int thread_wakeups = 0; static void test_threading_loop(void* arg) { threading_shared* shared = static_cast(arg); while (thread_wakeups < 1000000) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_worker* worker; gpr_mu_lock(shared->mu); GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", - grpc_pollset_work(shared->pollset, &worker, GRPC_MILLIS_INF_FUTURE))); + "pollset_work", grpc_pollset_work(&exec_ctx, shared->pollset, &worker, + GRPC_MILLIS_INF_FUTURE))); gpr_mu_unlock(shared->mu); + grpc_exec_ctx_finish(&exec_ctx); } } -static void test_threading_wakeup(void* arg, grpc_error* error) { +static void test_threading_wakeup(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { threading_shared* shared = static_cast(arg); ++shared->wakeups; ++thread_wakeups; if (error == GRPC_ERROR_NONE) { GPR_ASSERT(GRPC_LOG_IF_ERROR( "consume_wakeup", grpc_wakeup_fd_consume_wakeup(shared->wakeup_fd))); - grpc_fd_notify_on_read(shared->wakeup_desc, &shared->on_wakeup); + grpc_fd_notify_on_read(exec_ctx, shared->wakeup_desc, &shared->on_wakeup); GPR_ASSERT(GRPC_LOG_IF_ERROR("wakeup_next", grpc_wakeup_fd_wakeup(shared->wakeup_fd))); } @@ -271,12 +277,13 @@ static void test_threading(void) { shared.wakeup_desc = grpc_fd_create(fd.read_fd, "wakeup"); shared.wakeups = 0; { - grpc_core::ExecCtx exec_ctx; - grpc_pollset_add_fd(shared.pollset, shared.wakeup_desc); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_pollset_add_fd(&exec_ctx, shared.pollset, shared.wakeup_desc); grpc_fd_notify_on_read( - shared.wakeup_desc, + &exec_ctx, shared.wakeup_desc, GRPC_CLOSURE_INIT(&shared.on_wakeup, test_threading_wakeup, &shared, grpc_schedule_on_exec_ctx)); + grpc_exec_ctx_finish(&exec_ctx); } GPR_ASSERT(GRPC_LOG_IF_ERROR("wakeup_first", grpc_wakeup_fd_wakeup(shared.wakeup_fd))); @@ -286,13 +293,14 @@ static void test_threading(void) { fd.read_fd = 0; grpc_wakeup_fd_destroy(&fd); { - grpc_core::ExecCtx exec_ctx; - grpc_fd_shutdown(shared.wakeup_desc, GRPC_ERROR_CANCELLED); - grpc_fd_orphan(shared.wakeup_desc, nullptr, nullptr, + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_fd_shutdown(&exec_ctx, shared.wakeup_desc, GRPC_ERROR_CANCELLED); + grpc_fd_orphan(&exec_ctx, shared.wakeup_desc, nullptr, nullptr, false /* already_closed */, "done"); - grpc_pollset_shutdown(shared.pollset, + grpc_pollset_shutdown(&exec_ctx, shared.pollset, GRPC_CLOSURE_CREATE(destroy_pollset, shared.pollset, grpc_schedule_on_exec_ctx)); + grpc_exec_ctx_finish(&exec_ctx); } gpr_free(shared.pollset); } @@ -301,21 +309,20 @@ int main(int argc, char** argv) { const char* poll_strategy = nullptr; grpc_test_init(argc, argv); grpc_init(); - { - grpc_core::ExecCtx exec_ctx; - - poll_strategy = grpc_get_poll_strategy_name(); - if (poll_strategy != nullptr && strcmp(poll_strategy, "epollsig") == 0) { - test_add_fd_to_pollset(); - test_threading(); - } else { - gpr_log(GPR_INFO, - "Skipping the test. The test is only relevant for 'epollsig' " - "strategy. and the current strategy is: '%s'", - poll_strategy); - } + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + + poll_strategy = grpc_get_poll_strategy_name(); + if (poll_strategy != nullptr && strcmp(poll_strategy, "epollsig") == 0) { + test_add_fd_to_pollset(); + test_threading(); + } else { + gpr_log(GPR_INFO, + "Skipping the test. The test is only relevant for 'epollsig' " + "strategy. and the current strategy is: '%s'", + poll_strategy); } + grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); return 0; } diff --git a/test/core/iomgr/fd_conservation_posix_test.cc b/test/core/iomgr/fd_conservation_posix_test.cc index aaa14010f8..f46430c611 100644 --- a/test/core/iomgr/fd_conservation_posix_test.cc +++ b/test/core/iomgr/fd_conservation_posix_test.cc @@ -31,27 +31,26 @@ int main(int argc, char** argv) { grpc_test_init(argc, argv); grpc_init(); - { - grpc_core::ExecCtx exec_ctx; - - /* set max # of file descriptors to a low value, and - verify we can create and destroy many more than this number - of descriptors */ - rlim.rlim_cur = rlim.rlim_max = 10; - GPR_ASSERT(0 == setrlimit(RLIMIT_NOFILE, &rlim)); - grpc_resource_quota* resource_quota = - grpc_resource_quota_create("fd_conservation_posix_test"); - - for (i = 0; i < 100; i++) { - p = grpc_iomgr_create_endpoint_pair("test", NULL); - grpc_endpoint_destroy(p.client); - grpc_endpoint_destroy(p.server); - grpc_core::ExecCtx::Get()->Flush(); - } - - grpc_resource_quota_unref(resource_quota); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + + /* set max # of file descriptors to a low value, and + verify we can create and destroy many more than this number + of descriptors */ + rlim.rlim_cur = rlim.rlim_max = 10; + GPR_ASSERT(0 == setrlimit(RLIMIT_NOFILE, &rlim)); + grpc_resource_quota* resource_quota = + grpc_resource_quota_create("fd_conservation_posix_test"); + + for (i = 0; i < 100; i++) { + p = grpc_iomgr_create_endpoint_pair("test", nullptr); + grpc_endpoint_destroy(&exec_ctx, p.client); + grpc_endpoint_destroy(&exec_ctx, p.server); + grpc_exec_ctx_flush(&exec_ctx); } + grpc_resource_quota_unref(resource_quota); + + grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); return 0; } diff --git a/test/core/iomgr/fd_posix_test.cc b/test/core/iomgr/fd_posix_test.cc index cf75517538..a03d841ecd 100644 --- a/test/core/iomgr/fd_posix_test.cc +++ b/test/core/iomgr/fd_posix_test.cc @@ -111,19 +111,20 @@ typedef struct { /* Called when an upload session can be safely shutdown. Close session FD and start to shutdown listen FD. */ -static void session_shutdown_cb(void* arg, /*session */ +static void session_shutdown_cb(grpc_exec_ctx* exec_ctx, void* arg, /*session */ bool success) { session* se = static_cast(arg); server* sv = se->sv; - grpc_fd_orphan(se->em_fd, nullptr, nullptr, false /* already_closed */, "a"); + grpc_fd_orphan(exec_ctx, se->em_fd, nullptr, nullptr, + false /* already_closed */, "a"); gpr_free(se); /* Start to shutdown listen fd. */ - grpc_fd_shutdown(sv->em_fd, + grpc_fd_shutdown(exec_ctx, sv->em_fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("session_shutdown_cb")); } /* Called when data become readable in a session. */ -static void session_read_cb(void* arg, /*session */ +static void session_read_cb(grpc_exec_ctx* exec_ctx, void* arg, /*session */ grpc_error* error) { session* se = static_cast(arg); int fd = grpc_fd_wrapped_fd(se->em_fd); @@ -132,7 +133,7 @@ static void session_read_cb(void* arg, /*session */ ssize_t read_total = 0; if (error != GRPC_ERROR_NONE) { - session_shutdown_cb(arg, 1); + session_shutdown_cb(exec_ctx, arg, 1); return; } @@ -147,7 +148,7 @@ static void session_read_cb(void* arg, /*session */ It is possible to read nothing due to spurious edge event or data has been drained, In such a case, read() returns -1 and set errno to EAGAIN. */ if (read_once == 0) { - session_shutdown_cb(arg, 1); + session_shutdown_cb(exec_ctx, arg, 1); } else if (read_once == -1) { if (errno == EAGAIN) { /* An edge triggered event is cached in the kernel until next poll. @@ -158,7 +159,7 @@ static void session_read_cb(void* arg, /*session */ TODO(chenw): in multi-threaded version, callback and polling can be run in different threads. polling may catch a persist read edge event before notify_on_read is called. */ - grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure); + grpc_fd_notify_on_read(exec_ctx, se->em_fd, &se->session_read_closure); } else { gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno)); abort(); @@ -168,20 +169,22 @@ static void session_read_cb(void* arg, /*session */ /* Called when the listen FD can be safely shutdown. Close listen FD and signal that server can be shutdown. */ -static void listen_shutdown_cb(void* arg /*server */, int success) { +static void listen_shutdown_cb(grpc_exec_ctx* exec_ctx, void* arg /*server */, + int success) { server* sv = static_cast(arg); - grpc_fd_orphan(sv->em_fd, nullptr, nullptr, false /* already_closed */, "b"); + grpc_fd_orphan(exec_ctx, sv->em_fd, nullptr, nullptr, + false /* already_closed */, "b"); gpr_mu_lock(g_mu); sv->done = 1; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); gpr_mu_unlock(g_mu); } /* Called when a new TCP connection request arrives in the listening port. */ -static void listen_cb(void* arg, /*=sv_arg*/ +static void listen_cb(grpc_exec_ctx* exec_ctx, void* arg, /*=sv_arg*/ grpc_error* error) { server* sv = static_cast(arg); int fd; @@ -192,7 +195,7 @@ static void listen_cb(void* arg, /*=sv_arg*/ grpc_fd* listen_em_fd = sv->em_fd; if (error != GRPC_ERROR_NONE) { - listen_shutdown_cb(arg, 1); + listen_shutdown_cb(exec_ctx, arg, 1); return; } @@ -204,12 +207,12 @@ static void listen_cb(void* arg, /*=sv_arg*/ se = static_cast(gpr_malloc(sizeof(*se))); se->sv = sv; se->em_fd = grpc_fd_create(fd, "listener"); - grpc_pollset_add_fd(g_pollset, se->em_fd); + grpc_pollset_add_fd(exec_ctx, g_pollset, se->em_fd); GRPC_CLOSURE_INIT(&se->session_read_closure, session_read_cb, se, grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure); + grpc_fd_notify_on_read(exec_ctx, se->em_fd, &se->session_read_closure); - grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure); + grpc_fd_notify_on_read(exec_ctx, listen_em_fd, &sv->listen_closure); } /* Max number of connections pending to be accepted by listen(). */ @@ -219,7 +222,7 @@ static void listen_cb(void* arg, /*=sv_arg*/ listen_cb() is registered to be interested in reading from listen_fd. When connection request arrives, listen_cb() is called to accept the connection request. */ -static int server_start(server* sv) { +static int server_start(grpc_exec_ctx* exec_ctx, server* sv) { int port = 0; int fd; struct sockaddr_in sin; @@ -233,11 +236,11 @@ static int server_start(server* sv) { GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0); sv->em_fd = grpc_fd_create(fd, "server"); - grpc_pollset_add_fd(g_pollset, sv->em_fd); + grpc_pollset_add_fd(exec_ctx, g_pollset, sv->em_fd); /* Register to be interested in reading from listen_fd. */ GRPC_CLOSURE_INIT(&sv->listen_closure, listen_cb, sv, grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure); + grpc_fd_notify_on_read(exec_ctx, sv->em_fd, &sv->listen_closure); return port; } @@ -246,13 +249,13 @@ static int server_start(server* sv) { static void server_wait_and_shutdown(server* sv) { gpr_mu_lock(g_mu); while (!sv->done) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", - grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE))); + "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker, + GRPC_MILLIS_INF_FUTURE))); gpr_mu_unlock(g_mu); - + grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); } gpr_mu_unlock(g_mu); @@ -286,16 +289,18 @@ static void client_init(client* cl) { } /* Called when a client upload session is ready to shutdown. */ -static void client_session_shutdown_cb(void* arg /*client */, int success) { +static void client_session_shutdown_cb(grpc_exec_ctx* exec_ctx, + void* arg /*client */, int success) { client* cl = static_cast(arg); - grpc_fd_orphan(cl->em_fd, nullptr, nullptr, false /* already_closed */, "c"); + grpc_fd_orphan(exec_ctx, cl->em_fd, nullptr, nullptr, + false /* already_closed */, "c"); cl->done = 1; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); } /* Write as much as possible, then register notify_on_write. */ -static void client_session_write(void* arg, /*client */ +static void client_session_write(grpc_exec_ctx* exec_ctx, void* arg, /*client */ grpc_error* error) { client* cl = static_cast(arg); int fd = grpc_fd_wrapped_fd(cl->em_fd); @@ -303,7 +308,7 @@ static void client_session_write(void* arg, /*client */ if (error != GRPC_ERROR_NONE) { gpr_mu_lock(g_mu); - client_session_shutdown_cb(arg, 1); + client_session_shutdown_cb(exec_ctx, arg, 1); gpr_mu_unlock(g_mu); return; } @@ -318,10 +323,10 @@ static void client_session_write(void* arg, /*client */ if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) { GRPC_CLOSURE_INIT(&cl->write_closure, client_session_write, cl, grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure); + grpc_fd_notify_on_write(exec_ctx, cl->em_fd, &cl->write_closure); cl->client_write_cnt++; } else { - client_session_shutdown_cb(arg, 1); + client_session_shutdown_cb(exec_ctx, arg, 1); } gpr_mu_unlock(g_mu); } else { @@ -331,7 +336,7 @@ static void client_session_write(void* arg, /*client */ } /* Start a client to send a stream of bytes. */ -static void client_start(client* cl, int port) { +static void client_start(grpc_exec_ctx* exec_ctx, client* cl, int port) { int fd; struct sockaddr_in sin; create_test_socket(port, &fd, &sin); @@ -352,9 +357,9 @@ static void client_start(client* cl, int port) { } cl->em_fd = grpc_fd_create(fd, "client"); - grpc_pollset_add_fd(g_pollset, cl->em_fd); + grpc_pollset_add_fd(exec_ctx, g_pollset, cl->em_fd); - client_session_write(cl, GRPC_ERROR_NONE); + client_session_write(exec_ctx, cl, GRPC_ERROR_NONE); } /* Wait for the signal to shutdown a client. */ @@ -362,12 +367,12 @@ static void client_wait_and_shutdown(client* cl) { gpr_mu_lock(g_mu); while (!cl->done) { grpc_pollset_worker* worker = nullptr; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", - grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE))); + "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker, + GRPC_MILLIS_INF_FUTURE))); gpr_mu_unlock(g_mu); - + grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); } gpr_mu_unlock(g_mu); @@ -380,13 +385,13 @@ static void test_grpc_fd(void) { server sv; client cl; int port; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; server_init(&sv); - port = server_start(&sv); + port = server_start(&exec_ctx, &sv); client_init(&cl); - client_start(&cl, port); - + client_start(&exec_ctx, &cl, port); + grpc_exec_ctx_finish(&exec_ctx); client_wait_and_shutdown(&cl); server_wait_and_shutdown(&sv); GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total); @@ -401,25 +406,27 @@ void init_change_data(fd_change_data* fdc) { fdc->cb_that_ran = nullptr; } void destroy_change_data(fd_change_data* fdc) {} -static void first_read_callback(void* arg /* fd_change_data */, +static void first_read_callback(grpc_exec_ctx* exec_ctx, + void* arg /* fd_change_data */, grpc_error* error) { fd_change_data* fdc = static_cast(arg); gpr_mu_lock(g_mu); fdc->cb_that_ran = first_read_callback; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); gpr_mu_unlock(g_mu); } -static void second_read_callback(void* arg /* fd_change_data */, +static void second_read_callback(grpc_exec_ctx* exec_ctx, + void* arg /* fd_change_data */, grpc_error* error) { fd_change_data* fdc = static_cast(arg); gpr_mu_lock(g_mu); fdc->cb_that_ran = second_read_callback; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); gpr_mu_unlock(g_mu); } @@ -436,7 +443,7 @@ static void test_grpc_fd_change(void) { ssize_t result; grpc_closure first_closure; grpc_closure second_closure; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_CLOSURE_INIT(&first_closure, first_read_callback, &a, grpc_schedule_on_exec_ctx); @@ -453,10 +460,10 @@ static void test_grpc_fd_change(void) { GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change"); - grpc_pollset_add_fd(g_pollset, em_fd); + grpc_pollset_add_fd(&exec_ctx, g_pollset, em_fd); /* Register the first callback, then make its FD readable */ - grpc_fd_notify_on_read(em_fd, &first_closure); + grpc_fd_notify_on_read(&exec_ctx, em_fd, &first_closure); data = 0; result = write(sv[1], &data, 1); GPR_ASSERT(result == 1); @@ -466,10 +473,10 @@ static void test_grpc_fd_change(void) { while (a.cb_that_ran == nullptr) { grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", - grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE))); + "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker, + GRPC_MILLIS_INF_FUTURE))); gpr_mu_unlock(g_mu); - + grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); } GPR_ASSERT(a.cb_that_ran == first_read_callback); @@ -481,7 +488,7 @@ static void test_grpc_fd_change(void) { /* Now register a second callback with distinct change data, and do the same thing again. */ - grpc_fd_notify_on_read(em_fd, &second_closure); + grpc_fd_notify_on_read(&exec_ctx, em_fd, &second_closure); data = 0; result = write(sv[1], &data, 1); GPR_ASSERT(result == 1); @@ -490,43 +497,44 @@ static void test_grpc_fd_change(void) { while (b.cb_that_ran == nullptr) { grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", - grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE))); + "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker, + GRPC_MILLIS_INF_FUTURE))); gpr_mu_unlock(g_mu); - + grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); } /* Except now we verify that second_read_callback ran instead */ GPR_ASSERT(b.cb_that_ran == second_read_callback); gpr_mu_unlock(g_mu); - grpc_fd_orphan(em_fd, nullptr, nullptr, false /* already_closed */, "d"); - + grpc_fd_orphan(&exec_ctx, em_fd, nullptr, nullptr, false /* already_closed */, + "d"); + grpc_exec_ctx_finish(&exec_ctx); destroy_change_data(&a); destroy_change_data(&b); close(sv[1]); } -static void destroy_pollset(void* p, grpc_error* error) { - grpc_pollset_destroy(static_cast(p)); +static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p, + grpc_error* error) { + grpc_pollset_destroy(exec_ctx, static_cast(p)); } int main(int argc, char** argv) { grpc_closure destroyed; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - { - grpc_core::ExecCtx exec_ctx; - g_pollset = static_cast(gpr_zalloc(grpc_pollset_size())); - grpc_pollset_init(g_pollset, &g_mu); - test_grpc_fd(); - test_grpc_fd_change(); - GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, - grpc_schedule_on_exec_ctx); - grpc_pollset_shutdown(g_pollset, &destroyed); - grpc_core::ExecCtx::Get()->Flush(); - gpr_free(g_pollset); - } + g_pollset = static_cast(gpr_zalloc(grpc_pollset_size())); + grpc_pollset_init(g_pollset, &g_mu); + test_grpc_fd(); + test_grpc_fd_change(); + GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, + grpc_schedule_on_exec_ctx); + grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); + grpc_exec_ctx_flush(&exec_ctx); + gpr_free(g_pollset); + grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); return 0; } diff --git a/test/core/iomgr/load_file_test.cc b/test/core/iomgr/load_file_test.cc index 797d0ef1a4..9f360badcc 100644 --- a/test/core/iomgr/load_file_test.cc +++ b/test/core/iomgr/load_file_test.cc @@ -19,7 +19,6 @@ #include #include -#include #include #include #include @@ -153,11 +152,9 @@ static void test_load_big_file(void) { int main(int argc, char** argv) { grpc_test_init(argc, argv); - grpc_init(); test_load_empty_file(); test_load_failure(); test_load_small_file(); test_load_big_file(); - grpc_shutdown(); return 0; } diff --git a/test/core/iomgr/pollset_set_test.cc b/test/core/iomgr/pollset_set_test.cc index f27079134b..719eab91fe 100644 --- a/test/core/iomgr/pollset_set_test.cc +++ b/test/core/iomgr/pollset_set_test.cc @@ -47,10 +47,11 @@ void init_test_pollset_sets(test_pollset_set* pollset_sets, const int num_pss) { } } -void cleanup_test_pollset_sets(test_pollset_set* pollset_sets, +void cleanup_test_pollset_sets(grpc_exec_ctx* exec_ctx, + test_pollset_set* pollset_sets, const int num_pss) { for (int i = 0; i < num_pss; i++) { - grpc_pollset_set_destroy(pollset_sets[i].pss); + grpc_pollset_set_destroy(exec_ctx, pollset_sets[i].pss); pollset_sets[i].pss = nullptr; } } @@ -72,19 +73,21 @@ static void init_test_pollsets(test_pollset* pollsets, const int num_pollsets) { } } -static void destroy_pollset(void* p, grpc_error* error) { - grpc_pollset_destroy(static_cast(p)); +static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p, + grpc_error* error) { + grpc_pollset_destroy(exec_ctx, static_cast(p)); } -static void cleanup_test_pollsets(test_pollset* pollsets, +static void cleanup_test_pollsets(grpc_exec_ctx* exec_ctx, + test_pollset* pollsets, const int num_pollsets) { grpc_closure destroyed; for (int i = 0; i < num_pollsets; i++) { GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, pollsets[i].ps, grpc_schedule_on_exec_ctx); - grpc_pollset_shutdown(pollsets[i].ps, &destroyed); + grpc_pollset_shutdown(exec_ctx, pollsets[i].ps, &destroyed); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(exec_ctx); gpr_free(pollsets[i].ps); pollsets[i].ps = nullptr; } @@ -102,43 +105,45 @@ typedef struct test_fd { grpc_closure on_readable; /* Closure to call when this fd is readable */ } test_fd; -void on_readable(void* tfd, grpc_error* error) { +void on_readable(grpc_exec_ctx* exec_ctx, void* tfd, grpc_error* error) { ((test_fd*)tfd)->is_on_readable_called = true; } -static void reset_test_fd(test_fd* tfd) { +static void reset_test_fd(grpc_exec_ctx* exec_ctx, test_fd* tfd) { tfd->is_on_readable_called = false; GRPC_CLOSURE_INIT(&tfd->on_readable, on_readable, tfd, grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_read(tfd->fd, &tfd->on_readable); + grpc_fd_notify_on_read(exec_ctx, tfd->fd, &tfd->on_readable); } -static void init_test_fds(test_fd* tfds, const int num_fds) { +static void init_test_fds(grpc_exec_ctx* exec_ctx, test_fd* tfds, + const int num_fds) { for (int i = 0; i < num_fds; i++) { GPR_ASSERT(GRPC_ERROR_NONE == grpc_wakeup_fd_init(&tfds[i].wakeup_fd)); tfds[i].fd = grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&tfds[i].wakeup_fd), "test_fd"); - reset_test_fd(&tfds[i]); + reset_test_fd(exec_ctx, &tfds[i]); } } -static void cleanup_test_fds(test_fd* tfds, const int num_fds) { +static void cleanup_test_fds(grpc_exec_ctx* exec_ctx, test_fd* tfds, + const int num_fds) { int release_fd; for (int i = 0; i < num_fds; i++) { - grpc_fd_shutdown(tfds[i].fd, + grpc_fd_shutdown(exec_ctx, tfds[i].fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("fd cleanup")); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(exec_ctx); /* grpc_fd_orphan frees the memory allocated for grpc_fd. Normally it also * calls close() on the underlying fd. In our case, we are using * grpc_wakeup_fd and we would like to destroy it ourselves (by calling * grpc_wakeup_fd_destroy). To prevent grpc_fd from calling close() on the * underlying fd, call it with a non-NULL 'release_fd' parameter */ - grpc_fd_orphan(tfds[i].fd, nullptr, &release_fd, false /* already_closed */, - "test_fd_cleanup"); - grpc_core::ExecCtx::Get()->Flush(); + grpc_fd_orphan(exec_ctx, tfds[i].fd, nullptr, &release_fd, + false /* already_closed */, "test_fd_cleanup"); + grpc_exec_ctx_flush(exec_ctx); grpc_wakeup_fd_destroy(&tfds[i].wakeup_fd); } @@ -150,7 +155,8 @@ static void make_test_fds_readable(test_fd* tfds, const int num_fds) { } } -static void verify_readable_and_reset(test_fd* tfds, const int num_fds) { +static void verify_readable_and_reset(grpc_exec_ctx* exec_ctx, test_fd* tfds, + const int num_fds) { for (int i = 0; i < num_fds; i++) { /* Verify that the on_readable callback was called */ GPR_ASSERT(tfds[i].is_on_readable_called); @@ -158,7 +164,7 @@ static void verify_readable_and_reset(test_fd* tfds, const int num_fds) { /* Reset the tfd[i] structure */ GPR_ASSERT(GRPC_ERROR_NONE == grpc_wakeup_fd_consume_wakeup(&tfds[i].wakeup_fd)); - reset_test_fd(&tfds[i]); + reset_test_fd(exec_ctx, &tfds[i]); } } @@ -199,7 +205,7 @@ static void pollset_set_test_basic() { * | * +---> FD9 (Added after PS2 is added to PSS0) */ - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_worker* worker; grpc_millis deadline; @@ -210,33 +216,34 @@ static void pollset_set_test_basic() { const int num_ps = GPR_ARRAY_SIZE(pollsets); const int num_pss = GPR_ARRAY_SIZE(pollset_sets); - init_test_fds(tfds, num_fds); + init_test_fds(&exec_ctx, tfds, num_fds); init_test_pollsets(pollsets, num_ps); init_test_pollset_sets(pollset_sets, num_pss); /* Construct the pollset_set/pollset/fd tree (see diagram above) */ - grpc_pollset_set_add_fd(pollset_sets[0].pss, tfds[0].fd); - grpc_pollset_set_add_fd(pollset_sets[1].pss, tfds[1].fd); + grpc_pollset_set_add_fd(&exec_ctx, pollset_sets[0].pss, tfds[0].fd); + grpc_pollset_set_add_fd(&exec_ctx, pollset_sets[1].pss, tfds[1].fd); - grpc_pollset_add_fd(pollsets[0].ps, tfds[2].fd); - grpc_pollset_add_fd(pollsets[1].ps, tfds[3].fd); - grpc_pollset_add_fd(pollsets[2].ps, tfds[4].fd); + grpc_pollset_add_fd(&exec_ctx, pollsets[0].ps, tfds[2].fd); + grpc_pollset_add_fd(&exec_ctx, pollsets[1].ps, tfds[3].fd); + grpc_pollset_add_fd(&exec_ctx, pollsets[2].ps, tfds[4].fd); - grpc_pollset_set_add_pollset_set(pollset_sets[0].pss, pollset_sets[1].pss); + grpc_pollset_set_add_pollset_set(&exec_ctx, pollset_sets[0].pss, + pollset_sets[1].pss); - grpc_pollset_set_add_pollset(pollset_sets[1].pss, pollsets[0].ps); - grpc_pollset_set_add_pollset(pollset_sets[0].pss, pollsets[1].ps); - grpc_pollset_set_add_pollset(pollset_sets[0].pss, pollsets[2].ps); + grpc_pollset_set_add_pollset(&exec_ctx, pollset_sets[1].pss, pollsets[0].ps); + grpc_pollset_set_add_pollset(&exec_ctx, pollset_sets[0].pss, pollsets[1].ps); + grpc_pollset_set_add_pollset(&exec_ctx, pollset_sets[0].pss, pollsets[2].ps); - grpc_pollset_set_add_fd(pollset_sets[0].pss, tfds[5].fd); - grpc_pollset_set_add_fd(pollset_sets[1].pss, tfds[6].fd); + grpc_pollset_set_add_fd(&exec_ctx, pollset_sets[0].pss, tfds[5].fd); + grpc_pollset_set_add_fd(&exec_ctx, pollset_sets[1].pss, tfds[6].fd); - grpc_pollset_add_fd(pollsets[0].ps, tfds[7].fd); - grpc_pollset_add_fd(pollsets[1].ps, tfds[8].fd); - grpc_pollset_add_fd(pollsets[2].ps, tfds[9].fd); + grpc_pollset_add_fd(&exec_ctx, pollsets[0].ps, tfds[7].fd); + grpc_pollset_add_fd(&exec_ctx, pollsets[1].ps, tfds[8].fd); + grpc_pollset_add_fd(&exec_ctx, pollsets[2].ps, tfds[9].fd); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); /* Test that if any FD in the above structure is readable, it is observable by * doing grpc_pollset_work on any pollset @@ -256,32 +263,34 @@ static void pollset_set_test_basic() { deadline = grpc_timespec_to_millis_round_up( grpc_timeout_milliseconds_to_deadline(2)); GPR_ASSERT(GRPC_ERROR_NONE == - grpc_pollset_work(pollsets[i].ps, &worker, deadline)); + grpc_pollset_work(&exec_ctx, pollsets[i].ps, &worker, deadline)); gpr_mu_unlock(pollsets[i].mu); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); - verify_readable_and_reset(tfds, num_fds); - grpc_core::ExecCtx::Get()->Flush(); + verify_readable_and_reset(&exec_ctx, tfds, num_fds); + grpc_exec_ctx_flush(&exec_ctx); } /* Test tear down */ - grpc_pollset_set_del_fd(pollset_sets[0].pss, tfds[0].fd); - grpc_pollset_set_del_fd(pollset_sets[0].pss, tfds[5].fd); - grpc_pollset_set_del_fd(pollset_sets[1].pss, tfds[1].fd); - grpc_pollset_set_del_fd(pollset_sets[1].pss, tfds[6].fd); - grpc_core::ExecCtx::Get()->Flush(); - - grpc_pollset_set_del_pollset(pollset_sets[1].pss, pollsets[0].ps); - grpc_pollset_set_del_pollset(pollset_sets[0].pss, pollsets[1].ps); - grpc_pollset_set_del_pollset(pollset_sets[0].pss, pollsets[2].ps); - - grpc_pollset_set_del_pollset_set(pollset_sets[0].pss, pollset_sets[1].pss); - grpc_core::ExecCtx::Get()->Flush(); - - cleanup_test_fds(tfds, num_fds); - cleanup_test_pollsets(pollsets, num_ps); - cleanup_test_pollset_sets(pollset_sets, num_pss); + grpc_pollset_set_del_fd(&exec_ctx, pollset_sets[0].pss, tfds[0].fd); + grpc_pollset_set_del_fd(&exec_ctx, pollset_sets[0].pss, tfds[5].fd); + grpc_pollset_set_del_fd(&exec_ctx, pollset_sets[1].pss, tfds[1].fd); + grpc_pollset_set_del_fd(&exec_ctx, pollset_sets[1].pss, tfds[6].fd); + grpc_exec_ctx_flush(&exec_ctx); + + grpc_pollset_set_del_pollset(&exec_ctx, pollset_sets[1].pss, pollsets[0].ps); + grpc_pollset_set_del_pollset(&exec_ctx, pollset_sets[0].pss, pollsets[1].ps); + grpc_pollset_set_del_pollset(&exec_ctx, pollset_sets[0].pss, pollsets[2].ps); + + grpc_pollset_set_del_pollset_set(&exec_ctx, pollset_sets[0].pss, + pollset_sets[1].pss); + grpc_exec_ctx_flush(&exec_ctx); + + cleanup_test_fds(&exec_ctx, tfds, num_fds); + cleanup_test_pollsets(&exec_ctx, pollsets, num_ps); + cleanup_test_pollset_sets(&exec_ctx, pollset_sets, num_pss); + grpc_exec_ctx_finish(&exec_ctx); } /* Same FD added multiple times to the pollset_set tree */ @@ -301,7 +310,7 @@ void pollset_set_test_dup_fds() { * | +--> FD2 * +---> FD1 */ - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_worker* worker; grpc_millis deadline; @@ -312,20 +321,21 @@ void pollset_set_test_dup_fds() { const int num_ps = 1; const int num_pss = GPR_ARRAY_SIZE(pollset_sets); - init_test_fds(tfds, num_fds); + init_test_fds(&exec_ctx, tfds, num_fds); init_test_pollsets(&pollset, num_ps); init_test_pollset_sets(pollset_sets, num_pss); /* Construct the structure */ - grpc_pollset_set_add_fd(pollset_sets[0].pss, tfds[0].fd); - grpc_pollset_set_add_fd(pollset_sets[1].pss, tfds[0].fd); - grpc_pollset_set_add_fd(pollset_sets[1].pss, tfds[1].fd); + grpc_pollset_set_add_fd(&exec_ctx, pollset_sets[0].pss, tfds[0].fd); + grpc_pollset_set_add_fd(&exec_ctx, pollset_sets[1].pss, tfds[0].fd); + grpc_pollset_set_add_fd(&exec_ctx, pollset_sets[1].pss, tfds[1].fd); - grpc_pollset_add_fd(pollset.ps, tfds[1].fd); - grpc_pollset_add_fd(pollset.ps, tfds[2].fd); + grpc_pollset_add_fd(&exec_ctx, pollset.ps, tfds[1].fd); + grpc_pollset_add_fd(&exec_ctx, pollset.ps, tfds[2].fd); - grpc_pollset_set_add_pollset(pollset_sets[1].pss, pollset.ps); - grpc_pollset_set_add_pollset_set(pollset_sets[0].pss, pollset_sets[1].pss); + grpc_pollset_set_add_pollset(&exec_ctx, pollset_sets[1].pss, pollset.ps); + grpc_pollset_set_add_pollset_set(&exec_ctx, pollset_sets[0].pss, + pollset_sets[1].pss); /* Test. Make all FDs readable and make sure that can be observed by doing a * grpc_pollset_work on the pollset 'PS' */ @@ -335,25 +345,27 @@ void pollset_set_test_dup_fds() { deadline = grpc_timespec_to_millis_round_up( grpc_timeout_milliseconds_to_deadline(2)); GPR_ASSERT(GRPC_ERROR_NONE == - grpc_pollset_work(pollset.ps, &worker, deadline)); + grpc_pollset_work(&exec_ctx, pollset.ps, &worker, deadline)); gpr_mu_unlock(pollset.mu); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); - verify_readable_and_reset(tfds, num_fds); - grpc_core::ExecCtx::Get()->Flush(); + verify_readable_and_reset(&exec_ctx, tfds, num_fds); + grpc_exec_ctx_flush(&exec_ctx); /* Tear down */ - grpc_pollset_set_del_fd(pollset_sets[0].pss, tfds[0].fd); - grpc_pollset_set_del_fd(pollset_sets[1].pss, tfds[0].fd); - grpc_pollset_set_del_fd(pollset_sets[1].pss, tfds[1].fd); - - grpc_pollset_set_del_pollset(pollset_sets[1].pss, pollset.ps); - grpc_pollset_set_del_pollset_set(pollset_sets[0].pss, pollset_sets[1].pss); - grpc_core::ExecCtx::Get()->Flush(); - - cleanup_test_fds(tfds, num_fds); - cleanup_test_pollsets(&pollset, num_ps); - cleanup_test_pollset_sets(pollset_sets, num_pss); + grpc_pollset_set_del_fd(&exec_ctx, pollset_sets[0].pss, tfds[0].fd); + grpc_pollset_set_del_fd(&exec_ctx, pollset_sets[1].pss, tfds[0].fd); + grpc_pollset_set_del_fd(&exec_ctx, pollset_sets[1].pss, tfds[1].fd); + + grpc_pollset_set_del_pollset(&exec_ctx, pollset_sets[1].pss, pollset.ps); + grpc_pollset_set_del_pollset_set(&exec_ctx, pollset_sets[0].pss, + pollset_sets[1].pss); + grpc_exec_ctx_flush(&exec_ctx); + + cleanup_test_fds(&exec_ctx, tfds, num_fds); + cleanup_test_pollsets(&exec_ctx, &pollset, num_ps); + cleanup_test_pollset_sets(&exec_ctx, pollset_sets, num_pss); + grpc_exec_ctx_finish(&exec_ctx); } /* Pollset_set with an empty pollset */ @@ -371,7 +383,7 @@ void pollset_set_test_empty_pollset() { * | * +---> FD2 */ - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_worker* worker; grpc_millis deadline; @@ -382,17 +394,17 @@ void pollset_set_test_empty_pollset() { const int num_ps = GPR_ARRAY_SIZE(pollsets); const int num_pss = 1; - init_test_fds(tfds, num_fds); + init_test_fds(&exec_ctx, tfds, num_fds); init_test_pollsets(pollsets, num_ps); init_test_pollset_sets(&pollset_set, num_pss); /* Construct the structure */ - grpc_pollset_set_add_fd(pollset_set.pss, tfds[0].fd); - grpc_pollset_add_fd(pollsets[1].ps, tfds[1].fd); - grpc_pollset_add_fd(pollsets[1].ps, tfds[2].fd); + grpc_pollset_set_add_fd(&exec_ctx, pollset_set.pss, tfds[0].fd); + grpc_pollset_add_fd(&exec_ctx, pollsets[1].ps, tfds[1].fd); + grpc_pollset_add_fd(&exec_ctx, pollsets[1].ps, tfds[2].fd); - grpc_pollset_set_add_pollset(pollset_set.pss, pollsets[0].ps); - grpc_pollset_set_add_pollset(pollset_set.pss, pollsets[1].ps); + grpc_pollset_set_add_pollset(&exec_ctx, pollset_set.pss, pollsets[0].ps); + grpc_pollset_set_add_pollset(&exec_ctx, pollset_set.pss, pollsets[1].ps); /* Test. Make all FDs readable and make sure that can be observed by doing * grpc_pollset_work on the empty pollset 'PS0' */ @@ -402,44 +414,45 @@ void pollset_set_test_empty_pollset() { deadline = grpc_timespec_to_millis_round_up( grpc_timeout_milliseconds_to_deadline(2)); GPR_ASSERT(GRPC_ERROR_NONE == - grpc_pollset_work(pollsets[0].ps, &worker, deadline)); + grpc_pollset_work(&exec_ctx, pollsets[0].ps, &worker, deadline)); gpr_mu_unlock(pollsets[0].mu); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); - verify_readable_and_reset(tfds, num_fds); - grpc_core::ExecCtx::Get()->Flush(); + verify_readable_and_reset(&exec_ctx, tfds, num_fds); + grpc_exec_ctx_flush(&exec_ctx); /* Tear down */ - grpc_pollset_set_del_fd(pollset_set.pss, tfds[0].fd); - grpc_pollset_set_del_pollset(pollset_set.pss, pollsets[0].ps); - grpc_pollset_set_del_pollset(pollset_set.pss, pollsets[1].ps); - grpc_core::ExecCtx::Get()->Flush(); - - cleanup_test_fds(tfds, num_fds); - cleanup_test_pollsets(pollsets, num_ps); - cleanup_test_pollset_sets(&pollset_set, num_pss); + grpc_pollset_set_del_fd(&exec_ctx, pollset_set.pss, tfds[0].fd); + grpc_pollset_set_del_pollset(&exec_ctx, pollset_set.pss, pollsets[0].ps); + grpc_pollset_set_del_pollset(&exec_ctx, pollset_set.pss, pollsets[1].ps); + grpc_exec_ctx_flush(&exec_ctx); + + cleanup_test_fds(&exec_ctx, tfds, num_fds); + cleanup_test_pollsets(&exec_ctx, pollsets, num_ps); + cleanup_test_pollset_sets(&exec_ctx, &pollset_set, num_pss); + grpc_exec_ctx_finish(&exec_ctx); } int main(int argc, char** argv) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - { - grpc_core::ExecCtx exec_ctx; - const char* poll_strategy = grpc_get_poll_strategy_name(); - - if (poll_strategy != nullptr && - (strcmp(poll_strategy, "epollsig") == 0 || - strcmp(poll_strategy, "epoll-threadpool") == 0)) { - pollset_set_test_basic(); - pollset_set_test_dup_fds(); - pollset_set_test_empty_pollset(); - } else { - gpr_log(GPR_INFO, - "Skipping the test. The test is only relevant for 'epoll' " - "strategy. and the current strategy is: '%s'", - poll_strategy); - } + const char* poll_strategy = grpc_get_poll_strategy_name(); + + if (poll_strategy != nullptr && + (strcmp(poll_strategy, "epollsig") == 0 || + strcmp(poll_strategy, "epoll-threadpool") == 0)) { + pollset_set_test_basic(); + pollset_set_test_dup_fds(); + pollset_set_test_empty_pollset(); + } else { + gpr_log(GPR_INFO, + "Skipping the test. The test is only relevant for 'epoll' " + "strategy. and the current strategy is: '%s'", + poll_strategy); } + + grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); return 0; } diff --git a/test/core/iomgr/resolve_address_posix_test.cc b/test/core/iomgr/resolve_address_posix_test.cc index e36315333c..836de423bd 100644 --- a/test/core/iomgr/resolve_address_posix_test.cc +++ b/test/core/iomgr/resolve_address_posix_test.cc @@ -46,29 +46,29 @@ typedef struct args_struct { grpc_pollset_set* pollset_set; } args_struct; -static void do_nothing(void* arg, grpc_error* error) {} +static void do_nothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {} -void args_init(args_struct* args) { +void args_init(grpc_exec_ctx* exec_ctx, args_struct* args) { gpr_event_init(&args->ev); args->pollset = static_cast(gpr_zalloc(grpc_pollset_size())); grpc_pollset_init(args->pollset, &args->mu); args->pollset_set = grpc_pollset_set_create(); - grpc_pollset_set_add_pollset(args->pollset_set, args->pollset); + grpc_pollset_set_add_pollset(exec_ctx, args->pollset_set, args->pollset); args->addrs = nullptr; } -void args_finish(args_struct* args) { +void args_finish(grpc_exec_ctx* exec_ctx, args_struct* args) { GPR_ASSERT(gpr_event_wait(&args->ev, test_deadline())); grpc_resolved_addresses_destroy(args->addrs); - grpc_pollset_set_del_pollset(args->pollset_set, args->pollset); - grpc_pollset_set_destroy(args->pollset_set); + grpc_pollset_set_del_pollset(exec_ctx, args->pollset_set, args->pollset); + grpc_pollset_set_destroy(exec_ctx, args->pollset_set); grpc_closure do_nothing_cb; GRPC_CLOSURE_INIT(&do_nothing_cb, do_nothing, nullptr, grpc_schedule_on_exec_ctx); - grpc_pollset_shutdown(args->pollset, &do_nothing_cb); + grpc_pollset_shutdown(exec_ctx, args->pollset, &do_nothing_cb); // exec_ctx needs to be flushed before calling grpc_pollset_destroy() - grpc_core::ExecCtx::Get()->Flush(); - grpc_pollset_destroy(args->pollset); + grpc_exec_ctx_flush(exec_ctx); + grpc_pollset_destroy(exec_ctx, args->pollset); gpr_free(args->pollset); } @@ -79,24 +79,26 @@ static grpc_millis n_sec_deadline(int seconds) { static void actually_poll(void* argsp) { args_struct* args = static_cast(argsp); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_millis deadline = n_sec_deadline(10); while (true) { - grpc_core::ExecCtx exec_ctx; bool done = gpr_atm_acq_load(&args->done_atm) != 0; if (done) { break; } - grpc_millis time_left = deadline - grpc_core::ExecCtx::Get()->Now(); + grpc_millis time_left = deadline - grpc_exec_ctx_now(&exec_ctx); gpr_log(GPR_DEBUG, "done=%d, time_left=%" PRIdPTR, done, time_left); GPR_ASSERT(time_left >= 0); grpc_pollset_worker* worker = nullptr; gpr_mu_lock(args->mu); - GRPC_LOG_IF_ERROR("pollset_work", grpc_pollset_work(args->pollset, &worker, - n_sec_deadline(1))); + GRPC_LOG_IF_ERROR("pollset_work", + grpc_pollset_work(&exec_ctx, args->pollset, &worker, + n_sec_deadline(1))); gpr_mu_unlock(args->mu); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); } gpr_event_set(&args->ev, (void*)1); + grpc_exec_ctx_finish(&exec_ctx); } static void poll_pollset_until_request_done(args_struct* args) { @@ -105,7 +107,8 @@ static void poll_pollset_until_request_done(args_struct* args) { gpr_thd_new(&id, "grpc_poll_pollset", actually_poll, args, nullptr); } -static void must_succeed(void* argsp, grpc_error* err) { +static void must_succeed(grpc_exec_ctx* exec_ctx, void* argsp, + grpc_error* err) { args_struct* args = static_cast(argsp); GPR_ASSERT(err == GRPC_ERROR_NONE); GPR_ASSERT(args->addrs != nullptr); @@ -113,28 +116,29 @@ static void must_succeed(void* argsp, grpc_error* err) { gpr_atm_rel_store(&args->done_atm, 1); } -static void must_fail(void* argsp, grpc_error* err) { +static void must_fail(grpc_exec_ctx* exec_ctx, void* argsp, grpc_error* err) { args_struct* args = static_cast(argsp); GPR_ASSERT(err != GRPC_ERROR_NONE); gpr_atm_rel_store(&args->done_atm, 1); } static void test_unix_socket(void) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; args_struct args; - args_init(&args); + args_init(&exec_ctx, &args); poll_pollset_until_request_done(&args); grpc_resolve_address( - "unix:/path/name", nullptr, args.pollset_set, + &exec_ctx, "unix:/path/name", nullptr, args.pollset_set, GRPC_CLOSURE_CREATE(must_succeed, &args, grpc_schedule_on_exec_ctx), &args.addrs); - args_finish(&args); + args_finish(&exec_ctx, &args); + grpc_exec_ctx_finish(&exec_ctx); } static void test_unix_socket_path_name_too_long(void) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; args_struct args; - args_init(&args); + args_init(&exec_ctx, &args); const char prefix[] = "unix:/path/name"; size_t path_name_length = GPR_ARRAY_SIZE(((struct sockaddr_un*)nullptr)->sun_path) + 6; @@ -146,23 +150,22 @@ static void test_unix_socket_path_name_too_long(void) { poll_pollset_until_request_done(&args); grpc_resolve_address( - path_name, nullptr, args.pollset_set, + &exec_ctx, path_name, nullptr, args.pollset_set, GRPC_CLOSURE_CREATE(must_fail, &args, grpc_schedule_on_exec_ctx), &args.addrs); gpr_free(path_name); - args_finish(&args); + args_finish(&exec_ctx, &args); + grpc_exec_ctx_finish(&exec_ctx); } int main(int argc, char** argv) { grpc_test_init(argc, argv); grpc_init(); - - { - grpc_core::ExecCtx exec_ctx; - test_unix_socket(); - test_unix_socket_path_name_too_long(); - } - + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + test_unix_socket(); + test_unix_socket_path_name_too_long(); + grpc_executor_shutdown(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); return 0; } diff --git a/test/core/iomgr/resolve_address_test.cc b/test/core/iomgr/resolve_address_test.cc index a0dc484f3e..1c5aa38a95 100644 --- a/test/core/iomgr/resolve_address_test.cc +++ b/test/core/iomgr/resolve_address_test.cc @@ -39,32 +39,32 @@ typedef struct args_struct { grpc_pollset_set* pollset_set; } args_struct; -static void do_nothing(void* arg, grpc_error* error) {} +static void do_nothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {} -void args_init(args_struct* args) { +void args_init(grpc_exec_ctx* exec_ctx, args_struct* args) { gpr_event_init(&args->ev); args->pollset = static_cast(gpr_zalloc(grpc_pollset_size())); grpc_pollset_init(args->pollset, &args->mu); args->pollset_set = grpc_pollset_set_create(); - grpc_pollset_set_add_pollset(args->pollset_set, args->pollset); + grpc_pollset_set_add_pollset(exec_ctx, args->pollset_set, args->pollset); args->addrs = nullptr; gpr_atm_rel_store(&args->done_atm, 0); } -void args_finish(args_struct* args) { +void args_finish(grpc_exec_ctx* exec_ctx, args_struct* args) { GPR_ASSERT(gpr_event_wait(&args->ev, test_deadline())); grpc_resolved_addresses_destroy(args->addrs); - grpc_pollset_set_del_pollset(args->pollset_set, args->pollset); - grpc_pollset_set_destroy(args->pollset_set); + grpc_pollset_set_del_pollset(exec_ctx, args->pollset_set, args->pollset); + grpc_pollset_set_destroy(exec_ctx, args->pollset_set); grpc_closure do_nothing_cb; GRPC_CLOSURE_INIT(&do_nothing_cb, do_nothing, nullptr, grpc_schedule_on_exec_ctx); gpr_mu_lock(args->mu); - grpc_pollset_shutdown(args->pollset, &do_nothing_cb); + grpc_pollset_shutdown(exec_ctx, args->pollset, &do_nothing_cb); gpr_mu_unlock(args->mu); // exec_ctx needs to be flushed before calling grpc_pollset_destroy() - grpc_core::ExecCtx::Get()->Flush(); - grpc_pollset_destroy(args->pollset); + grpc_exec_ctx_flush(exec_ctx); + grpc_pollset_destroy(exec_ctx, args->pollset); gpr_free(args->pollset); } @@ -74,109 +74,119 @@ static grpc_millis n_sec_deadline(int seconds) { } static void poll_pollset_until_request_done(args_struct* args) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_millis deadline = n_sec_deadline(10); while (true) { bool done = gpr_atm_acq_load(&args->done_atm) != 0; if (done) { break; } - grpc_millis time_left = deadline - grpc_core::ExecCtx::Get()->Now(); + grpc_millis time_left = deadline - grpc_exec_ctx_now(&exec_ctx); gpr_log(GPR_DEBUG, "done=%d, time_left=%" PRIdPTR, done, time_left); GPR_ASSERT(time_left >= 0); grpc_pollset_worker* worker = nullptr; gpr_mu_lock(args->mu); - GRPC_LOG_IF_ERROR("pollset_work", grpc_pollset_work(args->pollset, &worker, - n_sec_deadline(1))); + GRPC_LOG_IF_ERROR("pollset_work", + grpc_pollset_work(&exec_ctx, args->pollset, &worker, + n_sec_deadline(1))); gpr_mu_unlock(args->mu); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); } gpr_event_set(&args->ev, (void*)1); + grpc_exec_ctx_finish(&exec_ctx); } -static void must_succeed(void* argsp, grpc_error* err) { +static void must_succeed(grpc_exec_ctx* exec_ctx, void* argsp, + grpc_error* err) { args_struct* args = static_cast(argsp); GPR_ASSERT(err == GRPC_ERROR_NONE); GPR_ASSERT(args->addrs != nullptr); GPR_ASSERT(args->addrs->naddrs > 0); gpr_atm_rel_store(&args->done_atm, 1); gpr_mu_lock(args->mu); - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr)); + GRPC_LOG_IF_ERROR("pollset_kick", + grpc_pollset_kick(exec_ctx, args->pollset, nullptr)); gpr_mu_unlock(args->mu); } -static void must_fail(void* argsp, grpc_error* err) { +static void must_fail(grpc_exec_ctx* exec_ctx, void* argsp, grpc_error* err) { args_struct* args = static_cast(argsp); GPR_ASSERT(err != GRPC_ERROR_NONE); gpr_atm_rel_store(&args->done_atm, 1); gpr_mu_lock(args->mu); - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr)); + GRPC_LOG_IF_ERROR("pollset_kick", + grpc_pollset_kick(exec_ctx, args->pollset, nullptr)); gpr_mu_unlock(args->mu); } static void test_localhost(void) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; args_struct args; - args_init(&args); + args_init(&exec_ctx, &args); grpc_resolve_address( - "localhost:1", nullptr, args.pollset_set, + &exec_ctx, "localhost:1", nullptr, args.pollset_set, GRPC_CLOSURE_CREATE(must_succeed, &args, grpc_schedule_on_exec_ctx), &args.addrs); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); poll_pollset_until_request_done(&args); - args_finish(&args); + args_finish(&exec_ctx, &args); + grpc_exec_ctx_finish(&exec_ctx); } static void test_default_port(void) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; args_struct args; - args_init(&args); + args_init(&exec_ctx, &args); grpc_resolve_address( - "localhost", "1", args.pollset_set, + &exec_ctx, "localhost", "1", args.pollset_set, GRPC_CLOSURE_CREATE(must_succeed, &args, grpc_schedule_on_exec_ctx), &args.addrs); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); poll_pollset_until_request_done(&args); - args_finish(&args); + args_finish(&exec_ctx, &args); + grpc_exec_ctx_finish(&exec_ctx); } static void test_non_numeric_default_port(void) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; args_struct args; - args_init(&args); + args_init(&exec_ctx, &args); grpc_resolve_address( - "localhost", "https", args.pollset_set, + &exec_ctx, "localhost", "https", args.pollset_set, GRPC_CLOSURE_CREATE(must_succeed, &args, grpc_schedule_on_exec_ctx), &args.addrs); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); poll_pollset_until_request_done(&args); - args_finish(&args); + args_finish(&exec_ctx, &args); + grpc_exec_ctx_finish(&exec_ctx); } static void test_missing_default_port(void) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; args_struct args; - args_init(&args); + args_init(&exec_ctx, &args); grpc_resolve_address( - "localhost", nullptr, args.pollset_set, + &exec_ctx, "localhost", nullptr, args.pollset_set, GRPC_CLOSURE_CREATE(must_fail, &args, grpc_schedule_on_exec_ctx), &args.addrs); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); poll_pollset_until_request_done(&args); - args_finish(&args); + args_finish(&exec_ctx, &args); + grpc_exec_ctx_finish(&exec_ctx); } static void test_ipv6_with_port(void) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; args_struct args; - args_init(&args); + args_init(&exec_ctx, &args); grpc_resolve_address( - "[2001:db8::1]:1", nullptr, args.pollset_set, + &exec_ctx, "[2001:db8::1]:1", nullptr, args.pollset_set, GRPC_CLOSURE_CREATE(must_succeed, &args, grpc_schedule_on_exec_ctx), &args.addrs); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); poll_pollset_until_request_done(&args); - args_finish(&args); + args_finish(&exec_ctx, &args); + grpc_exec_ctx_finish(&exec_ctx); } static void test_ipv6_without_port(void) { @@ -187,16 +197,17 @@ static void test_ipv6_without_port(void) { }; unsigned i; for (i = 0; i < sizeof(kCases) / sizeof(*kCases); i++) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; args_struct args; - args_init(&args); + args_init(&exec_ctx, &args); grpc_resolve_address( - kCases[i], "80", args.pollset_set, + &exec_ctx, kCases[i], "80", args.pollset_set, GRPC_CLOSURE_CREATE(must_succeed, &args, grpc_schedule_on_exec_ctx), &args.addrs); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); poll_pollset_until_request_done(&args); - args_finish(&args); + args_finish(&exec_ctx, &args); + grpc_exec_ctx_finish(&exec_ctx); } } @@ -207,16 +218,17 @@ static void test_invalid_ip_addresses(void) { }; unsigned i; for (i = 0; i < sizeof(kCases) / sizeof(*kCases); i++) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; args_struct args; - args_init(&args); + args_init(&exec_ctx, &args); grpc_resolve_address( - kCases[i], nullptr, args.pollset_set, + &exec_ctx, kCases[i], nullptr, args.pollset_set, GRPC_CLOSURE_CREATE(must_fail, &args, grpc_schedule_on_exec_ctx), &args.addrs); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); poll_pollset_until_request_done(&args); - args_finish(&args); + args_finish(&exec_ctx, &args); + grpc_exec_ctx_finish(&exec_ctx); } } @@ -226,35 +238,34 @@ static void test_unparseable_hostports(void) { }; unsigned i; for (i = 0; i < sizeof(kCases) / sizeof(*kCases); i++) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; args_struct args; - args_init(&args); + args_init(&exec_ctx, &args); grpc_resolve_address( - kCases[i], "1", args.pollset_set, + &exec_ctx, kCases[i], "1", args.pollset_set, GRPC_CLOSURE_CREATE(must_fail, &args, grpc_schedule_on_exec_ctx), &args.addrs); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); poll_pollset_until_request_done(&args); - args_finish(&args); + args_finish(&exec_ctx, &args); + grpc_exec_ctx_finish(&exec_ctx); } } int main(int argc, char** argv) { grpc_test_init(argc, argv); grpc_init(); - { - grpc_core::ExecCtx exec_ctx; - test_localhost(); - test_default_port(); - test_non_numeric_default_port(); - test_missing_default_port(); - test_ipv6_with_port(); - test_ipv6_without_port(); - test_invalid_ip_addresses(); - test_unparseable_hostports(); - grpc_executor_shutdown(); - } - + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + test_localhost(); + test_default_port(); + test_non_numeric_default_port(); + test_missing_default_port(); + test_ipv6_with_port(); + test_ipv6_without_port(); + test_invalid_ip_addresses(); + test_unparseable_hostports(); + grpc_executor_shutdown(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); return 0; } diff --git a/test/core/iomgr/resource_quota_test.cc b/test/core/iomgr/resource_quota_test.cc index ae26f72701..6851702e67 100644 --- a/test/core/iomgr/resource_quota_test.cc +++ b/test/core/iomgr/resource_quota_test.cc @@ -27,7 +27,7 @@ gpr_mu g_mu; gpr_cv g_cv; -static void inc_int_cb(void* a, grpc_error* error) { +static void inc_int_cb(grpc_exec_ctx* exec_ctx, void* a, grpc_error* error) { gpr_mu_lock(&g_mu); ++*(int*)a; gpr_cv_signal(&g_cv); @@ -43,7 +43,7 @@ static void assert_counter_becomes(int* ctr, int value) { gpr_mu_unlock(&g_mu); } -static void set_event_cb(void* a, grpc_error* error) { +static void set_event_cb(grpc_exec_ctx* exec_ctx, void* a, grpc_error* error) { gpr_event_set((gpr_event*)a, (void*)1); } grpc_closure* set_event(gpr_event* ev) { @@ -56,12 +56,13 @@ typedef struct { grpc_closure* then; } reclaimer_args; -static void reclaimer_cb(void* args, grpc_error* error) { +static void reclaimer_cb(grpc_exec_ctx* exec_ctx, void* args, + grpc_error* error) { GPR_ASSERT(error == GRPC_ERROR_NONE); reclaimer_args* a = static_cast(args); - grpc_resource_user_free(a->resource_user, a->size); - grpc_resource_user_finish_reclamation(a->resource_user); - GRPC_CLOSURE_RUN(a->then, GRPC_ERROR_NONE); + grpc_resource_user_free(exec_ctx, a->resource_user, a->size); + grpc_resource_user_finish_reclamation(exec_ctx, a->resource_user); + GRPC_CLOSURE_RUN(exec_ctx, a->then, GRPC_ERROR_NONE); gpr_free(a); } @@ -74,9 +75,10 @@ grpc_closure* make_reclaimer(grpc_resource_user* resource_user, size_t size, return GRPC_CLOSURE_CREATE(reclaimer_cb, a, grpc_schedule_on_exec_ctx); } -static void unused_reclaimer_cb(void* arg, grpc_error* error) { +static void unused_reclaimer_cb(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { GPR_ASSERT(error == GRPC_ERROR_CANCELLED); - GRPC_CLOSURE_RUN(static_cast(arg), GRPC_ERROR_NONE); + GRPC_CLOSURE_RUN(exec_ctx, static_cast(arg), GRPC_ERROR_NONE); } grpc_closure* make_unused_reclaimer(grpc_closure* then) { return GRPC_CLOSURE_CREATE(unused_reclaimer_cb, then, @@ -84,8 +86,9 @@ grpc_closure* make_unused_reclaimer(grpc_closure* then) { } static void destroy_user(grpc_resource_user* usr) { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_unref(usr); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_unref(&exec_ctx, usr); + grpc_exec_ctx_finish(&exec_ctx); } static void test_no_op(void) { @@ -117,12 +120,14 @@ static void test_instant_alloc_then_free(void) { grpc_resource_quota_resize(q, 1024 * 1024); grpc_resource_user* usr = grpc_resource_user_create(q, "usr"); { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr, 1024, NULL); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr, 1024, nullptr); + grpc_exec_ctx_finish(&exec_ctx); } { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_free(usr, 1024); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_free(&exec_ctx, usr, 1024); + grpc_exec_ctx_finish(&exec_ctx); } grpc_resource_quota_unref(q); destroy_user(usr); @@ -135,9 +140,10 @@ static void test_instant_alloc_free_pair(void) { grpc_resource_quota_resize(q, 1024 * 1024); grpc_resource_user* usr = grpc_resource_user_create(q, "usr"); { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr, 1024, NULL); - grpc_resource_user_free(usr, 1024); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr, 1024, nullptr); + grpc_resource_user_free(&exec_ctx, usr, 1024); + grpc_exec_ctx_finish(&exec_ctx); } grpc_resource_quota_unref(q); destroy_user(usr); @@ -152,15 +158,16 @@ static void test_simple_async_alloc(void) { { gpr_event ev; gpr_event_init(&ev); - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr, 1024, set_event(&ev)); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev)); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) != nullptr); } { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_free(usr, 1024); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_free(&exec_ctx, usr, 1024); + grpc_exec_ctx_finish(&exec_ctx); } grpc_resource_quota_unref(q); destroy_user(usr); @@ -175,9 +182,9 @@ static void test_async_alloc_blocked_by_size(void) { gpr_event ev; gpr_event_init(&ev); { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr, 1024, set_event(&ev)); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev)); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait( &ev, grpc_timeout_milliseconds_to_deadline(100)) == nullptr); } @@ -186,8 +193,9 @@ static void test_async_alloc_blocked_by_size(void) { nullptr); ; { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_free(usr, 1024); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_free(&exec_ctx, usr, 1024); + grpc_exec_ctx_finish(&exec_ctx); } grpc_resource_quota_unref(q); destroy_user(usr); @@ -202,30 +210,32 @@ static void test_scavenge(void) { { gpr_event ev; gpr_event_init(&ev); - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr1, 1024, set_event(&ev)); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr1, 1024, set_event(&ev)); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) != nullptr); ; } { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_free(usr1, 1024); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_free(&exec_ctx, usr1, 1024); + grpc_exec_ctx_finish(&exec_ctx); } { gpr_event ev; gpr_event_init(&ev); - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr2, 1024, set_event(&ev)); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr2, 1024, set_event(&ev)); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) != nullptr); ; } { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_free(usr2, 1024); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_free(&exec_ctx, usr2, 1024); + grpc_exec_ctx_finish(&exec_ctx); } grpc_resource_quota_unref(q); destroy_user(usr1); @@ -241,32 +251,33 @@ static void test_scavenge_blocked(void) { gpr_event ev; { gpr_event_init(&ev); - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr1, 1024, set_event(&ev)); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr1, 1024, set_event(&ev)); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) != nullptr); ; } { gpr_event_init(&ev); - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr2, 1024, set_event(&ev)); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr2, 1024, set_event(&ev)); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait( &ev, grpc_timeout_milliseconds_to_deadline(100)) == nullptr); } { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_free(usr1, 1024); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_free(&exec_ctx, usr1, 1024); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) != nullptr); ; } { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_free(usr2, 1024); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_free(&exec_ctx, usr2, 1024); + grpc_exec_ctx_finish(&exec_ctx); } grpc_resource_quota_unref(q); destroy_user(usr1); @@ -282,9 +293,9 @@ static void test_blocked_until_scheduled_reclaim(void) { { gpr_event ev; gpr_event_init(&ev); - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr, 1024, set_event(&ev)); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev)); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) != nullptr); ; @@ -292,16 +303,18 @@ static void test_blocked_until_scheduled_reclaim(void) { gpr_event reclaim_done; gpr_event_init(&reclaim_done); { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resource_user_post_reclaimer( - usr, false, make_reclaimer(usr, 1024, set_event(&reclaim_done))); + &exec_ctx, usr, false, + make_reclaimer(usr, 1024, set_event(&reclaim_done))); + grpc_exec_ctx_finish(&exec_ctx); } { gpr_event ev; gpr_event_init(&ev); - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr, 1024, set_event(&ev)); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev)); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&reclaim_done, grpc_timeout_seconds_to_deadline(5)) != nullptr); GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) != @@ -309,8 +322,9 @@ static void test_blocked_until_scheduled_reclaim(void) { ; } { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_free(usr, 1024); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_free(&exec_ctx, usr, 1024); + grpc_exec_ctx_finish(&exec_ctx); } grpc_resource_quota_unref(q); destroy_user(usr); @@ -326,9 +340,9 @@ static void test_blocked_until_scheduled_reclaim_and_scavenge(void) { { gpr_event ev; gpr_event_init(&ev); - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr1, 1024, set_event(&ev)); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr1, 1024, set_event(&ev)); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) != nullptr); ; @@ -336,16 +350,18 @@ static void test_blocked_until_scheduled_reclaim_and_scavenge(void) { gpr_event reclaim_done; gpr_event_init(&reclaim_done); { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resource_user_post_reclaimer( - usr1, false, make_reclaimer(usr1, 1024, set_event(&reclaim_done))); + &exec_ctx, usr1, false, + make_reclaimer(usr1, 1024, set_event(&reclaim_done))); + grpc_exec_ctx_finish(&exec_ctx); } { gpr_event ev; gpr_event_init(&ev); - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr2, 1024, set_event(&ev)); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr2, 1024, set_event(&ev)); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&reclaim_done, grpc_timeout_seconds_to_deadline(5)) != nullptr); GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) != @@ -353,8 +369,9 @@ static void test_blocked_until_scheduled_reclaim_and_scavenge(void) { ; } { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_free(usr2, 1024); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_free(&exec_ctx, usr2, 1024); + grpc_exec_ctx_finish(&exec_ctx); } grpc_resource_quota_unref(q); destroy_user(usr1); @@ -370,9 +387,9 @@ static void test_blocked_until_scheduled_destructive_reclaim(void) { { gpr_event ev; gpr_event_init(&ev); - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr, 1024, set_event(&ev)); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev)); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) != nullptr); ; @@ -380,16 +397,18 @@ static void test_blocked_until_scheduled_destructive_reclaim(void) { gpr_event reclaim_done; gpr_event_init(&reclaim_done); { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resource_user_post_reclaimer( - usr, true, make_reclaimer(usr, 1024, set_event(&reclaim_done))); + &exec_ctx, usr, true, + make_reclaimer(usr, 1024, set_event(&reclaim_done))); + grpc_exec_ctx_finish(&exec_ctx); } { gpr_event ev; gpr_event_init(&ev); - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr, 1024, set_event(&ev)); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev)); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&reclaim_done, grpc_timeout_seconds_to_deadline(5)) != nullptr); GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) != @@ -397,8 +416,9 @@ static void test_blocked_until_scheduled_destructive_reclaim(void) { ; } { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_free(usr, 1024); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_free(&exec_ctx, usr, 1024); + grpc_exec_ctx_finish(&exec_ctx); } grpc_resource_quota_unref(q); destroy_user(usr); @@ -415,12 +435,13 @@ static void test_unused_reclaim_is_cancelled(void) { gpr_event destructive_done; gpr_event_init(&destructive_done); { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resource_user_post_reclaimer( - usr, false, make_unused_reclaimer(set_event(&benign_done))); + &exec_ctx, usr, false, make_unused_reclaimer(set_event(&benign_done))); grpc_resource_user_post_reclaimer( - usr, true, make_unused_reclaimer(set_event(&destructive_done))); - grpc_core::ExecCtx::Get()->Flush(); + &exec_ctx, usr, true, + make_unused_reclaimer(set_event(&destructive_done))); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&benign_done, grpc_timeout_milliseconds_to_deadline(100)) == nullptr); @@ -449,20 +470,22 @@ static void test_benign_reclaim_is_preferred(void) { { gpr_event ev; gpr_event_init(&ev); - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr, 1024, set_event(&ev)); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev)); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) != nullptr); ; } { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resource_user_post_reclaimer( - usr, false, make_reclaimer(usr, 1024, set_event(&benign_done))); + &exec_ctx, usr, false, + make_reclaimer(usr, 1024, set_event(&benign_done))); grpc_resource_user_post_reclaimer( - usr, true, make_unused_reclaimer(set_event(&destructive_done))); - grpc_core::ExecCtx::Get()->Flush(); + &exec_ctx, usr, true, + make_unused_reclaimer(set_event(&destructive_done))); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&benign_done, grpc_timeout_milliseconds_to_deadline(100)) == nullptr); @@ -473,9 +496,9 @@ static void test_benign_reclaim_is_preferred(void) { { gpr_event ev; gpr_event_init(&ev); - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr, 1024, set_event(&ev)); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev)); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&benign_done, grpc_timeout_seconds_to_deadline(5)) != nullptr); GPR_ASSERT(gpr_event_wait(&destructive_done, @@ -485,8 +508,9 @@ static void test_benign_reclaim_is_preferred(void) { nullptr); } { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_free(usr, 1024); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_free(&exec_ctx, usr, 1024); + grpc_exec_ctx_finish(&exec_ctx); } grpc_resource_quota_unref(q); destroy_user(usr); @@ -509,20 +533,22 @@ static void test_multiple_reclaims_can_be_triggered(void) { { gpr_event ev; gpr_event_init(&ev); - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr, 1024, set_event(&ev)); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev)); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) != nullptr); ; } { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resource_user_post_reclaimer( - usr, false, make_reclaimer(usr, 512, set_event(&benign_done))); + &exec_ctx, usr, false, + make_reclaimer(usr, 512, set_event(&benign_done))); grpc_resource_user_post_reclaimer( - usr, true, make_reclaimer(usr, 512, set_event(&destructive_done))); - grpc_core::ExecCtx::Get()->Flush(); + &exec_ctx, usr, true, + make_reclaimer(usr, 512, set_event(&destructive_done))); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&benign_done, grpc_timeout_milliseconds_to_deadline(100)) == nullptr); @@ -533,9 +559,9 @@ static void test_multiple_reclaims_can_be_triggered(void) { { gpr_event ev; gpr_event_init(&ev); - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr, 1024, set_event(&ev)); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev)); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&benign_done, grpc_timeout_seconds_to_deadline(5)) != nullptr); GPR_ASSERT(gpr_event_wait(&destructive_done, @@ -545,8 +571,9 @@ static void test_multiple_reclaims_can_be_triggered(void) { ; } { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_free(usr, 1024); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_free(&exec_ctx, usr, 1024); + grpc_exec_ctx_finish(&exec_ctx); } grpc_resource_quota_unref(q); destroy_user(usr); @@ -564,17 +591,20 @@ static void test_resource_user_stays_allocated_until_memory_released(void) { grpc_resource_quota_resize(q, 1024 * 1024); grpc_resource_user* usr = grpc_resource_user_create(q, "usr"); { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr, 1024, NULL); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr, 1024, nullptr); + grpc_exec_ctx_finish(&exec_ctx); } { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resource_quota_unref(q); - grpc_resource_user_unref(usr); + grpc_resource_user_unref(&exec_ctx, usr); + grpc_exec_ctx_finish(&exec_ctx); } { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_free(usr, 1024); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_free(&exec_ctx, usr, 1024); + grpc_exec_ctx_finish(&exec_ctx); } } @@ -594,10 +624,11 @@ test_resource_user_stays_allocated_and_reclaimers_unrun_until_memory_released( gpr_event reclaimer_cancelled; gpr_event_init(&reclaimer_cancelled); { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resource_user_post_reclaimer( - usr, false, make_unused_reclaimer(set_event(&reclaimer_cancelled))); - grpc_core::ExecCtx::Get()->Flush(); + &exec_ctx, usr, false, + make_unused_reclaimer(set_event(&reclaimer_cancelled))); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&reclaimer_cancelled, grpc_timeout_milliseconds_to_deadline(100)) == nullptr); @@ -605,27 +636,27 @@ test_resource_user_stays_allocated_and_reclaimers_unrun_until_memory_released( { gpr_event allocated; gpr_event_init(&allocated); - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr, 1024, set_event(&allocated)); - grpc_core::ExecCtx::Get()->Flush(); - GPR_ASSERT(gpr_event_wait(&allocated, - grpc_timeout_seconds_to_deadline(5)) != NULL); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&allocated)); + grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(gpr_event_wait(&allocated, grpc_timeout_seconds_to_deadline( + 5)) != nullptr); GPR_ASSERT(gpr_event_wait(&reclaimer_cancelled, grpc_timeout_milliseconds_to_deadline(100)) == nullptr); } { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_unref(usr); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_unref(&exec_ctx, usr); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&reclaimer_cancelled, grpc_timeout_milliseconds_to_deadline(100)) == nullptr); } { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_free(usr, 1024); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_free(&exec_ctx, usr, 1024); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&reclaimer_cancelled, grpc_timeout_seconds_to_deadline(5)) != nullptr); @@ -643,9 +674,9 @@ static void test_reclaimers_can_be_posted_repeatedly(void) { { gpr_event allocated; gpr_event_init(&allocated); - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr, 1024, set_event(&allocated)); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&allocated)); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&allocated, grpc_timeout_seconds_to_deadline(5)) != nullptr); } @@ -653,10 +684,11 @@ static void test_reclaimers_can_be_posted_repeatedly(void) { gpr_event reclaimer_done; gpr_event_init(&reclaimer_done); { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resource_user_post_reclaimer( - usr, false, make_reclaimer(usr, 1024, set_event(&reclaimer_done))); - grpc_core::ExecCtx::Get()->Flush(); + &exec_ctx, usr, false, + make_reclaimer(usr, 1024, set_event(&reclaimer_done))); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(gpr_event_wait(&reclaimer_done, grpc_timeout_milliseconds_to_deadline(100)) == nullptr); @@ -664,19 +696,20 @@ static void test_reclaimers_can_be_posted_repeatedly(void) { { gpr_event allocated; gpr_event_init(&allocated); - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc(usr, 1024, set_event(&allocated)); - grpc_core::ExecCtx::Get()->Flush(); - GPR_ASSERT(gpr_event_wait(&allocated, - grpc_timeout_seconds_to_deadline(5)) != NULL); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&allocated)); + grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(gpr_event_wait(&allocated, grpc_timeout_seconds_to_deadline( + 5)) != nullptr); GPR_ASSERT(gpr_event_wait(&reclaimer_done, grpc_timeout_seconds_to_deadline(5)) != nullptr); } } { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_free(usr, 1024); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_free(&exec_ctx, usr, 1024); + grpc_exec_ctx_finish(&exec_ctx); } destroy_user(usr); grpc_resource_quota_unref(q); @@ -699,15 +732,16 @@ static void test_one_slice(void) { { const int start_allocs = num_allocs; - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc_slices(&alloc, 1024, 1, &buffer); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc_slices(&exec_ctx, &alloc, 1024, 1, &buffer); + grpc_exec_ctx_finish(&exec_ctx); assert_counter_becomes(&num_allocs, start_allocs + 1); } { - grpc_core::ExecCtx exec_ctx; - grpc_slice_buffer_destroy_internal(&buffer); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer); + grpc_exec_ctx_finish(&exec_ctx); } destroy_user(usr); grpc_resource_quota_unref(q); @@ -731,21 +765,23 @@ static void test_one_slice_deleted_late(void) { { const int start_allocs = num_allocs; - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc_slices(&alloc, 1024, 1, &buffer); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc_slices(&exec_ctx, &alloc, 1024, 1, &buffer); + grpc_exec_ctx_finish(&exec_ctx); assert_counter_becomes(&num_allocs, start_allocs + 1); } { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_unref(usr); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_unref(&exec_ctx, usr); + grpc_exec_ctx_finish(&exec_ctx); } grpc_resource_quota_unref(q); { - grpc_core::ExecCtx exec_ctx; - grpc_slice_buffer_destroy_internal(&buffer); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer); + grpc_exec_ctx_finish(&exec_ctx); } } @@ -773,9 +809,9 @@ static void test_negative_rq_free_pool(void) { { const int start_allocs = num_allocs; - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_alloc_slices(&alloc, 1024, 1, &buffer); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_alloc_slices(&exec_ctx, &alloc, 1024, 1, &buffer); + grpc_exec_ctx_finish(&exec_ctx); assert_counter_becomes(&num_allocs, start_allocs + 1); } @@ -786,14 +822,16 @@ static void test_negative_rq_free_pool(void) { GPR_ASSERT(grpc_resource_quota_get_memory_pressure(q) > 1 - eps); { - grpc_core::ExecCtx exec_ctx; - grpc_resource_user_unref(usr); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_unref(&exec_ctx, usr); + grpc_exec_ctx_finish(&exec_ctx); } grpc_resource_quota_unref(q); { - grpc_core::ExecCtx exec_ctx; - grpc_slice_buffer_destroy_internal(&buffer); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer); + grpc_exec_ctx_finish(&exec_ctx); } } diff --git a/test/core/iomgr/tcp_client_posix_test.cc b/test/core/iomgr/tcp_client_posix_test.cc index 40a050ed9f..9fb1a2d770 100644 --- a/test/core/iomgr/tcp_client_posix_test.cc +++ b/test/core/iomgr/tcp_client_posix_test.cc @@ -53,24 +53,26 @@ static grpc_millis test_deadline(void) { static void finish_connection() { gpr_mu_lock(g_mu); g_connections_complete++; - grpc_core::ExecCtx exec_ctx; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); - + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_kick", grpc_pollset_kick(&exec_ctx, g_pollset, nullptr))); + grpc_exec_ctx_finish(&exec_ctx); gpr_mu_unlock(g_mu); } -static void must_succeed(void* arg, grpc_error* error) { +static void must_succeed(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { GPR_ASSERT(g_connecting != nullptr); GPR_ASSERT(error == GRPC_ERROR_NONE); - grpc_endpoint_shutdown(g_connecting, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "must_succeed called")); - grpc_endpoint_destroy(g_connecting); + grpc_endpoint_shutdown( + exec_ctx, g_connecting, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("must_succeed called")); + grpc_endpoint_destroy(exec_ctx, g_connecting); g_connecting = nullptr; finish_connection(); } -static void must_fail(void* arg, grpc_error* error) { +static void must_fail(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { GPR_ASSERT(g_connecting == nullptr); GPR_ASSERT(error != GRPC_ERROR_NONE); finish_connection(); @@ -83,7 +85,7 @@ void test_succeeds(void) { int r; int connections_complete_before; grpc_closure done; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_log(GPR_DEBUG, "test_succeeds"); @@ -106,8 +108,8 @@ void test_succeeds(void) { GPR_ASSERT(getsockname(svr_fd, (struct sockaddr*)addr, (socklen_t*)&resolved_addr.len) == 0); GRPC_CLOSURE_INIT(&done, must_succeed, nullptr, grpc_schedule_on_exec_ctx); - grpc_tcp_client_connect(&done, &g_connecting, g_pollset_set, nullptr, - &resolved_addr, GRPC_MILLIS_INF_FUTURE); + grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, g_pollset_set, + nullptr, &resolved_addr, GRPC_MILLIS_INF_FUTURE); /* await the connection */ do { @@ -123,15 +125,17 @@ void test_succeeds(void) { grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( "pollset_work", - grpc_pollset_work(g_pollset, &worker, + grpc_pollset_work(&exec_ctx, g_pollset, &worker, grpc_timespec_to_millis_round_up( grpc_timeout_seconds_to_deadline(5))))); gpr_mu_unlock(g_mu); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(g_mu); } gpr_mu_unlock(g_mu); + + grpc_exec_ctx_finish(&exec_ctx); } void test_fails(void) { @@ -139,7 +143,7 @@ void test_fails(void) { struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr; int connections_complete_before; grpc_closure done; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_log(GPR_DEBUG, "test_fails"); @@ -153,8 +157,8 @@ void test_fails(void) { /* connect to a broken address */ GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx); - grpc_tcp_client_connect(&done, &g_connecting, g_pollset_set, nullptr, - &resolved_addr, GRPC_MILLIS_INF_FUTURE); + grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, g_pollset_set, + nullptr, &resolved_addr, GRPC_MILLIS_INF_FUTURE); gpr_mu_lock(g_mu); @@ -162,7 +166,7 @@ void test_fails(void) { while (g_connections_complete == connections_complete_before) { grpc_pollset_worker* worker = nullptr; grpc_millis polling_deadline = test_deadline(); - switch (grpc_timer_check(&polling_deadline)) { + switch (grpc_timer_check(&exec_ctx, &polling_deadline)) { case GRPC_TIMERS_FIRED: break; case GRPC_TIMERS_NOT_CHECKED: @@ -170,43 +174,42 @@ void test_fails(void) { /* fall through */ case GRPC_TIMERS_CHECKED_AND_EMPTY: GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", - grpc_pollset_work(g_pollset, &worker, polling_deadline))); + "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker, + polling_deadline))); break; } gpr_mu_unlock(g_mu); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(g_mu); } gpr_mu_unlock(g_mu); + grpc_exec_ctx_finish(&exec_ctx); } -static void destroy_pollset(void* p, grpc_error* error) { - grpc_pollset_destroy(static_cast(p)); +static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p, + grpc_error* error) { + grpc_pollset_destroy(exec_ctx, static_cast(p)); } int main(int argc, char** argv) { grpc_closure destroyed; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - - { - grpc_core::ExecCtx exec_ctx; - g_pollset_set = grpc_pollset_set_create(); - g_pollset = static_cast(gpr_zalloc(grpc_pollset_size())); - grpc_pollset_init(g_pollset, &g_mu); - grpc_pollset_set_add_pollset(g_pollset_set, g_pollset); - - test_succeeds(); - gpr_log(GPR_ERROR, "End of first test"); - test_fails(); - grpc_pollset_set_destroy(g_pollset_set); - GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, - grpc_schedule_on_exec_ctx); - grpc_pollset_shutdown(g_pollset, &destroyed); - } - + g_pollset_set = grpc_pollset_set_create(); + g_pollset = static_cast(gpr_zalloc(grpc_pollset_size())); + grpc_pollset_init(g_pollset, &g_mu); + grpc_pollset_set_add_pollset(&exec_ctx, g_pollset_set, g_pollset); + grpc_exec_ctx_finish(&exec_ctx); + test_succeeds(); + gpr_log(GPR_ERROR, "End of first test"); + test_fails(); + grpc_pollset_set_destroy(&exec_ctx, g_pollset_set); + GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, + grpc_schedule_on_exec_ctx); + grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); + grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); gpr_free(g_pollset); return 0; diff --git a/test/core/iomgr/tcp_client_uv_test.cc b/test/core/iomgr/tcp_client_uv_test.cc index 0c6250ed7f..101d7bf6b5 100644 --- a/test/core/iomgr/tcp_client_uv_test.cc +++ b/test/core/iomgr/tcp_client_uv_test.cc @@ -46,28 +46,30 @@ static grpc_millis test_deadline(void) { return grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10)); } -static void finish_connection() { +static void finish_connection(grpc_exec_ctx* exec_ctx) { gpr_mu_lock(g_mu); g_connections_complete++; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); + GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick", + grpc_pollset_kick(exec_ctx, g_pollset, NULL))); gpr_mu_unlock(g_mu); } -static void must_succeed(void* arg, grpc_error* error) { +static void must_succeed(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { GPR_ASSERT(g_connecting != NULL); GPR_ASSERT(error == GRPC_ERROR_NONE); - grpc_endpoint_shutdown(g_connecting, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "must_succeed called")); - grpc_endpoint_destroy(g_connecting); + grpc_endpoint_shutdown( + exec_ctx, g_connecting, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("must_succeed called")); + grpc_endpoint_destroy(exec_ctx, g_connecting); g_connecting = NULL; - finish_connection(); + finish_connection(exec_ctx); } -static void must_fail(void* arg, grpc_error* error) { +static void must_fail(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { GPR_ASSERT(g_connecting == NULL); GPR_ASSERT(error != GRPC_ERROR_NONE); - finish_connection(); + finish_connection(exec_ctx); } static void close_cb(uv_handle_t* handle) { gpr_free(handle); } @@ -87,7 +89,7 @@ void test_succeeds(void) { uv_tcp_t* svr_handle = static_cast(gpr_malloc(sizeof(uv_tcp_t))); int connections_complete_before; grpc_closure done; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_log(GPR_DEBUG, "test_succeeds"); @@ -108,8 +110,8 @@ void test_succeeds(void) { GPR_ASSERT(uv_tcp_getsockname(svr_handle, (struct sockaddr*)addr, (int*)&resolved_addr.len) == 0); GRPC_CLOSURE_INIT(&done, must_succeed, NULL, grpc_schedule_on_exec_ctx); - grpc_tcp_client_connect(&done, &g_connecting, NULL, NULL, &resolved_addr, - GRPC_MILLIS_INF_FUTURE); + grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, NULL, NULL, + &resolved_addr, GRPC_MILLIS_INF_FUTURE); gpr_mu_lock(g_mu); @@ -117,11 +119,11 @@ void test_succeeds(void) { grpc_pollset_worker* worker = NULL; GPR_ASSERT(GRPC_LOG_IF_ERROR( "pollset_work", - grpc_pollset_work(g_pollset, &worker, + grpc_pollset_work(&exec_ctx, g_pollset, &worker, grpc_timespec_to_millis_round_up( grpc_timeout_seconds_to_deadline(5))))); gpr_mu_unlock(g_mu); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(g_mu); } @@ -129,6 +131,8 @@ void test_succeeds(void) { uv_close((uv_handle_t*)svr_handle, close_cb); gpr_mu_unlock(g_mu); + + grpc_exec_ctx_finish(&exec_ctx); } void test_fails(void) { @@ -136,7 +140,7 @@ void test_fails(void) { struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr; int connections_complete_before; grpc_closure done; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_log(GPR_DEBUG, "test_fails"); @@ -150,8 +154,8 @@ void test_fails(void) { /* connect to a broken address */ GRPC_CLOSURE_INIT(&done, must_fail, NULL, grpc_schedule_on_exec_ctx); - grpc_tcp_client_connect(&done, &g_connecting, NULL, NULL, &resolved_addr, - GRPC_MILLIS_INF_FUTURE); + grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, NULL, NULL, + &resolved_addr, GRPC_MILLIS_INF_FUTURE); gpr_mu_lock(g_mu); @@ -160,7 +164,7 @@ void test_fails(void) { grpc_pollset_worker* worker = NULL; gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); grpc_millis polling_deadline = test_deadline(); - switch (grpc_timer_check(&polling_deadline)) { + switch (grpc_timer_check(&exec_ctx, &polling_deadline)) { case GRPC_TIMERS_FIRED: break; case GRPC_TIMERS_NOT_CHECKED: @@ -168,37 +172,39 @@ void test_fails(void) { /* fall through */ case GRPC_TIMERS_CHECKED_AND_EMPTY: GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", - grpc_pollset_work(g_pollset, &worker, polling_deadline))); + "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker, + polling_deadline))); break; } gpr_mu_unlock(g_mu); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(g_mu); } gpr_mu_unlock(g_mu); + grpc_exec_ctx_finish(&exec_ctx); } -static void destroy_pollset(void* p, grpc_error* error) { - grpc_pollset_destroy(static_cast(p)); +static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p, + grpc_error* error) { + grpc_pollset_destroy(exec_ctx, static_cast(p)); } int main(int argc, char** argv) { grpc_closure destroyed; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); g_pollset = static_cast(gpr_malloc(grpc_pollset_size())); grpc_pollset_init(g_pollset, &g_mu); - + grpc_exec_ctx_finish(&exec_ctx); test_succeeds(); gpr_log(GPR_ERROR, "End of first test"); test_fails(); GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, grpc_schedule_on_exec_ctx); - grpc_pollset_shutdown(g_pollset, &destroyed); - + grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); + grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); gpr_free(g_pollset); return 0; diff --git a/test/core/iomgr/tcp_posix_test.cc b/test/core/iomgr/tcp_posix_test.cc index f4acba8302..7986dc2b19 100644 --- a/test/core/iomgr/tcp_posix_test.cc +++ b/test/core/iomgr/tcp_posix_test.cc @@ -131,7 +131,8 @@ static size_t count_slices(grpc_slice* slices, size_t nslices, return num_bytes; } -static void read_cb(void* user_data, grpc_error* error) { +static void read_cb(grpc_exec_ctx* exec_ctx, void* user_data, + grpc_error* error) { struct read_socket_state* state = (struct read_socket_state*)user_data; size_t read_bytes; int current_data; @@ -146,11 +147,11 @@ static void read_cb(void* user_data, grpc_error* error) { gpr_log(GPR_INFO, "Read %" PRIuPTR " bytes of %" PRIuPTR, read_bytes, state->target_read_bytes); if (state->read_bytes >= state->target_read_bytes) { - GPR_ASSERT( - GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr))); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); gpr_mu_unlock(g_mu); } else { - grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb); + grpc_endpoint_read(exec_ctx, state->ep, &state->incoming, &state->read_cb); gpr_mu_unlock(g_mu); } } @@ -163,7 +164,7 @@ static void read_test(size_t num_bytes, size_t slice_size) { size_t written_bytes; grpc_millis deadline = grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20)); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_log(GPR_INFO, "Read test of size %" PRIuPTR ", slice size %" PRIuPTR, num_bytes, slice_size); @@ -174,8 +175,9 @@ static void read_test(size_t num_bytes, size_t slice_size) { a[0].key = const_cast(GRPC_ARG_TCP_READ_CHUNK_SIZE); a[0].type = GRPC_ARG_INTEGER, a[0].value.integer = (int)slice_size; grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), &args, "test"); - grpc_endpoint_add_to_pollset(ep, g_pollset); + ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "read_test"), &args, + "test"); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes); @@ -186,22 +188,24 @@ static void read_test(size_t num_bytes, size_t slice_size) { grpc_slice_buffer_init(&state.incoming); GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline))); gpr_mu_unlock(g_mu); - + grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); gpr_mu_unlock(g_mu); - grpc_slice_buffer_destroy_internal(&state.incoming); - grpc_endpoint_destroy(ep); + grpc_slice_buffer_destroy_internal(&exec_ctx, &state.incoming); + grpc_endpoint_destroy(&exec_ctx, ep); + grpc_exec_ctx_finish(&exec_ctx); } /* Write to a socket until it fills up, then read from it using the grpc_tcp @@ -213,7 +217,7 @@ static void large_read_test(size_t slice_size) { ssize_t written_bytes; grpc_millis deadline = grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20)); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_log(GPR_INFO, "Start large read test, slice size %" PRIuPTR, slice_size); @@ -224,8 +228,9 @@ static void large_read_test(size_t slice_size) { a[0].type = GRPC_ARG_INTEGER; a[0].value.integer = (int)slice_size; grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), &args, "test"); - grpc_endpoint_add_to_pollset(ep, g_pollset); + ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "large_read_test"), + &args, "test"); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); written_bytes = fill_socket(sv[0]); gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes); @@ -236,22 +241,24 @@ static void large_read_test(size_t slice_size) { grpc_slice_buffer_init(&state.incoming); GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline))); gpr_mu_unlock(g_mu); - + grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); gpr_mu_unlock(g_mu); - grpc_slice_buffer_destroy_internal(&state.incoming); - grpc_endpoint_destroy(ep); + grpc_slice_buffer_destroy_internal(&exec_ctx, &state.incoming); + grpc_endpoint_destroy(&exec_ctx, ep); + grpc_exec_ctx_finish(&exec_ctx); } struct write_socket_state { @@ -282,15 +289,16 @@ static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size, return slices; } -static void write_done(void* user_data /* write_socket_state */, +static void write_done(grpc_exec_ctx* exec_ctx, + void* user_data /* write_socket_state */, grpc_error* error) { struct write_socket_state* state = (struct write_socket_state*)user_data; gpr_log(GPR_INFO, "Write done callback called"); gpr_mu_lock(g_mu); gpr_log(GPR_INFO, "Signalling write done"); state->write_done = 1; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); gpr_mu_unlock(g_mu); } @@ -301,7 +309,7 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { int flags; int current = 0; int i; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; flags = fcntl(fd, F_GETFL, 0); GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0); @@ -311,11 +319,11 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { gpr_mu_lock(g_mu); GPR_ASSERT(GRPC_LOG_IF_ERROR( "pollset_work", - grpc_pollset_work(g_pollset, &worker, + grpc_pollset_work(&exec_ctx, g_pollset, &worker, grpc_timespec_to_millis_round_up( grpc_timeout_milliseconds_to_deadline(10))))); gpr_mu_unlock(g_mu); - + grpc_exec_ctx_finish(&exec_ctx); do { bytes_read = read(fd, buf, bytes_left > read_size ? read_size : bytes_left); @@ -348,7 +356,7 @@ static void write_test(size_t num_bytes, size_t slice_size) { grpc_closure write_done_closure; grpc_millis deadline = grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20)); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_log(GPR_INFO, "Start write test with %" PRIuPTR " bytes, slice size %" PRIuPTR, @@ -360,8 +368,9 @@ static void write_test(size_t num_bytes, size_t slice_size) { a[0].key = const_cast(GRPC_ARG_TCP_READ_CHUNK_SIZE); a[0].type = GRPC_ARG_INTEGER, a[0].value.integer = (int)slice_size; grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), &args, "test"); - grpc_endpoint_add_to_pollset(ep, g_pollset); + ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "write_test"), &args, + "test"); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); state.ep = ep; state.write_done = 0; @@ -373,7 +382,7 @@ static void write_test(size_t num_bytes, size_t slice_size) { GRPC_CLOSURE_INIT(&write_done_closure, write_done, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_write(ep, &outgoing, &write_done_closure); + grpc_endpoint_write(&exec_ctx, ep, &outgoing, &write_done_closure); drain_socket_blocking(sv[0], num_bytes, num_bytes); gpr_mu_lock(g_mu); for (;;) { @@ -382,23 +391,25 @@ static void write_test(size_t num_bytes, size_t slice_size) { break; } GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline))); gpr_mu_unlock(g_mu); - + grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); } gpr_mu_unlock(g_mu); - grpc_slice_buffer_destroy_internal(&outgoing); - grpc_endpoint_destroy(ep); + grpc_slice_buffer_destroy_internal(&exec_ctx, &outgoing); + grpc_endpoint_destroy(&exec_ctx, ep); gpr_free(slices); + grpc_exec_ctx_finish(&exec_ctx); } -void on_fd_released(void* arg, grpc_error* errors) { +void on_fd_released(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* errors) { int* done = (int*)arg; *done = 1; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); } /* Do a read_test, then release fd and try to read/write again. Verify that @@ -411,7 +422,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { int fd; grpc_millis deadline = grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20)); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_closure fd_released_cb; int fd_released_done = 0; GRPC_CLOSURE_INIT(&fd_released_cb, &on_fd_released, &fd_released_done, @@ -428,9 +439,10 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { a[0].type = GRPC_ARG_INTEGER; a[0].value.integer = (int)slice_size; grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), &args, "test"); + ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "read_test"), &args, + "test"); GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0); - grpc_endpoint_add_to_pollset(ep, g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes); @@ -441,35 +453,38 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { grpc_slice_buffer_init(&state.incoming); GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline))); gpr_log(GPR_DEBUG, "wakeup: read=%" PRIdPTR " target=%" PRIdPTR, state.read_bytes, state.target_read_bytes); gpr_mu_unlock(g_mu); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(g_mu); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); gpr_mu_unlock(g_mu); - grpc_slice_buffer_destroy_internal(&state.incoming); - grpc_tcp_destroy_and_release_fd(ep, &fd, &fd_released_cb); - grpc_core::ExecCtx::Get()->Flush(); + grpc_slice_buffer_destroy_internal(&exec_ctx, &state.incoming); + grpc_tcp_destroy_and_release_fd(&exec_ctx, ep, &fd, &fd_released_cb); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(g_mu); while (!fd_released_done) { grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline))); gpr_log(GPR_DEBUG, "wakeup: fd_released_done=%d", fd_released_done); } gpr_mu_unlock(g_mu); GPR_ASSERT(fd_released_done == 1); GPR_ASSERT(fd == sv[1]); + grpc_exec_ctx_finish(&exec_ctx); written_bytes = fill_socket_partial(sv[0], num_bytes); drain_socket_blocking(fd, written_bytes, written_bytes); @@ -507,7 +522,7 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( size_t slice_size) { int sv[2]; grpc_endpoint_test_fixture f; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; create_sockets(sv); grpc_resource_quota* resource_quota = @@ -517,13 +532,15 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( a[0].type = GRPC_ARG_INTEGER; a[0].value.integer = (int)slice_size; grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - f.client_ep = - grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"), &args, "test"); - f.server_ep = - grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), &args, "test"); - grpc_resource_quota_unref_internal(resource_quota); - grpc_endpoint_add_to_pollset(f.client_ep, g_pollset); - grpc_endpoint_add_to_pollset(f.server_ep, g_pollset); + f.client_ep = grpc_tcp_create( + &exec_ctx, grpc_fd_create(sv[0], "fixture:client"), &args, "test"); + f.server_ep = grpc_tcp_create( + &exec_ctx, grpc_fd_create(sv[1], "fixture:server"), &args, "test"); + grpc_resource_quota_unref_internal(&exec_ctx, resource_quota); + grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, g_pollset); + + grpc_exec_ctx_finish(&exec_ctx); return f; } @@ -532,26 +549,24 @@ static grpc_endpoint_test_config configs[] = { {"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up}, }; -static void destroy_pollset(void* p, grpc_error* error) { - grpc_pollset_destroy((grpc_pollset*)p); +static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p, + grpc_error* error) { + grpc_pollset_destroy(exec_ctx, (grpc_pollset*)p); } int main(int argc, char** argv) { grpc_closure destroyed; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - { - grpc_core::ExecCtx exec_ctx; - g_pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size()); - grpc_pollset_init(g_pollset, &g_mu); - grpc_endpoint_tests(configs[0], g_pollset, g_mu); - run_tests(); - GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, - grpc_schedule_on_exec_ctx); - grpc_pollset_shutdown(g_pollset, &destroyed); - - grpc_core::ExecCtx::Get()->Flush(); - } + g_pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size()); + grpc_pollset_init(g_pollset, &g_mu); + grpc_endpoint_tests(configs[0], g_pollset, g_mu); + run_tests(); + GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, + grpc_schedule_on_exec_ctx); + grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); + grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); gpr_free(g_pollset); diff --git a/test/core/iomgr/tcp_server_posix_test.cc b/test/core/iomgr/tcp_server_posix_test.cc index 3c9ca2109e..48d8d425a5 100644 --- a/test/core/iomgr/tcp_server_posix_test.cc +++ b/test/core/iomgr/tcp_server_posix_test.cc @@ -110,7 +110,8 @@ static void on_connect_result_set(on_connect_result* result, result->server, acceptor->port_index, acceptor->fd_index); } -static void server_weak_ref_shutdown(void* arg, grpc_error* error) { +static void server_weak_ref_shutdown(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { server_weak_ref* weak_ref = static_cast(arg); weak_ref->server = nullptr; } @@ -144,11 +145,12 @@ static void test_addr_init_str(test_addr* addr) { } } -static void on_connect(void* arg, grpc_endpoint* tcp, grpc_pollset* pollset, +static void on_connect(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* tcp, + grpc_pollset* pollset, grpc_tcp_server_acceptor* acceptor) { - grpc_endpoint_shutdown(tcp, + grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connected")); - grpc_endpoint_destroy(tcp); + grpc_endpoint_destroy(exec_ctx, tcp); on_connect_result temp_result; on_connect_result_set(&temp_result, acceptor); @@ -157,33 +159,38 @@ static void on_connect(void* arg, grpc_endpoint* tcp, grpc_pollset* pollset, gpr_mu_lock(g_mu); g_result = temp_result; g_nconnects++; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); gpr_mu_unlock(g_mu); } static void test_no_op(void) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp_server* s; - GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, nullptr, &s)); - grpc_tcp_server_unref(s); + GPR_ASSERT(GRPC_ERROR_NONE == + grpc_tcp_server_create(&exec_ctx, nullptr, nullptr, &s)); + grpc_tcp_server_unref(&exec_ctx, s); + grpc_exec_ctx_finish(&exec_ctx); } static void test_no_op_with_start(void) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp_server* s; - GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, nullptr, &s)); + GPR_ASSERT(GRPC_ERROR_NONE == + grpc_tcp_server_create(&exec_ctx, nullptr, nullptr, &s)); LOG_TEST("test_no_op_with_start"); - grpc_tcp_server_start(s, nullptr, 0, on_connect, nullptr); - grpc_tcp_server_unref(s); + grpc_tcp_server_start(&exec_ctx, s, nullptr, 0, on_connect, nullptr); + grpc_tcp_server_unref(&exec_ctx, s); + grpc_exec_ctx_finish(&exec_ctx); } static void test_no_op_with_port(void) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resolved_address resolved_addr; struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr; grpc_tcp_server* s; - GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, nullptr, &s)); + GPR_ASSERT(GRPC_ERROR_NONE == + grpc_tcp_server_create(&exec_ctx, nullptr, nullptr, &s)); LOG_TEST("test_no_op_with_port"); memset(&resolved_addr, 0, sizeof(resolved_addr)); @@ -194,15 +201,17 @@ static void test_no_op_with_port(void) { GRPC_ERROR_NONE && port > 0); - grpc_tcp_server_unref(s); + grpc_tcp_server_unref(&exec_ctx, s); + grpc_exec_ctx_finish(&exec_ctx); } static void test_no_op_with_port_and_start(void) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resolved_address resolved_addr; struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr; grpc_tcp_server* s; - GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, nullptr, &s)); + GPR_ASSERT(GRPC_ERROR_NONE == + grpc_tcp_server_create(&exec_ctx, nullptr, nullptr, &s)); LOG_TEST("test_no_op_with_port_and_start"); int port = -1; @@ -213,12 +222,13 @@ static void test_no_op_with_port_and_start(void) { GRPC_ERROR_NONE && port > 0); - grpc_tcp_server_start(s, nullptr, 0, on_connect, nullptr); + grpc_tcp_server_start(&exec_ctx, s, nullptr, 0, on_connect, nullptr); - grpc_tcp_server_unref(s); + grpc_tcp_server_unref(&exec_ctx, s); + grpc_exec_ctx_finish(&exec_ctx); } -static grpc_error* tcp_connect(const test_addr* remote, +static grpc_error* tcp_connect(grpc_exec_ctx* exec_ctx, const test_addr* remote, on_connect_result* result) { grpc_millis deadline = grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10)); @@ -244,17 +254,17 @@ static grpc_error* tcp_connect(const test_addr* remote, } gpr_log(GPR_DEBUG, "wait"); while (g_nconnects == nconnects_before && - deadline > grpc_core::ExecCtx::Get()->Now()) { + deadline > grpc_exec_ctx_now(exec_ctx)) { grpc_pollset_worker* worker = nullptr; grpc_error* err; - if ((err = grpc_pollset_work(g_pollset, &worker, deadline)) != + if ((err = grpc_pollset_work(exec_ctx, g_pollset, &worker, deadline)) != GRPC_ERROR_NONE) { gpr_mu_unlock(g_mu); close(clifd); return err; } gpr_mu_unlock(g_mu); - + grpc_exec_ctx_finish(exec_ctx); gpr_mu_lock(g_mu); } gpr_log(GPR_DEBUG, "wait done"); @@ -269,7 +279,7 @@ static grpc_error* tcp_connect(const test_addr* remote, gpr_mu_unlock(g_mu); gpr_log(GPR_INFO, "Result (%d, %d) fd %d", result->port_index, result->fd_index, result->server_fd); - grpc_tcp_server_unref(result->server); + grpc_tcp_server_unref(exec_ctx, result->server); return GRPC_ERROR_NONE; } @@ -282,7 +292,7 @@ static grpc_error* tcp_connect(const test_addr* remote, static void test_connect(size_t num_connects, const grpc_channel_args* channel_args, test_addrs* dst_addrs, bool test_dst_addrs) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resolved_address resolved_addr; grpc_resolved_address resolved_addr1; struct sockaddr_storage* const addr = @@ -297,7 +307,7 @@ static void test_connect(size_t num_connects, grpc_tcp_server* s; const unsigned num_ports = 2; GPR_ASSERT(GRPC_ERROR_NONE == - grpc_tcp_server_create(nullptr, channel_args, &s)); + grpc_tcp_server_create(&exec_ctx, nullptr, channel_args, &s)); unsigned port_num; server_weak_ref weak_ref; server_weak_ref_init(&weak_ref); @@ -342,7 +352,7 @@ static void test_connect(size_t num_connects, svr1_fd_count = grpc_tcp_server_port_fd_count(s, 1); GPR_ASSERT(svr1_fd_count >= 1); - grpc_tcp_server_start(s, &g_pollset, 1, on_connect, nullptr); + grpc_tcp_server_start(&exec_ctx, s, &g_pollset, 1, on_connect, nullptr); if (dst_addrs != nullptr) { int ports[] = {svr_port, svr1_port}; @@ -362,7 +372,7 @@ static void test_connect(size_t num_connects, test_addr_init_str(&dst); ++num_tested; on_connect_result_init(&result); - if ((err = tcp_connect(&dst, &result)) == GRPC_ERROR_NONE && + if ((err = tcp_connect(&exec_ctx, &dst, &result)) == GRPC_ERROR_NONE && result.server_fd >= 0 && result.server == s) { continue; } @@ -393,8 +403,8 @@ static void test_connect(size_t num_connects, for (connect_num = 0; connect_num < num_connects; ++connect_num) { on_connect_result result; on_connect_result_init(&result); - GPR_ASSERT( - GRPC_LOG_IF_ERROR("tcp_connect", tcp_connect(&dst, &result))); + GPR_ASSERT(GRPC_LOG_IF_ERROR("tcp_connect", + tcp_connect(&exec_ctx, &dst, &result))); GPR_ASSERT(result.server_fd == fd); GPR_ASSERT(result.port_index == port_num); GPR_ASSERT(result.fd_index == fd_num); @@ -410,19 +420,21 @@ static void test_connect(size_t num_connects, GPR_ASSERT(weak_ref.server != nullptr); GPR_ASSERT(grpc_tcp_server_port_fd(s, 0, 0) >= 0); - grpc_tcp_server_unref(s); - grpc_core::ExecCtx::Get()->Flush(); + grpc_tcp_server_unref(&exec_ctx, s); + grpc_exec_ctx_finish(&exec_ctx); /* Weak ref lost. */ GPR_ASSERT(weak_ref.server == nullptr); } -static void destroy_pollset(void* p, grpc_error* error) { - grpc_pollset_destroy(static_cast(p)); +static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p, + grpc_error* error) { + grpc_pollset_destroy(exec_ctx, static_cast(p)); } int main(int argc, char** argv) { grpc_closure destroyed; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_arg chan_args[1]; chan_args[0].type = GRPC_ARG_INTEGER; chan_args[0].key = const_cast(GRPC_ARG_EXPAND_WILDCARD_ADDRS); @@ -435,61 +447,58 @@ int main(int argc, char** argv) { static_cast(gpr_zalloc(sizeof(*dst_addrs))); grpc_test_init(argc, argv); grpc_init(); - { - grpc_core::ExecCtx exec_ctx; - g_pollset = static_cast(gpr_zalloc(grpc_pollset_size())); - grpc_pollset_init(g_pollset, &g_mu); - - test_no_op(); - test_no_op_with_start(); - test_no_op_with_port(); - test_no_op_with_port_and_start(); - - if (getifaddrs(&ifa) != 0 || ifa == nullptr) { - gpr_log(GPR_ERROR, "getifaddrs: %s", strerror(errno)); - return EXIT_FAILURE; - } - dst_addrs->naddrs = 0; - for (ifa_it = ifa; ifa_it != nullptr && dst_addrs->naddrs < MAX_ADDRS; - ifa_it = ifa_it->ifa_next) { - if (ifa_it->ifa_addr == nullptr) { - continue; - } else if (ifa_it->ifa_addr->sa_family == AF_INET) { - dst_addrs->addrs[dst_addrs->naddrs].addr.len = - sizeof(struct sockaddr_in); - } else if (ifa_it->ifa_addr->sa_family == AF_INET6) { - dst_addrs->addrs[dst_addrs->naddrs].addr.len = - sizeof(struct sockaddr_in6); - } else { - continue; - } - memcpy(dst_addrs->addrs[dst_addrs->naddrs].addr.addr, ifa_it->ifa_addr, - dst_addrs->addrs[dst_addrs->naddrs].addr.len); - GPR_ASSERT( - grpc_sockaddr_set_port(&dst_addrs->addrs[dst_addrs->naddrs].addr, 0)); - test_addr_init_str(&dst_addrs->addrs[dst_addrs->naddrs]); - ++dst_addrs->naddrs; - } - freeifaddrs(ifa); - ifa = nullptr; - - /* Connect to same addresses as listeners. */ - test_connect(1, nullptr, nullptr, false); - test_connect(10, nullptr, nullptr, false); + g_pollset = static_cast(gpr_zalloc(grpc_pollset_size())); + grpc_pollset_init(g_pollset, &g_mu); - /* Set dst_addrs->addrs[i].len=0 for dst_addrs that are unreachable with a - "::" listener. */ - test_connect(1, nullptr, dst_addrs, true); + test_no_op(); + test_no_op_with_start(); + test_no_op_with_port(); + test_no_op_with_port_and_start(); - /* Test connect(2) with dst_addrs. */ - test_connect(1, &channel_args, dst_addrs, false); - /* Test connect(2) with dst_addrs. */ - test_connect(10, &channel_args, dst_addrs, false); - - GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, - grpc_schedule_on_exec_ctx); - grpc_pollset_shutdown(g_pollset, &destroyed); + if (getifaddrs(&ifa) != 0 || ifa == nullptr) { + gpr_log(GPR_ERROR, "getifaddrs: %s", strerror(errno)); + return EXIT_FAILURE; + } + dst_addrs->naddrs = 0; + for (ifa_it = ifa; ifa_it != nullptr && dst_addrs->naddrs < MAX_ADDRS; + ifa_it = ifa_it->ifa_next) { + if (ifa_it->ifa_addr == nullptr) { + continue; + } else if (ifa_it->ifa_addr->sa_family == AF_INET) { + dst_addrs->addrs[dst_addrs->naddrs].addr.len = sizeof(struct sockaddr_in); + } else if (ifa_it->ifa_addr->sa_family == AF_INET6) { + dst_addrs->addrs[dst_addrs->naddrs].addr.len = + sizeof(struct sockaddr_in6); + } else { + continue; + } + memcpy(dst_addrs->addrs[dst_addrs->naddrs].addr.addr, ifa_it->ifa_addr, + dst_addrs->addrs[dst_addrs->naddrs].addr.len); + GPR_ASSERT( + grpc_sockaddr_set_port(&dst_addrs->addrs[dst_addrs->naddrs].addr, 0)); + test_addr_init_str(&dst_addrs->addrs[dst_addrs->naddrs]); + ++dst_addrs->naddrs; } + freeifaddrs(ifa); + ifa = nullptr; + + /* Connect to same addresses as listeners. */ + test_connect(1, nullptr, nullptr, false); + test_connect(10, nullptr, nullptr, false); + + /* Set dst_addrs->addrs[i].len=0 for dst_addrs that are unreachable with a + "::" listener. */ + test_connect(1, nullptr, dst_addrs, true); + + /* Test connect(2) with dst_addrs. */ + test_connect(1, &channel_args, dst_addrs, false); + /* Test connect(2) with dst_addrs. */ + test_connect(10, &channel_args, dst_addrs, false); + + GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, + grpc_schedule_on_exec_ctx); + grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); + grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); gpr_free(dst_addrs); gpr_free(g_pollset); diff --git a/test/core/iomgr/tcp_server_uv_test.cc b/test/core/iomgr/tcp_server_uv_test.cc index 35d62b51b7..dd047a0498 100644 --- a/test/core/iomgr/tcp_server_uv_test.cc +++ b/test/core/iomgr/tcp_server_uv_test.cc @@ -74,7 +74,8 @@ static void on_connect_result_set(on_connect_result* result, result->fd_index = acceptor->fd_index; } -static void server_weak_ref_shutdown(void* arg, grpc_error* error) { +static void server_weak_ref_shutdown(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { server_weak_ref* weak_ref = static_cast(arg); weak_ref->server = NULL; } @@ -96,11 +97,12 @@ static void server_weak_ref_set(server_weak_ref* weak_ref, weak_ref->server = server; } -static void on_connect(void* arg, grpc_endpoint* tcp, grpc_pollset* pollset, +static void on_connect(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* tcp, + grpc_pollset* pollset, grpc_tcp_server_acceptor* acceptor) { - grpc_endpoint_shutdown(tcp, + grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connected")); - grpc_endpoint_destroy(tcp); + grpc_endpoint_destroy(exec_ctx, tcp); on_connect_result temp_result; on_connect_result_set(&temp_result, acceptor); @@ -109,33 +111,38 @@ static void on_connect(void* arg, grpc_endpoint* tcp, grpc_pollset* pollset, gpr_mu_lock(g_mu); g_result = temp_result; g_nconnects++; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); + GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick", + grpc_pollset_kick(exec_ctx, g_pollset, NULL))); gpr_mu_unlock(g_mu); } static void test_no_op(void) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp_server* s; - GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s)); - grpc_tcp_server_unref(s); + GPR_ASSERT(GRPC_ERROR_NONE == + grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s)); + grpc_tcp_server_unref(&exec_ctx, s); + grpc_exec_ctx_finish(&exec_ctx); } static void test_no_op_with_start(void) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp_server* s; - GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s)); + GPR_ASSERT(GRPC_ERROR_NONE == + grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s)); LOG_TEST("test_no_op_with_start"); - grpc_tcp_server_start(s, NULL, 0, on_connect, NULL); - grpc_tcp_server_unref(s); + grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL); + grpc_tcp_server_unref(&exec_ctx, s); + grpc_exec_ctx_finish(&exec_ctx); } static void test_no_op_with_port(void) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resolved_address resolved_addr; struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr; grpc_tcp_server* s; - GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s)); + GPR_ASSERT(GRPC_ERROR_NONE == + grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s)); LOG_TEST("test_no_op_with_port"); memset(&resolved_addr, 0, sizeof(resolved_addr)); @@ -146,15 +153,17 @@ static void test_no_op_with_port(void) { GRPC_ERROR_NONE && port > 0); - grpc_tcp_server_unref(s); + grpc_tcp_server_unref(&exec_ctx, s); + grpc_exec_ctx_finish(&exec_ctx); } static void test_no_op_with_port_and_start(void) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resolved_address resolved_addr; struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr; grpc_tcp_server* s; - GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s)); + GPR_ASSERT(GRPC_ERROR_NONE == + grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s)); LOG_TEST("test_no_op_with_port_and_start"); int port; @@ -165,9 +174,10 @@ static void test_no_op_with_port_and_start(void) { GRPC_ERROR_NONE && port > 0); - grpc_tcp_server_start(s, NULL, 0, on_connect, NULL); + grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL); - grpc_tcp_server_unref(s); + grpc_tcp_server_unref(&exec_ctx, s); + grpc_exec_ctx_finish(&exec_ctx); } static void connect_cb(uv_connect_t* req, int status) { @@ -177,8 +187,8 @@ static void connect_cb(uv_connect_t* req, int status) { static void close_cb(uv_handle_t* handle) { gpr_free(handle); } -static void tcp_connect(const struct sockaddr* remote, socklen_t remote_len, - on_connect_result* result) { +static void tcp_connect(grpc_exec_ctx* exec_ctx, const struct sockaddr* remote, + socklen_t remote_len, on_connect_result* result) { gpr_timespec deadline = grpc_timeout_seconds_to_deadline(10); uv_tcp_t* client_handle = static_cast(gpr_malloc(sizeof(uv_tcp_t))); @@ -198,10 +208,10 @@ static void tcp_connect(const struct sockaddr* remote, socklen_t remote_len, grpc_pollset_worker* worker = NULL; GPR_ASSERT(GRPC_LOG_IF_ERROR( "pollset_work", - grpc_pollset_work(g_pollset, &worker, + grpc_pollset_work(exec_ctx, g_pollset, &worker, grpc_timespec_to_millis_round_up(deadline)))); gpr_mu_unlock(g_mu); - + grpc_exec_ctx_finish(exec_ctx); gpr_mu_lock(g_mu); } gpr_log(GPR_DEBUG, "wait done"); @@ -214,7 +224,7 @@ static void tcp_connect(const struct sockaddr* remote, socklen_t remote_len, /* Tests a tcp server with multiple ports. */ static void test_connect(unsigned n) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resolved_address resolved_addr; grpc_resolved_address resolved_addr1; struct sockaddr_storage* addr = (struct sockaddr_storage*)resolved_addr.addr; @@ -223,7 +233,8 @@ static void test_connect(unsigned n) { int svr_port; int svr1_port; grpc_tcp_server* s; - GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s)); + GPR_ASSERT(GRPC_ERROR_NONE == + grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s)); unsigned i; server_weak_ref weak_ref; server_weak_ref_init(&weak_ref); @@ -246,45 +257,48 @@ static void test_connect(unsigned n) { GRPC_ERROR_NONE && svr_port == svr1_port); - grpc_tcp_server_start(s, &g_pollset, 1, on_connect, NULL); + grpc_tcp_server_start(&exec_ctx, s, &g_pollset, 1, on_connect, NULL); GPR_ASSERT(uv_ip6_addr("::", svr_port, (struct sockaddr_in6*)addr1) == 0); for (i = 0; i < n; i++) { on_connect_result result; on_connect_result_init(&result); - tcp_connect((struct sockaddr*)addr, (socklen_t)resolved_addr.len, &result); + tcp_connect(&exec_ctx, (struct sockaddr*)addr, (socklen_t)resolved_addr.len, + &result); GPR_ASSERT(result.port_index == 0); GPR_ASSERT(result.server == s); if (weak_ref.server == NULL) { server_weak_ref_set(&weak_ref, result.server); } - grpc_tcp_server_unref(result.server); + grpc_tcp_server_unref(&exec_ctx, result.server); on_connect_result_init(&result); - tcp_connect((struct sockaddr*)addr1, (socklen_t)resolved_addr1.len, - &result); + tcp_connect(&exec_ctx, (struct sockaddr*)addr1, + (socklen_t)resolved_addr1.len, &result); GPR_ASSERT(result.port_index == 1); GPR_ASSERT(result.server == s); - grpc_tcp_server_unref(result.server); + grpc_tcp_server_unref(&exec_ctx, result.server); } /* Weak ref to server valid until final unref. */ GPR_ASSERT(weak_ref.server != NULL); - grpc_tcp_server_unref(s); + grpc_tcp_server_unref(&exec_ctx, s); + grpc_exec_ctx_finish(&exec_ctx); /* Weak ref lost. */ GPR_ASSERT(weak_ref.server == NULL); } -static void destroy_pollset(void* p, grpc_error* error) { - grpc_pollset_destroy(static_cast(p)); +static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p, + grpc_error* error) { + grpc_pollset_destroy(exec_ctx, static_cast(p)); } int main(int argc, char** argv) { grpc_closure destroyed; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); g_pollset = static_cast(gpr_malloc(grpc_pollset_size())); @@ -299,8 +313,8 @@ int main(int argc, char** argv) { GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, grpc_schedule_on_exec_ctx); - grpc_pollset_shutdown(g_pollset, &destroyed); - + grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); + grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); gpr_free(g_pollset); return 0; diff --git a/test/core/iomgr/timer_list_test.cc b/test/core/iomgr/timer_list_test.cc index deb8c4d87e..d74ea4fc96 100644 --- a/test/core/iomgr/timer_list_test.cc +++ b/test/core/iomgr/timer_list_test.cc @@ -25,7 +25,6 @@ #include -#include #include #include "src/core/lib/debug/trace.h" #include "test/core/util/test_config.h" @@ -38,125 +37,127 @@ extern grpc_core::TraceFlag grpc_timer_check_trace; static int cb_called[MAX_CB][2]; -static void cb(void* arg, grpc_error* error) { +static void cb(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { cb_called[(intptr_t)arg][error == GRPC_ERROR_NONE]++; } static void add_test(void) { int i; grpc_timer timers[20]; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_log(GPR_INFO, "add_test"); - grpc_timer_list_init(); + grpc_timer_list_init(&exec_ctx); grpc_core::testing::grpc_tracer_enable_flag(&grpc_timer_trace); grpc_core::testing::grpc_tracer_enable_flag(&grpc_timer_check_trace); memset(cb_called, 0, sizeof(cb_called)); - grpc_millis start = grpc_core::ExecCtx::Get()->Now(); + grpc_millis start = grpc_exec_ctx_now(&exec_ctx); /* 10 ms timers. will expire in the current epoch */ for (i = 0; i < 10; i++) { grpc_timer_init( - &timers[i], start + 10, + &exec_ctx, &timers[i], start + 10, GRPC_CLOSURE_CREATE(cb, (void*)(intptr_t)i, grpc_schedule_on_exec_ctx)); } /* 1010 ms timers. will expire in the next epoch */ for (i = 10; i < 20; i++) { grpc_timer_init( - &timers[i], start + 1010, + &exec_ctx, &timers[i], start + 1010, GRPC_CLOSURE_CREATE(cb, (void*)(intptr_t)i, grpc_schedule_on_exec_ctx)); } /* collect timers. Only the first batch should be ready. */ - grpc_core::ExecCtx::Get()->TestOnlySetNow(start + 500); - GPR_ASSERT(grpc_timer_check(nullptr) == GRPC_TIMERS_FIRED); - grpc_core::ExecCtx::Get()->Flush(); + exec_ctx.now = start + 500; + GPR_ASSERT(grpc_timer_check(&exec_ctx, nullptr) == GRPC_TIMERS_FIRED); + grpc_exec_ctx_finish(&exec_ctx); for (i = 0; i < 20; i++) { GPR_ASSERT(cb_called[i][1] == (i < 10)); GPR_ASSERT(cb_called[i][0] == 0); } - grpc_core::ExecCtx::Get()->TestOnlySetNow(start + 600); - GPR_ASSERT(grpc_timer_check(nullptr) == GRPC_TIMERS_CHECKED_AND_EMPTY); - grpc_core::ExecCtx::Get()->Flush(); + exec_ctx.now = start + 600; + GPR_ASSERT(grpc_timer_check(&exec_ctx, nullptr) == + GRPC_TIMERS_CHECKED_AND_EMPTY); + grpc_exec_ctx_finish(&exec_ctx); for (i = 0; i < 30; i++) { GPR_ASSERT(cb_called[i][1] == (i < 10)); GPR_ASSERT(cb_called[i][0] == 0); } /* collect the rest of the timers */ - grpc_core::ExecCtx::Get()->TestOnlySetNow(start + 1500); - GPR_ASSERT(grpc_timer_check(nullptr) == GRPC_TIMERS_FIRED); - grpc_core::ExecCtx::Get()->Flush(); + exec_ctx.now = start + 1500; + GPR_ASSERT(grpc_timer_check(&exec_ctx, nullptr) == GRPC_TIMERS_FIRED); + grpc_exec_ctx_finish(&exec_ctx); for (i = 0; i < 30; i++) { GPR_ASSERT(cb_called[i][1] == (i < 20)); GPR_ASSERT(cb_called[i][0] == 0); } - grpc_core::ExecCtx::Get()->TestOnlySetNow(start + 1600); - GPR_ASSERT(grpc_timer_check(nullptr) == GRPC_TIMERS_CHECKED_AND_EMPTY); + exec_ctx.now = start + 1600; + GPR_ASSERT(grpc_timer_check(&exec_ctx, nullptr) == + GRPC_TIMERS_CHECKED_AND_EMPTY); for (i = 0; i < 30; i++) { GPR_ASSERT(cb_called[i][1] == (i < 20)); GPR_ASSERT(cb_called[i][0] == 0); } - grpc_timer_list_shutdown(); + grpc_timer_list_shutdown(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); } /* Cleaning up a list with pending timers. */ void destruction_test(void) { grpc_timer timers[5]; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_log(GPR_INFO, "destruction_test"); - grpc_core::ExecCtx::Get()->TestOnlySetNow(0); - grpc_timer_list_init(); + exec_ctx.now_is_valid = true; + exec_ctx.now = 0; + grpc_timer_list_init(&exec_ctx); grpc_core::testing::grpc_tracer_enable_flag(&grpc_timer_trace); grpc_core::testing::grpc_tracer_enable_flag(&grpc_timer_check_trace); memset(cb_called, 0, sizeof(cb_called)); grpc_timer_init( - &timers[0], 100, + &exec_ctx, &timers[0], 100, GRPC_CLOSURE_CREATE(cb, (void*)(intptr_t)0, grpc_schedule_on_exec_ctx)); grpc_timer_init( - &timers[1], 3, + &exec_ctx, &timers[1], 3, GRPC_CLOSURE_CREATE(cb, (void*)(intptr_t)1, grpc_schedule_on_exec_ctx)); grpc_timer_init( - &timers[2], 100, + &exec_ctx, &timers[2], 100, GRPC_CLOSURE_CREATE(cb, (void*)(intptr_t)2, grpc_schedule_on_exec_ctx)); grpc_timer_init( - &timers[3], 3, + &exec_ctx, &timers[3], 3, GRPC_CLOSURE_CREATE(cb, (void*)(intptr_t)3, grpc_schedule_on_exec_ctx)); grpc_timer_init( - &timers[4], 1, + &exec_ctx, &timers[4], 1, GRPC_CLOSURE_CREATE(cb, (void*)(intptr_t)4, grpc_schedule_on_exec_ctx)); - grpc_core::ExecCtx::Get()->TestOnlySetNow(2); - GPR_ASSERT(grpc_timer_check(nullptr) == GRPC_TIMERS_FIRED); - grpc_core::ExecCtx::Get()->Flush(); + exec_ctx.now = 2; + GPR_ASSERT(grpc_timer_check(&exec_ctx, nullptr) == GRPC_TIMERS_FIRED); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(1 == cb_called[4][1]); - grpc_timer_cancel(&timers[0]); - grpc_timer_cancel(&timers[3]); - grpc_core::ExecCtx::Get()->Flush(); + grpc_timer_cancel(&exec_ctx, &timers[0]); + grpc_timer_cancel(&exec_ctx, &timers[3]); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(1 == cb_called[0][0]); GPR_ASSERT(1 == cb_called[3][0]); - grpc_timer_list_shutdown(); - grpc_core::ExecCtx::Get()->Flush(); + grpc_timer_list_shutdown(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(1 == cb_called[1][0]); GPR_ASSERT(1 == cb_called[2][0]); } int main(int argc, char** argv) { grpc_test_init(argc, argv); - grpc_core::ExecCtx::GlobalInit(); gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG); add_test(); destruction_test(); - grpc_core::ExecCtx::GlobalShutdown(); return 0; } diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc index 0deb534abd..6e17be9cd6 100644 --- a/test/core/iomgr/udp_server_test.cc +++ b/test/core/iomgr/udp_server_test.cc @@ -50,7 +50,7 @@ static int g_number_of_writes = 0; static int g_number_of_bytes_read = 0; static int g_number_of_orphan_calls = 0; -static bool on_read(grpc_fd* emfd, void* user_data) { +static bool on_read(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) { char read_buffer[512]; ssize_t byte_count; @@ -61,27 +61,27 @@ static bool on_read(grpc_fd* emfd, void* user_data) { g_number_of_reads++; g_number_of_bytes_read += (int)byte_count; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); gpr_mu_unlock(g_mu); return false; } -static void on_write(grpc_fd* emfd, void* user_data, +static void on_write(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data, grpc_closure* notify_on_write_closure) { gpr_mu_lock(g_mu); g_number_of_writes++; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); gpr_mu_unlock(g_mu); } -static void on_fd_orphaned(grpc_fd* emfd, grpc_closure* closure, - void* user_data) { +static void on_fd_orphaned(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, + grpc_closure* closure, void* user_data) { gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d", grpc_fd_wrapped_fd(emfd)); - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE); g_number_of_orphan_calls++; } @@ -130,22 +130,24 @@ static test_socket_factory* test_socket_factory_create(void) { } static void test_no_op(void) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_udp_server* s = grpc_udp_server_create(nullptr); - grpc_udp_server_destroy(s, nullptr); + grpc_udp_server_destroy(&exec_ctx, s, nullptr); + grpc_exec_ctx_finish(&exec_ctx); } static void test_no_op_with_start(void) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_udp_server* s = grpc_udp_server_create(nullptr); LOG_TEST("test_no_op_with_start"); - grpc_udp_server_start(s, nullptr, 0, nullptr); - grpc_udp_server_destroy(s, nullptr); + grpc_udp_server_start(&exec_ctx, s, nullptr, 0, nullptr); + grpc_udp_server_destroy(&exec_ctx, s, nullptr); + grpc_exec_ctx_finish(&exec_ctx); } static void test_no_op_with_port(void) { g_number_of_orphan_calls = 0; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resolved_address resolved_addr; struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr; grpc_udp_server* s = grpc_udp_server_create(nullptr); @@ -157,7 +159,8 @@ static void test_no_op_with_port(void) { GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write, on_fd_orphaned)); - grpc_udp_server_destroy(s, nullptr); + grpc_udp_server_destroy(&exec_ctx, s, nullptr); + grpc_exec_ctx_finish(&exec_ctx); /* The server had a single FD, which should have been orphaned. */ GPR_ASSERT(g_number_of_orphan_calls == 1); @@ -165,7 +168,7 @@ static void test_no_op_with_port(void) { static void test_no_op_with_port_and_socket_factory(void) { g_number_of_orphan_calls = 0; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resolved_address resolved_addr; struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr; @@ -175,7 +178,7 @@ static void test_no_op_with_port_and_socket_factory(void) { grpc_channel_args* channel_args = grpc_channel_args_copy_and_add(nullptr, &socket_factory_arg, 1); grpc_udp_server* s = grpc_udp_server_create(channel_args); - grpc_channel_args_destroy(channel_args); + grpc_channel_args_destroy(&exec_ctx, channel_args); LOG_TEST("test_no_op_with_port_and_socket_factory"); @@ -187,8 +190,8 @@ static void test_no_op_with_port_and_socket_factory(void) { GPR_ASSERT(socket_factory->number_of_socket_calls == 1); GPR_ASSERT(socket_factory->number_of_bind_calls == 1); - grpc_udp_server_destroy(s, nullptr); - + grpc_udp_server_destroy(&exec_ctx, s, nullptr); + grpc_exec_ctx_finish(&exec_ctx); grpc_socket_factory_unref(&socket_factory->base); /* The server had a single FD, which should have been orphaned. */ @@ -197,7 +200,7 @@ static void test_no_op_with_port_and_socket_factory(void) { static void test_no_op_with_port_and_start(void) { g_number_of_orphan_calls = 0; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resolved_address resolved_addr; struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr; grpc_udp_server* s = grpc_udp_server_create(nullptr); @@ -209,9 +212,10 @@ static void test_no_op_with_port_and_start(void) { GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write, on_fd_orphaned)); - grpc_udp_server_start(s, nullptr, 0, nullptr); + grpc_udp_server_start(&exec_ctx, s, nullptr, 0, nullptr); - grpc_udp_server_destroy(s, nullptr); + grpc_udp_server_destroy(&exec_ctx, s, nullptr); + grpc_exec_ctx_finish(&exec_ctx); /* The server had a single FD, which is orphaned exactly once in * * grpc_udp_server_destroy. */ @@ -219,7 +223,7 @@ static void test_no_op_with_port_and_start(void) { } static void test_receive(int number_of_clients) { - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resolved_address resolved_addr; struct sockaddr_storage* addr = (struct sockaddr_storage*)resolved_addr.addr; int clifd, svrfd; @@ -246,7 +250,7 @@ static void test_receive(int number_of_clients) { GPR_ASSERT(resolved_addr.len <= sizeof(struct sockaddr_storage)); pollsets[0] = g_pollset; - grpc_udp_server_start(s, pollsets, 1, nullptr); + grpc_udp_server_start(&exec_ctx, s, pollsets, 1, nullptr); gpr_mu_lock(g_mu); @@ -262,12 +266,13 @@ static void test_receive(int number_of_clients) { (socklen_t)resolved_addr.len) == 0); GPR_ASSERT(5 == write(clifd, "hello", 5)); while (g_number_of_bytes_read < (number_of_bytes_read_before + 5) && - deadline > grpc_core::ExecCtx::Get()->Now()) { + deadline > grpc_exec_ctx_now(&exec_ctx)) { grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline))); gpr_mu_unlock(g_mu); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(g_mu); } close(clifd); @@ -276,40 +281,40 @@ static void test_receive(int number_of_clients) { gpr_mu_unlock(g_mu); - grpc_udp_server_destroy(s, nullptr); + grpc_udp_server_destroy(&exec_ctx, s, nullptr); + grpc_exec_ctx_finish(&exec_ctx); /* The server had a single FD, which is orphaned exactly once in * * grpc_udp_server_destroy. */ GPR_ASSERT(g_number_of_orphan_calls == 1); } -static void destroy_pollset(void* p, grpc_error* error) { - grpc_pollset_destroy(static_cast(p)); +static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p, + grpc_error* error) { + grpc_pollset_destroy(exec_ctx, static_cast(p)); } int main(int argc, char** argv) { grpc_closure destroyed; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - { - grpc_core::ExecCtx exec_ctx; - g_pollset = static_cast(gpr_zalloc(grpc_pollset_size())); - grpc_pollset_init(g_pollset, &g_mu); - - test_no_op(); - test_no_op_with_start(); - test_no_op_with_port(); - test_no_op_with_port_and_socket_factory(); - test_no_op_with_port_and_start(); - test_receive(1); - test_receive(10); - - GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, - grpc_schedule_on_exec_ctx); - grpc_pollset_shutdown(g_pollset, &destroyed); - grpc_core::ExecCtx::Get()->Flush(); - gpr_free(g_pollset); - } + g_pollset = static_cast(gpr_zalloc(grpc_pollset_size())); + grpc_pollset_init(g_pollset, &g_mu); + + test_no_op(); + test_no_op_with_start(); + test_no_op_with_port(); + test_no_op_with_port_and_socket_factory(); + test_no_op_with_port_and_start(); + test_receive(1); + test_receive(10); + + GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, + grpc_schedule_on_exec_ctx); + grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); + grpc_exec_ctx_finish(&exec_ctx); + gpr_free(g_pollset); grpc_shutdown(); return 0; } -- cgit v1.2.3