aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core/iomgr
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2017-12-06 09:05:05 -0800
committerGravatar GitHub <noreply@github.com>2017-12-06 09:05:05 -0800
commitad4d2dde0052efbbf49d64b0843c45f0381cfeb3 (patch)
tree6a657f8c6179d873b34505cdc24bce9462ca68eb /test/core/iomgr
parenta3df36cc2505a89c2f481eea4a66a87b3002844a (diff)
Revert "All instances of exec_ctx being passed around in src/core removed"
Diffstat (limited to 'test/core/iomgr')
-rw-r--r--test/core/iomgr/combiner_test.cc51
-rw-r--r--test/core/iomgr/endpoint_pair_test.cc29
-rw-r--r--test/core/iomgr/endpoint_tests.cc99
-rw-r--r--test/core/iomgr/ev_epollsig_linux_test.cc109
-rw-r--r--test/core/iomgr/fd_conservation_posix_test.cc37
-rw-r--r--test/core/iomgr/fd_posix_test.cc156
-rw-r--r--test/core/iomgr/load_file_test.cc3
-rw-r--r--test/core/iomgr/pollset_set_test.cc245
-rw-r--r--test/core/iomgr/resolve_address_posix_test.cc65
-rw-r--r--test/core/iomgr/resolve_address_test.cc153
-rw-r--r--test/core/iomgr/resource_quota_test.cc354
-rw-r--r--test/core/iomgr/tcp_client_posix_test.cc83
-rw-r--r--test/core/iomgr/tcp_client_uv_test.cc62
-rw-r--r--test/core/iomgr/tcp_posix_test.cc153
-rw-r--r--test/core/iomgr/tcp_server_posix_test.cc183
-rw-r--r--test/core/iomgr/tcp_server_uv_test.cc90
-rw-r--r--test/core/iomgr/timer_list_test.cc75
-rw-r--r--test/core/iomgr/udp_server_test.cc105
18 files changed, 1104 insertions, 948 deletions
diff --git a/test/core/iomgr/combiner_test.cc b/test/core/iomgr/combiner_test.cc
index 891008c774..33d892fa06 100644
--- a/test/core/iomgr/combiner_test.cc
+++ b/test/core/iomgr/combiner_test.cc
@@ -28,11 +28,13 @@
static void test_no_op(void) {
gpr_log(GPR_DEBUG, "test_no_op");
- grpc_core::ExecCtx exec_ctx;
- GRPC_COMBINER_UNREF(grpc_combiner_create(), "test_no_op");
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GRPC_COMBINER_UNREF(&exec_ctx, grpc_combiner_create(), "test_no_op");
+ grpc_exec_ctx_finish(&exec_ctx);
}
-static void set_event_to_true(void* value, grpc_error* error) {
+static void set_event_to_true(grpc_exec_ctx* exec_ctx, void* value,
+ grpc_error* error) {
gpr_event_set(static_cast<gpr_event*>(value), (void*)1);
}
@@ -42,14 +44,16 @@ static void test_execute_one(void) {
grpc_combiner* lock = grpc_combiner_create();
gpr_event done;
gpr_event_init(&done);
- grpc_core::ExecCtx exec_ctx;
- GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(set_event_to_true, &done,
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GRPC_CLOSURE_SCHED(&exec_ctx,
+ GRPC_CLOSURE_CREATE(set_event_to_true, &done,
grpc_combiner_scheduler(lock)),
GRPC_ERROR_NONE);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
- GRPC_COMBINER_UNREF(lock, "test_execute_one");
+ GRPC_COMBINER_UNREF(&exec_ctx, lock, "test_execute_one");
+ grpc_exec_ctx_finish(&exec_ctx);
}
typedef struct {
@@ -63,7 +67,7 @@ typedef struct {
size_t value;
} ex_args;
-static void check_one(void* a, grpc_error* error) {
+static void check_one(grpc_exec_ctx* exec_ctx, void* a, grpc_error* error) {
ex_args* args = static_cast<ex_args*>(a);
GPR_ASSERT(*args->ctr == args->value - 1);
*args->ctr = args->value;
@@ -72,25 +76,28 @@ static void check_one(void* a, grpc_error* error) {
static void execute_many_loop(void* a) {
thd_args* args = static_cast<thd_args*>(a);
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
size_t n = 1;
for (size_t i = 0; i < 10; i++) {
for (size_t j = 0; j < 10000; j++) {
ex_args* c = static_cast<ex_args*>(gpr_malloc(sizeof(*c)));
c->ctr = &args->ctr;
c->value = n++;
- GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(
+ GRPC_CLOSURE_SCHED(&exec_ctx,
+ GRPC_CLOSURE_CREATE(
check_one, c, grpc_combiner_scheduler(args->lock)),
GRPC_ERROR_NONE);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
}
// sleep for a little bit, to test a combiner draining and another thread
// picking it up
gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
}
- GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(set_event_to_true, &args->done,
+ GRPC_CLOSURE_SCHED(&exec_ctx,
+ GRPC_CLOSURE_CREATE(set_event_to_true, &args->done,
grpc_combiner_scheduler(args->lock)),
GRPC_ERROR_NONE);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_execute_many(void) {
@@ -113,18 +120,20 @@ static void test_execute_many(void) {
gpr_inf_future(GPR_CLOCK_REALTIME)) != nullptr);
gpr_thd_join(thds[i]);
}
- grpc_core::ExecCtx exec_ctx;
- GRPC_COMBINER_UNREF(lock, "test_execute_many");
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GRPC_COMBINER_UNREF(&exec_ctx, lock, "test_execute_many");
+ grpc_exec_ctx_finish(&exec_ctx);
}
static gpr_event got_in_finally;
-static void in_finally(void* arg, grpc_error* error) {
+static void in_finally(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
gpr_event_set(&got_in_finally, (void*)1);
}
-static void add_finally(void* arg, grpc_error* error) {
- GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(in_finally, arg,
+static void add_finally(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ GRPC_CLOSURE_CREATE(in_finally, arg,
grpc_combiner_finally_scheduler(
static_cast<grpc_combiner*>(arg))),
GRPC_ERROR_NONE);
@@ -134,15 +143,17 @@ static void test_execute_finally(void) {
gpr_log(GPR_DEBUG, "test_execute_finally");
grpc_combiner* lock = grpc_combiner_create();
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_event_init(&got_in_finally);
GRPC_CLOSURE_SCHED(
+ &exec_ctx,
GRPC_CLOSURE_CREATE(add_finally, lock, grpc_combiner_scheduler(lock)),
GRPC_ERROR_NONE);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&got_in_finally,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
- GRPC_COMBINER_UNREF(lock, "test_execute_finally");
+ GRPC_COMBINER_UNREF(&exec_ctx, lock, "test_execute_finally");
+ grpc_exec_ctx_finish(&exec_ctx);
}
int main(int argc, char** argv) {
diff --git a/test/core/iomgr/endpoint_pair_test.cc b/test/core/iomgr/endpoint_pair_test.cc
index 90dd40d9c4..30a0cb5924 100644
--- a/test/core/iomgr/endpoint_pair_test.cc
+++ b/test/core/iomgr/endpoint_pair_test.cc
@@ -32,7 +32,7 @@ static void clean_up(void) {}
static grpc_endpoint_test_fixture create_fixture_endpoint_pair(
size_t slice_size) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_endpoint_test_fixture f;
grpc_arg a[1];
a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
@@ -43,8 +43,9 @@ static grpc_endpoint_test_fixture create_fixture_endpoint_pair(
f.client_ep = p.client;
f.server_ep = p.server;
- grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
- grpc_endpoint_add_to_pollset(f.server_ep, g_pollset);
+ grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset);
+ grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, g_pollset);
+ grpc_exec_ctx_finish(&exec_ctx);
return f;
}
@@ -53,23 +54,23 @@ static grpc_endpoint_test_config configs[] = {
{"tcp/tcp_socketpair", create_fixture_endpoint_pair, clean_up},
};
-static void destroy_pollset(void* p, grpc_error* error) {
- grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
+static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p,
+ grpc_error* error) {
+ grpc_pollset_destroy(exec_ctx, static_cast<grpc_pollset*>(p));
}
int main(int argc, char** argv) {
grpc_closure destroyed;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_test_init(argc, argv);
grpc_init();
- {
- grpc_core::ExecCtx exec_ctx;
- g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
- grpc_pollset_init(g_pollset, &g_mu);
- grpc_endpoint_tests(configs[0], g_pollset, g_mu);
- GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
- grpc_schedule_on_exec_ctx);
- grpc_pollset_shutdown(g_pollset, &destroyed);
- }
+ g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
+ grpc_pollset_init(g_pollset, &g_mu);
+ grpc_endpoint_tests(configs[0], g_pollset, g_mu);
+ GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
+ grpc_schedule_on_exec_ctx);
+ grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_shutdown();
gpr_free(g_pollset);
diff --git a/test/core/iomgr/endpoint_tests.cc b/test/core/iomgr/endpoint_tests.cc
index 8ccae52067..026e34105d 100644
--- a/test/core/iomgr/endpoint_tests.cc
+++ b/test/core/iomgr/endpoint_tests.cc
@@ -115,7 +115,8 @@ struct read_and_write_test_state {
grpc_closure done_write;
};
-static void read_and_write_test_read_handler(void* data, grpc_error* error) {
+static void read_and_write_test_read_handler(grpc_exec_ctx* exec_ctx,
+ void* data, grpc_error* error) {
struct read_and_write_test_state* state =
(struct read_and_write_test_state*)data;
@@ -125,14 +126,17 @@ static void read_and_write_test_read_handler(void* data, grpc_error* error) {
gpr_log(GPR_INFO, "Read handler done");
gpr_mu_lock(g_mu);
state->read_done = 1 + (error == GRPC_ERROR_NONE);
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
+ GRPC_LOG_IF_ERROR("pollset_kick",
+ grpc_pollset_kick(exec_ctx, g_pollset, nullptr));
gpr_mu_unlock(g_mu);
} else if (error == GRPC_ERROR_NONE) {
- grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read);
+ grpc_endpoint_read(exec_ctx, state->read_ep, &state->incoming,
+ &state->done_read);
}
}
-static void read_and_write_test_write_handler(void* data, grpc_error* error) {
+static void read_and_write_test_write_handler(grpc_exec_ctx* exec_ctx,
+ void* data, grpc_error* error) {
struct read_and_write_test_state* state =
(struct read_and_write_test_state*)data;
grpc_slice* slices = nullptr;
@@ -149,7 +153,7 @@ static void read_and_write_test_write_handler(void* data, grpc_error* error) {
&state->current_write_data);
grpc_slice_buffer_reset_and_unref(&state->outgoing);
grpc_slice_buffer_addn(&state->outgoing, slices, nslices);
- grpc_endpoint_write(state->write_ep, &state->outgoing,
+ grpc_endpoint_write(exec_ctx, state->write_ep, &state->outgoing,
&state->done_write);
gpr_free(slices);
return;
@@ -159,7 +163,8 @@ static void read_and_write_test_write_handler(void* data, grpc_error* error) {
gpr_log(GPR_INFO, "Write handler done");
gpr_mu_lock(g_mu);
state->write_done = 1 + (error == GRPC_ERROR_NONE);
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
+ GRPC_LOG_IF_ERROR("pollset_kick",
+ grpc_pollset_kick(exec_ctx, g_pollset, nullptr));
gpr_mu_unlock(g_mu);
}
@@ -173,7 +178,7 @@ static void read_and_write_test(grpc_endpoint_test_config config,
struct read_and_write_test_state state;
grpc_endpoint_test_fixture f =
begin_test(config, "read_and_write_test", slice_size);
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_millis deadline =
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
gpr_log(GPR_DEBUG,
@@ -212,57 +217,66 @@ static void read_and_write_test(grpc_endpoint_test_config config,
for the first iteration as for later iterations. It does the right thing
even when bytes_written is unsigned. */
state.bytes_written -= state.current_write_size;
- read_and_write_test_write_handler(&state, GRPC_ERROR_NONE);
- grpc_core::ExecCtx::Get()->Flush();
+ read_and_write_test_write_handler(&exec_ctx, &state, GRPC_ERROR_NONE);
+ grpc_exec_ctx_flush(&exec_ctx);
- grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read);
+ grpc_endpoint_read(&exec_ctx, state.read_ep, &state.incoming,
+ &state.done_read);
if (shutdown) {
gpr_log(GPR_DEBUG, "shutdown read");
grpc_endpoint_shutdown(
- state.read_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
+ &exec_ctx, state.read_ep,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
gpr_log(GPR_DEBUG, "shutdown write");
grpc_endpoint_shutdown(
- state.write_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
+ &exec_ctx, state.write_ep,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
}
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(g_mu);
while (!state.read_done || !state.write_done) {
grpc_pollset_worker* worker = nullptr;
- GPR_ASSERT(grpc_core::ExecCtx::Get()->Now() < deadline);
+ GPR_ASSERT(grpc_exec_ctx_now(&exec_ctx) < deadline);
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline)));
}
gpr_mu_unlock(g_mu);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
end_test(config);
- grpc_slice_buffer_destroy_internal(&state.outgoing);
- grpc_slice_buffer_destroy_internal(&state.incoming);
- grpc_endpoint_destroy(state.read_ep);
- grpc_endpoint_destroy(state.write_ep);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &state.outgoing);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &state.incoming);
+ grpc_endpoint_destroy(&exec_ctx, state.read_ep);
+ grpc_endpoint_destroy(&exec_ctx, state.write_ep);
+ grpc_exec_ctx_finish(&exec_ctx);
}
-static void inc_on_failure(void* arg, grpc_error* error) {
+static void inc_on_failure(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
gpr_mu_lock(g_mu);
*(int*)arg += (error != GRPC_ERROR_NONE);
- GPR_ASSERT(GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr)));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
}
-static void wait_for_fail_count(int* fail_count, int want_fail_count) {
- grpc_core::ExecCtx::Get()->Flush();
+static void wait_for_fail_count(grpc_exec_ctx* exec_ctx, int* fail_count,
+ int want_fail_count) {
+ grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(g_mu);
grpc_millis deadline =
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
- while (grpc_core::ExecCtx::Get()->Now() < deadline &&
+ while (grpc_exec_ctx_now(exec_ctx) < deadline &&
*fail_count < want_fail_count) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
+ "pollset_work",
+ grpc_pollset_work(exec_ctx, g_pollset, &worker, deadline)));
gpr_mu_unlock(g_mu);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(g_mu);
}
GPR_ASSERT(*fail_count == want_fail_count);
@@ -277,32 +291,33 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) {
grpc_slice_buffer slice_buffer;
grpc_slice_buffer_init(&slice_buffer);
- grpc_core::ExecCtx exec_ctx;
- grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
- grpc_endpoint_read(f.client_ep, &slice_buffer,
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset);
+ grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer,
GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
grpc_schedule_on_exec_ctx));
- wait_for_fail_count(&fail_count, 0);
- grpc_endpoint_shutdown(f.client_ep,
+ wait_for_fail_count(&exec_ctx, &fail_count, 0);
+ grpc_endpoint_shutdown(&exec_ctx, f.client_ep,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
- wait_for_fail_count(&fail_count, 1);
- grpc_endpoint_read(f.client_ep, &slice_buffer,
+ wait_for_fail_count(&exec_ctx, &fail_count, 1);
+ grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer,
GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
grpc_schedule_on_exec_ctx));
- wait_for_fail_count(&fail_count, 2);
+ wait_for_fail_count(&exec_ctx, &fail_count, 2);
grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a"));
- grpc_endpoint_write(f.client_ep, &slice_buffer,
+ grpc_endpoint_write(&exec_ctx, f.client_ep, &slice_buffer,
GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
grpc_schedule_on_exec_ctx));
- wait_for_fail_count(&fail_count, 3);
- grpc_endpoint_shutdown(f.client_ep,
+ wait_for_fail_count(&exec_ctx, &fail_count, 3);
+ grpc_endpoint_shutdown(&exec_ctx, f.client_ep,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
- wait_for_fail_count(&fail_count, 3);
+ wait_for_fail_count(&exec_ctx, &fail_count, 3);
- grpc_slice_buffer_destroy_internal(&slice_buffer);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &slice_buffer);
- grpc_endpoint_destroy(f.client_ep);
- grpc_endpoint_destroy(f.server_ep);
+ grpc_endpoint_destroy(&exec_ctx, f.client_ep);
+ grpc_endpoint_destroy(&exec_ctx, f.server_ep);
+ grpc_exec_ctx_finish(&exec_ctx);
}
void grpc_endpoint_tests(grpc_endpoint_test_config config,
diff --git a/test/core/iomgr/ev_epollsig_linux_test.cc b/test/core/iomgr/ev_epollsig_linux_test.cc
index e767e01f21..94f387164a 100644
--- a/test/core/iomgr/ev_epollsig_linux_test.cc
+++ b/test/core/iomgr/ev_epollsig_linux_test.cc
@@ -70,18 +70,19 @@ static void test_fd_init(test_fd* tfds, int* fds, int num_fds) {
}
}
-static void test_fd_cleanup(test_fd* tfds, int num_fds) {
+static void test_fd_cleanup(grpc_exec_ctx* exec_ctx, test_fd* tfds,
+ int num_fds) {
int release_fd;
int i;
for (i = 0; i < num_fds; i++) {
- grpc_fd_shutdown(tfds[i].fd,
+ grpc_fd_shutdown(exec_ctx, tfds[i].fd,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("test_fd_cleanup"));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(exec_ctx);
- grpc_fd_orphan(tfds[i].fd, nullptr, &release_fd, false /* already_closed */,
- "test_fd_cleanup");
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_fd_orphan(exec_ctx, tfds[i].fd, nullptr, &release_fd,
+ false /* already_closed */, "test_fd_cleanup");
+ grpc_exec_ctx_flush(exec_ctx);
GPR_ASSERT(release_fd == tfds[i].inner_fd);
close(tfds[i].inner_fd);
@@ -97,20 +98,22 @@ static void test_pollset_init(test_pollset* pollsets, int num_pollsets) {
}
}
-static void destroy_pollset(void* p, grpc_error* error) {
- grpc_pollset_destroy((grpc_pollset*)p);
+static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p,
+ grpc_error* error) {
+ grpc_pollset_destroy(exec_ctx, (grpc_pollset*)p);
}
-static void test_pollset_cleanup(test_pollset* pollsets, int num_pollsets) {
+static void test_pollset_cleanup(grpc_exec_ctx* exec_ctx,
+ test_pollset* pollsets, int num_pollsets) {
grpc_closure destroyed;
int i;
for (i = 0; i < num_pollsets; i++) {
GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, pollsets[i].pollset,
grpc_schedule_on_exec_ctx);
- grpc_pollset_shutdown(pollsets[i].pollset, &destroyed);
+ grpc_pollset_shutdown(exec_ctx, pollsets[i].pollset, &destroyed);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(exec_ctx);
gpr_free(pollsets[i].pollset);
}
}
@@ -130,7 +133,7 @@ static void test_pollset_cleanup(test_pollset* pollsets, int num_pollsets) {
#define NUM_POLLSETS 4
static void test_add_fd_to_pollset() {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
test_fd tfds[NUM_FDS];
int fds[NUM_FDS];
test_pollset pollsets[NUM_POLLSETS];
@@ -167,33 +170,33 @@ static void test_add_fd_to_pollset() {
/* == Step 1 == */
for (i = 0; i <= 2; i++) {
- grpc_pollset_add_fd(pollsets[0].pollset, tfds[i].fd);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_pollset_add_fd(&exec_ctx, pollsets[0].pollset, tfds[i].fd);
+ grpc_exec_ctx_flush(&exec_ctx);
}
for (i = 3; i <= 4; i++) {
- grpc_pollset_add_fd(pollsets[1].pollset, tfds[i].fd);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_pollset_add_fd(&exec_ctx, pollsets[1].pollset, tfds[i].fd);
+ grpc_exec_ctx_flush(&exec_ctx);
}
for (i = 5; i <= 7; i++) {
- grpc_pollset_add_fd(pollsets[2].pollset, tfds[i].fd);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_pollset_add_fd(&exec_ctx, pollsets[2].pollset, tfds[i].fd);
+ grpc_exec_ctx_flush(&exec_ctx);
}
/* == Step 2 == */
for (i = 0; i <= 1; i++) {
- grpc_pollset_add_fd(pollsets[3].pollset, tfds[i].fd);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_pollset_add_fd(&exec_ctx, pollsets[3].pollset, tfds[i].fd);
+ grpc_exec_ctx_flush(&exec_ctx);
}
/* == Step 3 == */
- grpc_pollset_add_fd(pollsets[1].pollset, tfds[0].fd);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_pollset_add_fd(&exec_ctx, pollsets[1].pollset, tfds[0].fd);
+ grpc_exec_ctx_flush(&exec_ctx);
/* == Step 4 == */
- grpc_pollset_add_fd(pollsets[2].pollset, tfds[3].fd);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_pollset_add_fd(&exec_ctx, pollsets[2].pollset, tfds[3].fd);
+ grpc_exec_ctx_flush(&exec_ctx);
/* All polling islands are merged at this point */
@@ -210,8 +213,9 @@ static void test_add_fd_to_pollset() {
expected_pi, grpc_pollset_get_polling_island(pollsets[i].pollset)));
}
- test_fd_cleanup(tfds, NUM_FDS);
- test_pollset_cleanup(pollsets, NUM_POLLSETS);
+ test_fd_cleanup(&exec_ctx, tfds, NUM_FDS);
+ test_pollset_cleanup(&exec_ctx, pollsets, NUM_POLLSETS);
+ grpc_exec_ctx_finish(&exec_ctx);
}
#undef NUM_FDS
@@ -231,24 +235,26 @@ static __thread int thread_wakeups = 0;
static void test_threading_loop(void* arg) {
threading_shared* shared = static_cast<threading_shared*>(arg);
while (thread_wakeups < 1000000) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_worker* worker;
gpr_mu_lock(shared->mu);
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work",
- grpc_pollset_work(shared->pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
+ "pollset_work", grpc_pollset_work(&exec_ctx, shared->pollset, &worker,
+ GRPC_MILLIS_INF_FUTURE)));
gpr_mu_unlock(shared->mu);
+ grpc_exec_ctx_finish(&exec_ctx);
}
}
-static void test_threading_wakeup(void* arg, grpc_error* error) {
+static void test_threading_wakeup(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
threading_shared* shared = static_cast<threading_shared*>(arg);
++shared->wakeups;
++thread_wakeups;
if (error == GRPC_ERROR_NONE) {
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"consume_wakeup", grpc_wakeup_fd_consume_wakeup(shared->wakeup_fd)));
- grpc_fd_notify_on_read(shared->wakeup_desc, &shared->on_wakeup);
+ grpc_fd_notify_on_read(exec_ctx, shared->wakeup_desc, &shared->on_wakeup);
GPR_ASSERT(GRPC_LOG_IF_ERROR("wakeup_next",
grpc_wakeup_fd_wakeup(shared->wakeup_fd)));
}
@@ -271,12 +277,13 @@ static void test_threading(void) {
shared.wakeup_desc = grpc_fd_create(fd.read_fd, "wakeup");
shared.wakeups = 0;
{
- grpc_core::ExecCtx exec_ctx;
- grpc_pollset_add_fd(shared.pollset, shared.wakeup_desc);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_pollset_add_fd(&exec_ctx, shared.pollset, shared.wakeup_desc);
grpc_fd_notify_on_read(
- shared.wakeup_desc,
+ &exec_ctx, shared.wakeup_desc,
GRPC_CLOSURE_INIT(&shared.on_wakeup, test_threading_wakeup, &shared,
grpc_schedule_on_exec_ctx));
+ grpc_exec_ctx_finish(&exec_ctx);
}
GPR_ASSERT(GRPC_LOG_IF_ERROR("wakeup_first",
grpc_wakeup_fd_wakeup(shared.wakeup_fd)));
@@ -286,13 +293,14 @@ static void test_threading(void) {
fd.read_fd = 0;
grpc_wakeup_fd_destroy(&fd);
{
- grpc_core::ExecCtx exec_ctx;
- grpc_fd_shutdown(shared.wakeup_desc, GRPC_ERROR_CANCELLED);
- grpc_fd_orphan(shared.wakeup_desc, nullptr, nullptr,
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_fd_shutdown(&exec_ctx, shared.wakeup_desc, GRPC_ERROR_CANCELLED);
+ grpc_fd_orphan(&exec_ctx, shared.wakeup_desc, nullptr, nullptr,
false /* already_closed */, "done");
- grpc_pollset_shutdown(shared.pollset,
+ grpc_pollset_shutdown(&exec_ctx, shared.pollset,
GRPC_CLOSURE_CREATE(destroy_pollset, shared.pollset,
grpc_schedule_on_exec_ctx));
+ grpc_exec_ctx_finish(&exec_ctx);
}
gpr_free(shared.pollset);
}
@@ -301,21 +309,20 @@ int main(int argc, char** argv) {
const char* poll_strategy = nullptr;
grpc_test_init(argc, argv);
grpc_init();
- {
- grpc_core::ExecCtx exec_ctx;
-
- poll_strategy = grpc_get_poll_strategy_name();
- if (poll_strategy != nullptr && strcmp(poll_strategy, "epollsig") == 0) {
- test_add_fd_to_pollset();
- test_threading();
- } else {
- gpr_log(GPR_INFO,
- "Skipping the test. The test is only relevant for 'epollsig' "
- "strategy. and the current strategy is: '%s'",
- poll_strategy);
- }
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
+ poll_strategy = grpc_get_poll_strategy_name();
+ if (poll_strategy != nullptr && strcmp(poll_strategy, "epollsig") == 0) {
+ test_add_fd_to_pollset();
+ test_threading();
+ } else {
+ gpr_log(GPR_INFO,
+ "Skipping the test. The test is only relevant for 'epollsig' "
+ "strategy. and the current strategy is: '%s'",
+ poll_strategy);
}
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_shutdown();
return 0;
}
diff --git a/test/core/iomgr/fd_conservation_posix_test.cc b/test/core/iomgr/fd_conservation_posix_test.cc
index aaa14010f8..f46430c611 100644
--- a/test/core/iomgr/fd_conservation_posix_test.cc
+++ b/test/core/iomgr/fd_conservation_posix_test.cc
@@ -31,27 +31,26 @@ int main(int argc, char** argv) {
grpc_test_init(argc, argv);
grpc_init();
- {
- grpc_core::ExecCtx exec_ctx;
-
- /* set max # of file descriptors to a low value, and
- verify we can create and destroy many more than this number
- of descriptors */
- rlim.rlim_cur = rlim.rlim_max = 10;
- GPR_ASSERT(0 == setrlimit(RLIMIT_NOFILE, &rlim));
- grpc_resource_quota* resource_quota =
- grpc_resource_quota_create("fd_conservation_posix_test");
-
- for (i = 0; i < 100; i++) {
- p = grpc_iomgr_create_endpoint_pair("test", NULL);
- grpc_endpoint_destroy(p.client);
- grpc_endpoint_destroy(p.server);
- grpc_core::ExecCtx::Get()->Flush();
- }
-
- grpc_resource_quota_unref(resource_quota);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
+ /* set max # of file descriptors to a low value, and
+ verify we can create and destroy many more than this number
+ of descriptors */
+ rlim.rlim_cur = rlim.rlim_max = 10;
+ GPR_ASSERT(0 == setrlimit(RLIMIT_NOFILE, &rlim));
+ grpc_resource_quota* resource_quota =
+ grpc_resource_quota_create("fd_conservation_posix_test");
+
+ for (i = 0; i < 100; i++) {
+ p = grpc_iomgr_create_endpoint_pair("test", nullptr);
+ grpc_endpoint_destroy(&exec_ctx, p.client);
+ grpc_endpoint_destroy(&exec_ctx, p.server);
+ grpc_exec_ctx_flush(&exec_ctx);
}
+ grpc_resource_quota_unref(resource_quota);
+
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_shutdown();
return 0;
}
diff --git a/test/core/iomgr/fd_posix_test.cc b/test/core/iomgr/fd_posix_test.cc
index cf75517538..a03d841ecd 100644
--- a/test/core/iomgr/fd_posix_test.cc
+++ b/test/core/iomgr/fd_posix_test.cc
@@ -111,19 +111,20 @@ typedef struct {
/* Called when an upload session can be safely shutdown.
Close session FD and start to shutdown listen FD. */
-static void session_shutdown_cb(void* arg, /*session */
+static void session_shutdown_cb(grpc_exec_ctx* exec_ctx, void* arg, /*session */
bool success) {
session* se = static_cast<session*>(arg);
server* sv = se->sv;
- grpc_fd_orphan(se->em_fd, nullptr, nullptr, false /* already_closed */, "a");
+ grpc_fd_orphan(exec_ctx, se->em_fd, nullptr, nullptr,
+ false /* already_closed */, "a");
gpr_free(se);
/* Start to shutdown listen fd. */
- grpc_fd_shutdown(sv->em_fd,
+ grpc_fd_shutdown(exec_ctx, sv->em_fd,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("session_shutdown_cb"));
}
/* Called when data become readable in a session. */
-static void session_read_cb(void* arg, /*session */
+static void session_read_cb(grpc_exec_ctx* exec_ctx, void* arg, /*session */
grpc_error* error) {
session* se = static_cast<session*>(arg);
int fd = grpc_fd_wrapped_fd(se->em_fd);
@@ -132,7 +133,7 @@ static void session_read_cb(void* arg, /*session */
ssize_t read_total = 0;
if (error != GRPC_ERROR_NONE) {
- session_shutdown_cb(arg, 1);
+ session_shutdown_cb(exec_ctx, arg, 1);
return;
}
@@ -147,7 +148,7 @@ static void session_read_cb(void* arg, /*session */
It is possible to read nothing due to spurious edge event or data has
been drained, In such a case, read() returns -1 and set errno to EAGAIN. */
if (read_once == 0) {
- session_shutdown_cb(arg, 1);
+ session_shutdown_cb(exec_ctx, arg, 1);
} else if (read_once == -1) {
if (errno == EAGAIN) {
/* An edge triggered event is cached in the kernel until next poll.
@@ -158,7 +159,7 @@ static void session_read_cb(void* arg, /*session */
TODO(chenw): in multi-threaded version, callback and polling can be
run in different threads. polling may catch a persist read edge event
before notify_on_read is called. */
- grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
+ grpc_fd_notify_on_read(exec_ctx, se->em_fd, &se->session_read_closure);
} else {
gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno));
abort();
@@ -168,20 +169,22 @@ static void session_read_cb(void* arg, /*session */
/* Called when the listen FD can be safely shutdown.
Close listen FD and signal that server can be shutdown. */
-static void listen_shutdown_cb(void* arg /*server */, int success) {
+static void listen_shutdown_cb(grpc_exec_ctx* exec_ctx, void* arg /*server */,
+ int success) {
server* sv = static_cast<server*>(arg);
- grpc_fd_orphan(sv->em_fd, nullptr, nullptr, false /* already_closed */, "b");
+ grpc_fd_orphan(exec_ctx, sv->em_fd, nullptr, nullptr,
+ false /* already_closed */, "b");
gpr_mu_lock(g_mu);
sv->done = 1;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
}
/* Called when a new TCP connection request arrives in the listening port. */
-static void listen_cb(void* arg, /*=sv_arg*/
+static void listen_cb(grpc_exec_ctx* exec_ctx, void* arg, /*=sv_arg*/
grpc_error* error) {
server* sv = static_cast<server*>(arg);
int fd;
@@ -192,7 +195,7 @@ static void listen_cb(void* arg, /*=sv_arg*/
grpc_fd* listen_em_fd = sv->em_fd;
if (error != GRPC_ERROR_NONE) {
- listen_shutdown_cb(arg, 1);
+ listen_shutdown_cb(exec_ctx, arg, 1);
return;
}
@@ -204,12 +207,12 @@ static void listen_cb(void* arg, /*=sv_arg*/
se = static_cast<session*>(gpr_malloc(sizeof(*se)));
se->sv = sv;
se->em_fd = grpc_fd_create(fd, "listener");
- grpc_pollset_add_fd(g_pollset, se->em_fd);
+ grpc_pollset_add_fd(exec_ctx, g_pollset, se->em_fd);
GRPC_CLOSURE_INIT(&se->session_read_closure, session_read_cb, se,
grpc_schedule_on_exec_ctx);
- grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
+ grpc_fd_notify_on_read(exec_ctx, se->em_fd, &se->session_read_closure);
- grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure);
+ grpc_fd_notify_on_read(exec_ctx, listen_em_fd, &sv->listen_closure);
}
/* Max number of connections pending to be accepted by listen(). */
@@ -219,7 +222,7 @@ static void listen_cb(void* arg, /*=sv_arg*/
listen_cb() is registered to be interested in reading from listen_fd.
When connection request arrives, listen_cb() is called to accept the
connection request. */
-static int server_start(server* sv) {
+static int server_start(grpc_exec_ctx* exec_ctx, server* sv) {
int port = 0;
int fd;
struct sockaddr_in sin;
@@ -233,11 +236,11 @@ static int server_start(server* sv) {
GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
sv->em_fd = grpc_fd_create(fd, "server");
- grpc_pollset_add_fd(g_pollset, sv->em_fd);
+ grpc_pollset_add_fd(exec_ctx, g_pollset, sv->em_fd);
/* Register to be interested in reading from listen_fd. */
GRPC_CLOSURE_INIT(&sv->listen_closure, listen_cb, sv,
grpc_schedule_on_exec_ctx);
- grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure);
+ grpc_fd_notify_on_read(exec_ctx, sv->em_fd, &sv->listen_closure);
return port;
}
@@ -246,13 +249,13 @@ static int server_start(server* sv) {
static void server_wait_and_shutdown(server* sv) {
gpr_mu_lock(g_mu);
while (!sv->done) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work",
- grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
+ "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker,
+ GRPC_MILLIS_INF_FUTURE)));
gpr_mu_unlock(g_mu);
-
+ grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(g_mu);
}
gpr_mu_unlock(g_mu);
@@ -286,16 +289,18 @@ static void client_init(client* cl) {
}
/* Called when a client upload session is ready to shutdown. */
-static void client_session_shutdown_cb(void* arg /*client */, int success) {
+static void client_session_shutdown_cb(grpc_exec_ctx* exec_ctx,
+ void* arg /*client */, int success) {
client* cl = static_cast<client*>(arg);
- grpc_fd_orphan(cl->em_fd, nullptr, nullptr, false /* already_closed */, "c");
+ grpc_fd_orphan(exec_ctx, cl->em_fd, nullptr, nullptr,
+ false /* already_closed */, "c");
cl->done = 1;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
}
/* Write as much as possible, then register notify_on_write. */
-static void client_session_write(void* arg, /*client */
+static void client_session_write(grpc_exec_ctx* exec_ctx, void* arg, /*client */
grpc_error* error) {
client* cl = static_cast<client*>(arg);
int fd = grpc_fd_wrapped_fd(cl->em_fd);
@@ -303,7 +308,7 @@ static void client_session_write(void* arg, /*client */
if (error != GRPC_ERROR_NONE) {
gpr_mu_lock(g_mu);
- client_session_shutdown_cb(arg, 1);
+ client_session_shutdown_cb(exec_ctx, arg, 1);
gpr_mu_unlock(g_mu);
return;
}
@@ -318,10 +323,10 @@ static void client_session_write(void* arg, /*client */
if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
GRPC_CLOSURE_INIT(&cl->write_closure, client_session_write, cl,
grpc_schedule_on_exec_ctx);
- grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure);
+ grpc_fd_notify_on_write(exec_ctx, cl->em_fd, &cl->write_closure);
cl->client_write_cnt++;
} else {
- client_session_shutdown_cb(arg, 1);
+ client_session_shutdown_cb(exec_ctx, arg, 1);
}
gpr_mu_unlock(g_mu);
} else {
@@ -331,7 +336,7 @@ static void client_session_write(void* arg, /*client */
}
/* Start a client to send a stream of bytes. */
-static void client_start(client* cl, int port) {
+static void client_start(grpc_exec_ctx* exec_ctx, client* cl, int port) {
int fd;
struct sockaddr_in sin;
create_test_socket(port, &fd, &sin);
@@ -352,9 +357,9 @@ static void client_start(client* cl, int port) {
}
cl->em_fd = grpc_fd_create(fd, "client");
- grpc_pollset_add_fd(g_pollset, cl->em_fd);
+ grpc_pollset_add_fd(exec_ctx, g_pollset, cl->em_fd);
- client_session_write(cl, GRPC_ERROR_NONE);
+ client_session_write(exec_ctx, cl, GRPC_ERROR_NONE);
}
/* Wait for the signal to shutdown a client. */
@@ -362,12 +367,12 @@ static void client_wait_and_shutdown(client* cl) {
gpr_mu_lock(g_mu);
while (!cl->done) {
grpc_pollset_worker* worker = nullptr;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work",
- grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
+ "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker,
+ GRPC_MILLIS_INF_FUTURE)));
gpr_mu_unlock(g_mu);
-
+ grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(g_mu);
}
gpr_mu_unlock(g_mu);
@@ -380,13 +385,13 @@ static void test_grpc_fd(void) {
server sv;
client cl;
int port;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
server_init(&sv);
- port = server_start(&sv);
+ port = server_start(&exec_ctx, &sv);
client_init(&cl);
- client_start(&cl, port);
-
+ client_start(&exec_ctx, &cl, port);
+ grpc_exec_ctx_finish(&exec_ctx);
client_wait_and_shutdown(&cl);
server_wait_and_shutdown(&sv);
GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total);
@@ -401,25 +406,27 @@ void init_change_data(fd_change_data* fdc) { fdc->cb_that_ran = nullptr; }
void destroy_change_data(fd_change_data* fdc) {}
-static void first_read_callback(void* arg /* fd_change_data */,
+static void first_read_callback(grpc_exec_ctx* exec_ctx,
+ void* arg /* fd_change_data */,
grpc_error* error) {
fd_change_data* fdc = static_cast<fd_change_data*>(arg);
gpr_mu_lock(g_mu);
fdc->cb_that_ran = first_read_callback;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
}
-static void second_read_callback(void* arg /* fd_change_data */,
+static void second_read_callback(grpc_exec_ctx* exec_ctx,
+ void* arg /* fd_change_data */,
grpc_error* error) {
fd_change_data* fdc = static_cast<fd_change_data*>(arg);
gpr_mu_lock(g_mu);
fdc->cb_that_ran = second_read_callback;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
}
@@ -436,7 +443,7 @@ static void test_grpc_fd_change(void) {
ssize_t result;
grpc_closure first_closure;
grpc_closure second_closure;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_CLOSURE_INIT(&first_closure, first_read_callback, &a,
grpc_schedule_on_exec_ctx);
@@ -453,10 +460,10 @@ static void test_grpc_fd_change(void) {
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change");
- grpc_pollset_add_fd(g_pollset, em_fd);
+ grpc_pollset_add_fd(&exec_ctx, g_pollset, em_fd);
/* Register the first callback, then make its FD readable */
- grpc_fd_notify_on_read(em_fd, &first_closure);
+ grpc_fd_notify_on_read(&exec_ctx, em_fd, &first_closure);
data = 0;
result = write(sv[1], &data, 1);
GPR_ASSERT(result == 1);
@@ -466,10 +473,10 @@ static void test_grpc_fd_change(void) {
while (a.cb_that_ran == nullptr) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work",
- grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
+ "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker,
+ GRPC_MILLIS_INF_FUTURE)));
gpr_mu_unlock(g_mu);
-
+ grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(g_mu);
}
GPR_ASSERT(a.cb_that_ran == first_read_callback);
@@ -481,7 +488,7 @@ static void test_grpc_fd_change(void) {
/* Now register a second callback with distinct change data, and do the same
thing again. */
- grpc_fd_notify_on_read(em_fd, &second_closure);
+ grpc_fd_notify_on_read(&exec_ctx, em_fd, &second_closure);
data = 0;
result = write(sv[1], &data, 1);
GPR_ASSERT(result == 1);
@@ -490,43 +497,44 @@ static void test_grpc_fd_change(void) {
while (b.cb_that_ran == nullptr) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work",
- grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
+ "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker,
+ GRPC_MILLIS_INF_FUTURE)));
gpr_mu_unlock(g_mu);
-
+ grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(g_mu);
}
/* Except now we verify that second_read_callback ran instead */
GPR_ASSERT(b.cb_that_ran == second_read_callback);
gpr_mu_unlock(g_mu);
- grpc_fd_orphan(em_fd, nullptr, nullptr, false /* already_closed */, "d");
-
+ grpc_fd_orphan(&exec_ctx, em_fd, nullptr, nullptr, false /* already_closed */,
+ "d");
+ grpc_exec_ctx_finish(&exec_ctx);
destroy_change_data(&a);
destroy_change_data(&b);
close(sv[1]);
}
-static void destroy_pollset(void* p, grpc_error* error) {
- grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
+static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p,
+ grpc_error* error) {
+ grpc_pollset_destroy(exec_ctx, static_cast<grpc_pollset*>(p));
}
int main(int argc, char** argv) {
grpc_closure destroyed;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_test_init(argc, argv);
grpc_init();
- {
- grpc_core::ExecCtx exec_ctx;
- g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
- grpc_pollset_init(g_pollset, &g_mu);
- test_grpc_fd();
- test_grpc_fd_change();
- GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
- grpc_schedule_on_exec_ctx);
- grpc_pollset_shutdown(g_pollset, &destroyed);
- grpc_core::ExecCtx::Get()->Flush();
- gpr_free(g_pollset);
- }
+ g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
+ grpc_pollset_init(g_pollset, &g_mu);
+ test_grpc_fd();
+ test_grpc_fd_change();
+ GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
+ grpc_schedule_on_exec_ctx);
+ grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
+ grpc_exec_ctx_flush(&exec_ctx);
+ gpr_free(g_pollset);
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_shutdown();
return 0;
}
diff --git a/test/core/iomgr/load_file_test.cc b/test/core/iomgr/load_file_test.cc
index 797d0ef1a4..9f360badcc 100644
--- a/test/core/iomgr/load_file_test.cc
+++ b/test/core/iomgr/load_file_test.cc
@@ -19,7 +19,6 @@
#include <stdio.h>
#include <string.h>
-#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -153,11 +152,9 @@ static void test_load_big_file(void) {
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
- grpc_init();
test_load_empty_file();
test_load_failure();
test_load_small_file();
test_load_big_file();
- grpc_shutdown();
return 0;
}
diff --git a/test/core/iomgr/pollset_set_test.cc b/test/core/iomgr/pollset_set_test.cc
index f27079134b..719eab91fe 100644
--- a/test/core/iomgr/pollset_set_test.cc
+++ b/test/core/iomgr/pollset_set_test.cc
@@ -47,10 +47,11 @@ void init_test_pollset_sets(test_pollset_set* pollset_sets, const int num_pss) {
}
}
-void cleanup_test_pollset_sets(test_pollset_set* pollset_sets,
+void cleanup_test_pollset_sets(grpc_exec_ctx* exec_ctx,
+ test_pollset_set* pollset_sets,
const int num_pss) {
for (int i = 0; i < num_pss; i++) {
- grpc_pollset_set_destroy(pollset_sets[i].pss);
+ grpc_pollset_set_destroy(exec_ctx, pollset_sets[i].pss);
pollset_sets[i].pss = nullptr;
}
}
@@ -72,19 +73,21 @@ static void init_test_pollsets(test_pollset* pollsets, const int num_pollsets) {
}
}
-static void destroy_pollset(void* p, grpc_error* error) {
- grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
+static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p,
+ grpc_error* error) {
+ grpc_pollset_destroy(exec_ctx, static_cast<grpc_pollset*>(p));
}
-static void cleanup_test_pollsets(test_pollset* pollsets,
+static void cleanup_test_pollsets(grpc_exec_ctx* exec_ctx,
+ test_pollset* pollsets,
const int num_pollsets) {
grpc_closure destroyed;
for (int i = 0; i < num_pollsets; i++) {
GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, pollsets[i].ps,
grpc_schedule_on_exec_ctx);
- grpc_pollset_shutdown(pollsets[i].ps, &destroyed);
+ grpc_pollset_shutdown(exec_ctx, pollsets[i].ps, &destroyed);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(exec_ctx);
gpr_free(pollsets[i].ps);
pollsets[i].ps = nullptr;
}
@@ -102,43 +105,45 @@ typedef struct test_fd {
grpc_closure on_readable; /* Closure to call when this fd is readable */
} test_fd;
-void on_readable(void* tfd, grpc_error* error) {
+void on_readable(grpc_exec_ctx* exec_ctx, void* tfd, grpc_error* error) {
((test_fd*)tfd)->is_on_readable_called = true;
}
-static void reset_test_fd(test_fd* tfd) {
+static void reset_test_fd(grpc_exec_ctx* exec_ctx, test_fd* tfd) {
tfd->is_on_readable_called = false;
GRPC_CLOSURE_INIT(&tfd->on_readable, on_readable, tfd,
grpc_schedule_on_exec_ctx);
- grpc_fd_notify_on_read(tfd->fd, &tfd->on_readable);
+ grpc_fd_notify_on_read(exec_ctx, tfd->fd, &tfd->on_readable);
}
-static void init_test_fds(test_fd* tfds, const int num_fds) {
+static void init_test_fds(grpc_exec_ctx* exec_ctx, test_fd* tfds,
+ const int num_fds) {
for (int i = 0; i < num_fds; i++) {
GPR_ASSERT(GRPC_ERROR_NONE == grpc_wakeup_fd_init(&tfds[i].wakeup_fd));
tfds[i].fd = grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&tfds[i].wakeup_fd),
"test_fd");
- reset_test_fd(&tfds[i]);
+ reset_test_fd(exec_ctx, &tfds[i]);
}
}
-static void cleanup_test_fds(test_fd* tfds, const int num_fds) {
+static void cleanup_test_fds(grpc_exec_ctx* exec_ctx, test_fd* tfds,
+ const int num_fds) {
int release_fd;
for (int i = 0; i < num_fds; i++) {
- grpc_fd_shutdown(tfds[i].fd,
+ grpc_fd_shutdown(exec_ctx, tfds[i].fd,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("fd cleanup"));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(exec_ctx);
/* grpc_fd_orphan frees the memory allocated for grpc_fd. Normally it also
* calls close() on the underlying fd. In our case, we are using
* grpc_wakeup_fd and we would like to destroy it ourselves (by calling
* grpc_wakeup_fd_destroy). To prevent grpc_fd from calling close() on the
* underlying fd, call it with a non-NULL 'release_fd' parameter */
- grpc_fd_orphan(tfds[i].fd, nullptr, &release_fd, false /* already_closed */,
- "test_fd_cleanup");
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_fd_orphan(exec_ctx, tfds[i].fd, nullptr, &release_fd,
+ false /* already_closed */, "test_fd_cleanup");
+ grpc_exec_ctx_flush(exec_ctx);
grpc_wakeup_fd_destroy(&tfds[i].wakeup_fd);
}
@@ -150,7 +155,8 @@ static void make_test_fds_readable(test_fd* tfds, const int num_fds) {
}
}
-static void verify_readable_and_reset(test_fd* tfds, const int num_fds) {
+static void verify_readable_and_reset(grpc_exec_ctx* exec_ctx, test_fd* tfds,
+ const int num_fds) {
for (int i = 0; i < num_fds; i++) {
/* Verify that the on_readable callback was called */
GPR_ASSERT(tfds[i].is_on_readable_called);
@@ -158,7 +164,7 @@ static void verify_readable_and_reset(test_fd* tfds, const int num_fds) {
/* Reset the tfd[i] structure */
GPR_ASSERT(GRPC_ERROR_NONE ==
grpc_wakeup_fd_consume_wakeup(&tfds[i].wakeup_fd));
- reset_test_fd(&tfds[i]);
+ reset_test_fd(exec_ctx, &tfds[i]);
}
}
@@ -199,7 +205,7 @@ static void pollset_set_test_basic() {
* |
* +---> FD9 (Added after PS2 is added to PSS0)
*/
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_worker* worker;
grpc_millis deadline;
@@ -210,33 +216,34 @@ static void pollset_set_test_basic() {
const int num_ps = GPR_ARRAY_SIZE(pollsets);
const int num_pss = GPR_ARRAY_SIZE(pollset_sets);
- init_test_fds(tfds, num_fds);
+ init_test_fds(&exec_ctx, tfds, num_fds);
init_test_pollsets(pollsets, num_ps);
init_test_pollset_sets(pollset_sets, num_pss);
/* Construct the pollset_set/pollset/fd tree (see diagram above) */
- grpc_pollset_set_add_fd(pollset_sets[0].pss, tfds[0].fd);
- grpc_pollset_set_add_fd(pollset_sets[1].pss, tfds[1].fd);
+ grpc_pollset_set_add_fd(&exec_ctx, pollset_sets[0].pss, tfds[0].fd);
+ grpc_pollset_set_add_fd(&exec_ctx, pollset_sets[1].pss, tfds[1].fd);
- grpc_pollset_add_fd(pollsets[0].ps, tfds[2].fd);
- grpc_pollset_add_fd(pollsets[1].ps, tfds[3].fd);
- grpc_pollset_add_fd(pollsets[2].ps, tfds[4].fd);
+ grpc_pollset_add_fd(&exec_ctx, pollsets[0].ps, tfds[2].fd);
+ grpc_pollset_add_fd(&exec_ctx, pollsets[1].ps, tfds[3].fd);
+ grpc_pollset_add_fd(&exec_ctx, pollsets[2].ps, tfds[4].fd);
- grpc_pollset_set_add_pollset_set(pollset_sets[0].pss, pollset_sets[1].pss);
+ grpc_pollset_set_add_pollset_set(&exec_ctx, pollset_sets[0].pss,
+ pollset_sets[1].pss);
- grpc_pollset_set_add_pollset(pollset_sets[1].pss, pollsets[0].ps);
- grpc_pollset_set_add_pollset(pollset_sets[0].pss, pollsets[1].ps);
- grpc_pollset_set_add_pollset(pollset_sets[0].pss, pollsets[2].ps);
+ grpc_pollset_set_add_pollset(&exec_ctx, pollset_sets[1].pss, pollsets[0].ps);
+ grpc_pollset_set_add_pollset(&exec_ctx, pollset_sets[0].pss, pollsets[1].ps);
+ grpc_pollset_set_add_pollset(&exec_ctx, pollset_sets[0].pss, pollsets[2].ps);
- grpc_pollset_set_add_fd(pollset_sets[0].pss, tfds[5].fd);
- grpc_pollset_set_add_fd(pollset_sets[1].pss, tfds[6].fd);
+ grpc_pollset_set_add_fd(&exec_ctx, pollset_sets[0].pss, tfds[5].fd);
+ grpc_pollset_set_add_fd(&exec_ctx, pollset_sets[1].pss, tfds[6].fd);
- grpc_pollset_add_fd(pollsets[0].ps, tfds[7].fd);
- grpc_pollset_add_fd(pollsets[1].ps, tfds[8].fd);
- grpc_pollset_add_fd(pollsets[2].ps, tfds[9].fd);
+ grpc_pollset_add_fd(&exec_ctx, pollsets[0].ps, tfds[7].fd);
+ grpc_pollset_add_fd(&exec_ctx, pollsets[1].ps, tfds[8].fd);
+ grpc_pollset_add_fd(&exec_ctx, pollsets[2].ps, tfds[9].fd);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
/* Test that if any FD in the above structure is readable, it is observable by
* doing grpc_pollset_work on any pollset
@@ -256,32 +263,34 @@ static void pollset_set_test_basic() {
deadline = grpc_timespec_to_millis_round_up(
grpc_timeout_milliseconds_to_deadline(2));
GPR_ASSERT(GRPC_ERROR_NONE ==
- grpc_pollset_work(pollsets[i].ps, &worker, deadline));
+ grpc_pollset_work(&exec_ctx, pollsets[i].ps, &worker, deadline));
gpr_mu_unlock(pollsets[i].mu);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
- verify_readable_and_reset(tfds, num_fds);
- grpc_core::ExecCtx::Get()->Flush();
+ verify_readable_and_reset(&exec_ctx, tfds, num_fds);
+ grpc_exec_ctx_flush(&exec_ctx);
}
/* Test tear down */
- grpc_pollset_set_del_fd(pollset_sets[0].pss, tfds[0].fd);
- grpc_pollset_set_del_fd(pollset_sets[0].pss, tfds[5].fd);
- grpc_pollset_set_del_fd(pollset_sets[1].pss, tfds[1].fd);
- grpc_pollset_set_del_fd(pollset_sets[1].pss, tfds[6].fd);
- grpc_core::ExecCtx::Get()->Flush();
-
- grpc_pollset_set_del_pollset(pollset_sets[1].pss, pollsets[0].ps);
- grpc_pollset_set_del_pollset(pollset_sets[0].pss, pollsets[1].ps);
- grpc_pollset_set_del_pollset(pollset_sets[0].pss, pollsets[2].ps);
-
- grpc_pollset_set_del_pollset_set(pollset_sets[0].pss, pollset_sets[1].pss);
- grpc_core::ExecCtx::Get()->Flush();
-
- cleanup_test_fds(tfds, num_fds);
- cleanup_test_pollsets(pollsets, num_ps);
- cleanup_test_pollset_sets(pollset_sets, num_pss);
+ grpc_pollset_set_del_fd(&exec_ctx, pollset_sets[0].pss, tfds[0].fd);
+ grpc_pollset_set_del_fd(&exec_ctx, pollset_sets[0].pss, tfds[5].fd);
+ grpc_pollset_set_del_fd(&exec_ctx, pollset_sets[1].pss, tfds[1].fd);
+ grpc_pollset_set_del_fd(&exec_ctx, pollset_sets[1].pss, tfds[6].fd);
+ grpc_exec_ctx_flush(&exec_ctx);
+
+ grpc_pollset_set_del_pollset(&exec_ctx, pollset_sets[1].pss, pollsets[0].ps);
+ grpc_pollset_set_del_pollset(&exec_ctx, pollset_sets[0].pss, pollsets[1].ps);
+ grpc_pollset_set_del_pollset(&exec_ctx, pollset_sets[0].pss, pollsets[2].ps);
+
+ grpc_pollset_set_del_pollset_set(&exec_ctx, pollset_sets[0].pss,
+ pollset_sets[1].pss);
+ grpc_exec_ctx_flush(&exec_ctx);
+
+ cleanup_test_fds(&exec_ctx, tfds, num_fds);
+ cleanup_test_pollsets(&exec_ctx, pollsets, num_ps);
+ cleanup_test_pollset_sets(&exec_ctx, pollset_sets, num_pss);
+ grpc_exec_ctx_finish(&exec_ctx);
}
/* Same FD added multiple times to the pollset_set tree */
@@ -301,7 +310,7 @@ void pollset_set_test_dup_fds() {
* | +--> FD2
* +---> FD1
*/
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_worker* worker;
grpc_millis deadline;
@@ -312,20 +321,21 @@ void pollset_set_test_dup_fds() {
const int num_ps = 1;
const int num_pss = GPR_ARRAY_SIZE(pollset_sets);
- init_test_fds(tfds, num_fds);
+ init_test_fds(&exec_ctx, tfds, num_fds);
init_test_pollsets(&pollset, num_ps);
init_test_pollset_sets(pollset_sets, num_pss);
/* Construct the structure */
- grpc_pollset_set_add_fd(pollset_sets[0].pss, tfds[0].fd);
- grpc_pollset_set_add_fd(pollset_sets[1].pss, tfds[0].fd);
- grpc_pollset_set_add_fd(pollset_sets[1].pss, tfds[1].fd);
+ grpc_pollset_set_add_fd(&exec_ctx, pollset_sets[0].pss, tfds[0].fd);
+ grpc_pollset_set_add_fd(&exec_ctx, pollset_sets[1].pss, tfds[0].fd);
+ grpc_pollset_set_add_fd(&exec_ctx, pollset_sets[1].pss, tfds[1].fd);
- grpc_pollset_add_fd(pollset.ps, tfds[1].fd);
- grpc_pollset_add_fd(pollset.ps, tfds[2].fd);
+ grpc_pollset_add_fd(&exec_ctx, pollset.ps, tfds[1].fd);
+ grpc_pollset_add_fd(&exec_ctx, pollset.ps, tfds[2].fd);
- grpc_pollset_set_add_pollset(pollset_sets[1].pss, pollset.ps);
- grpc_pollset_set_add_pollset_set(pollset_sets[0].pss, pollset_sets[1].pss);
+ grpc_pollset_set_add_pollset(&exec_ctx, pollset_sets[1].pss, pollset.ps);
+ grpc_pollset_set_add_pollset_set(&exec_ctx, pollset_sets[0].pss,
+ pollset_sets[1].pss);
/* Test. Make all FDs readable and make sure that can be observed by doing a
* grpc_pollset_work on the pollset 'PS' */
@@ -335,25 +345,27 @@ void pollset_set_test_dup_fds() {
deadline = grpc_timespec_to_millis_round_up(
grpc_timeout_milliseconds_to_deadline(2));
GPR_ASSERT(GRPC_ERROR_NONE ==
- grpc_pollset_work(pollset.ps, &worker, deadline));
+ grpc_pollset_work(&exec_ctx, pollset.ps, &worker, deadline));
gpr_mu_unlock(pollset.mu);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
- verify_readable_and_reset(tfds, num_fds);
- grpc_core::ExecCtx::Get()->Flush();
+ verify_readable_and_reset(&exec_ctx, tfds, num_fds);
+ grpc_exec_ctx_flush(&exec_ctx);
/* Tear down */
- grpc_pollset_set_del_fd(pollset_sets[0].pss, tfds[0].fd);
- grpc_pollset_set_del_fd(pollset_sets[1].pss, tfds[0].fd);
- grpc_pollset_set_del_fd(pollset_sets[1].pss, tfds[1].fd);
-
- grpc_pollset_set_del_pollset(pollset_sets[1].pss, pollset.ps);
- grpc_pollset_set_del_pollset_set(pollset_sets[0].pss, pollset_sets[1].pss);
- grpc_core::ExecCtx::Get()->Flush();
-
- cleanup_test_fds(tfds, num_fds);
- cleanup_test_pollsets(&pollset, num_ps);
- cleanup_test_pollset_sets(pollset_sets, num_pss);
+ grpc_pollset_set_del_fd(&exec_ctx, pollset_sets[0].pss, tfds[0].fd);
+ grpc_pollset_set_del_fd(&exec_ctx, pollset_sets[1].pss, tfds[0].fd);
+ grpc_pollset_set_del_fd(&exec_ctx, pollset_sets[1].pss, tfds[1].fd);
+
+ grpc_pollset_set_del_pollset(&exec_ctx, pollset_sets[1].pss, pollset.ps);
+ grpc_pollset_set_del_pollset_set(&exec_ctx, pollset_sets[0].pss,
+ pollset_sets[1].pss);
+ grpc_exec_ctx_flush(&exec_ctx);
+
+ cleanup_test_fds(&exec_ctx, tfds, num_fds);
+ cleanup_test_pollsets(&exec_ctx, &pollset, num_ps);
+ cleanup_test_pollset_sets(&exec_ctx, pollset_sets, num_pss);
+ grpc_exec_ctx_finish(&exec_ctx);
}
/* Pollset_set with an empty pollset */
@@ -371,7 +383,7 @@ void pollset_set_test_empty_pollset() {
* |
* +---> FD2
*/
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_worker* worker;
grpc_millis deadline;
@@ -382,17 +394,17 @@ void pollset_set_test_empty_pollset() {
const int num_ps = GPR_ARRAY_SIZE(pollsets);
const int num_pss = 1;
- init_test_fds(tfds, num_fds);
+ init_test_fds(&exec_ctx, tfds, num_fds);
init_test_pollsets(pollsets, num_ps);
init_test_pollset_sets(&pollset_set, num_pss);
/* Construct the structure */
- grpc_pollset_set_add_fd(pollset_set.pss, tfds[0].fd);
- grpc_pollset_add_fd(pollsets[1].ps, tfds[1].fd);
- grpc_pollset_add_fd(pollsets[1].ps, tfds[2].fd);
+ grpc_pollset_set_add_fd(&exec_ctx, pollset_set.pss, tfds[0].fd);
+ grpc_pollset_add_fd(&exec_ctx, pollsets[1].ps, tfds[1].fd);
+ grpc_pollset_add_fd(&exec_ctx, pollsets[1].ps, tfds[2].fd);
- grpc_pollset_set_add_pollset(pollset_set.pss, pollsets[0].ps);
- grpc_pollset_set_add_pollset(pollset_set.pss, pollsets[1].ps);
+ grpc_pollset_set_add_pollset(&exec_ctx, pollset_set.pss, pollsets[0].ps);
+ grpc_pollset_set_add_pollset(&exec_ctx, pollset_set.pss, pollsets[1].ps);
/* Test. Make all FDs readable and make sure that can be observed by doing
* grpc_pollset_work on the empty pollset 'PS0' */
@@ -402,44 +414,45 @@ void pollset_set_test_empty_pollset() {
deadline = grpc_timespec_to_millis_round_up(
grpc_timeout_milliseconds_to_deadline(2));
GPR_ASSERT(GRPC_ERROR_NONE ==
- grpc_pollset_work(pollsets[0].ps, &worker, deadline));
+ grpc_pollset_work(&exec_ctx, pollsets[0].ps, &worker, deadline));
gpr_mu_unlock(pollsets[0].mu);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
- verify_readable_and_reset(tfds, num_fds);
- grpc_core::ExecCtx::Get()->Flush();
+ verify_readable_and_reset(&exec_ctx, tfds, num_fds);
+ grpc_exec_ctx_flush(&exec_ctx);
/* Tear down */
- grpc_pollset_set_del_fd(pollset_set.pss, tfds[0].fd);
- grpc_pollset_set_del_pollset(pollset_set.pss, pollsets[0].ps);
- grpc_pollset_set_del_pollset(pollset_set.pss, pollsets[1].ps);
- grpc_core::ExecCtx::Get()->Flush();
-
- cleanup_test_fds(tfds, num_fds);
- cleanup_test_pollsets(pollsets, num_ps);
- cleanup_test_pollset_sets(&pollset_set, num_pss);
+ grpc_pollset_set_del_fd(&exec_ctx, pollset_set.pss, tfds[0].fd);
+ grpc_pollset_set_del_pollset(&exec_ctx, pollset_set.pss, pollsets[0].ps);
+ grpc_pollset_set_del_pollset(&exec_ctx, pollset_set.pss, pollsets[1].ps);
+ grpc_exec_ctx_flush(&exec_ctx);
+
+ cleanup_test_fds(&exec_ctx, tfds, num_fds);
+ cleanup_test_pollsets(&exec_ctx, pollsets, num_ps);
+ cleanup_test_pollset_sets(&exec_ctx, &pollset_set, num_pss);
+ grpc_exec_ctx_finish(&exec_ctx);
}
int main(int argc, char** argv) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_test_init(argc, argv);
grpc_init();
- {
- grpc_core::ExecCtx exec_ctx;
- const char* poll_strategy = grpc_get_poll_strategy_name();
-
- if (poll_strategy != nullptr &&
- (strcmp(poll_strategy, "epollsig") == 0 ||
- strcmp(poll_strategy, "epoll-threadpool") == 0)) {
- pollset_set_test_basic();
- pollset_set_test_dup_fds();
- pollset_set_test_empty_pollset();
- } else {
- gpr_log(GPR_INFO,
- "Skipping the test. The test is only relevant for 'epoll' "
- "strategy. and the current strategy is: '%s'",
- poll_strategy);
- }
+ const char* poll_strategy = grpc_get_poll_strategy_name();
+
+ if (poll_strategy != nullptr &&
+ (strcmp(poll_strategy, "epollsig") == 0 ||
+ strcmp(poll_strategy, "epoll-threadpool") == 0)) {
+ pollset_set_test_basic();
+ pollset_set_test_dup_fds();
+ pollset_set_test_empty_pollset();
+ } else {
+ gpr_log(GPR_INFO,
+ "Skipping the test. The test is only relevant for 'epoll' "
+ "strategy. and the current strategy is: '%s'",
+ poll_strategy);
}
+
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_shutdown();
return 0;
}
diff --git a/test/core/iomgr/resolve_address_posix_test.cc b/test/core/iomgr/resolve_address_posix_test.cc
index e36315333c..836de423bd 100644
--- a/test/core/iomgr/resolve_address_posix_test.cc
+++ b/test/core/iomgr/resolve_address_posix_test.cc
@@ -46,29 +46,29 @@ typedef struct args_struct {
grpc_pollset_set* pollset_set;
} args_struct;
-static void do_nothing(void* arg, grpc_error* error) {}
+static void do_nothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {}
-void args_init(args_struct* args) {
+void args_init(grpc_exec_ctx* exec_ctx, args_struct* args) {
gpr_event_init(&args->ev);
args->pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
grpc_pollset_init(args->pollset, &args->mu);
args->pollset_set = grpc_pollset_set_create();
- grpc_pollset_set_add_pollset(args->pollset_set, args->pollset);
+ grpc_pollset_set_add_pollset(exec_ctx, args->pollset_set, args->pollset);
args->addrs = nullptr;
}
-void args_finish(args_struct* args) {
+void args_finish(grpc_exec_ctx* exec_ctx, args_struct* args) {
GPR_ASSERT(gpr_event_wait(&args->ev, test_deadline()));
grpc_resolved_addresses_destroy(args->addrs);
- grpc_pollset_set_del_pollset(args->pollset_set, args->pollset);
- grpc_pollset_set_destroy(args->pollset_set);
+ grpc_pollset_set_del_pollset(exec_ctx, args->pollset_set, args->pollset);
+ grpc_pollset_set_destroy(exec_ctx, args->pollset_set);
grpc_closure do_nothing_cb;
GRPC_CLOSURE_INIT(&do_nothing_cb, do_nothing, nullptr,
grpc_schedule_on_exec_ctx);
- grpc_pollset_shutdown(args->pollset, &do_nothing_cb);
+ grpc_pollset_shutdown(exec_ctx, args->pollset, &do_nothing_cb);
// exec_ctx needs to be flushed before calling grpc_pollset_destroy()
- grpc_core::ExecCtx::Get()->Flush();
- grpc_pollset_destroy(args->pollset);
+ grpc_exec_ctx_flush(exec_ctx);
+ grpc_pollset_destroy(exec_ctx, args->pollset);
gpr_free(args->pollset);
}
@@ -79,24 +79,26 @@ static grpc_millis n_sec_deadline(int seconds) {
static void actually_poll(void* argsp) {
args_struct* args = static_cast<args_struct*>(argsp);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_millis deadline = n_sec_deadline(10);
while (true) {
- grpc_core::ExecCtx exec_ctx;
bool done = gpr_atm_acq_load(&args->done_atm) != 0;
if (done) {
break;
}
- grpc_millis time_left = deadline - grpc_core::ExecCtx::Get()->Now();
+ grpc_millis time_left = deadline - grpc_exec_ctx_now(&exec_ctx);
gpr_log(GPR_DEBUG, "done=%d, time_left=%" PRIdPTR, done, time_left);
GPR_ASSERT(time_left >= 0);
grpc_pollset_worker* worker = nullptr;
gpr_mu_lock(args->mu);
- GRPC_LOG_IF_ERROR("pollset_work", grpc_pollset_work(args->pollset, &worker,
- n_sec_deadline(1)));
+ GRPC_LOG_IF_ERROR("pollset_work",
+ grpc_pollset_work(&exec_ctx, args->pollset, &worker,
+ n_sec_deadline(1)));
gpr_mu_unlock(args->mu);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
}
gpr_event_set(&args->ev, (void*)1);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void poll_pollset_until_request_done(args_struct* args) {
@@ -105,7 +107,8 @@ static void poll_pollset_until_request_done(args_struct* args) {
gpr_thd_new(&id, "grpc_poll_pollset", actually_poll, args, nullptr);
}
-static void must_succeed(void* argsp, grpc_error* err) {
+static void must_succeed(grpc_exec_ctx* exec_ctx, void* argsp,
+ grpc_error* err) {
args_struct* args = static_cast<args_struct*>(argsp);
GPR_ASSERT(err == GRPC_ERROR_NONE);
GPR_ASSERT(args->addrs != nullptr);
@@ -113,28 +116,29 @@ static void must_succeed(void* argsp, grpc_error* err) {
gpr_atm_rel_store(&args->done_atm, 1);
}
-static void must_fail(void* argsp, grpc_error* err) {
+static void must_fail(grpc_exec_ctx* exec_ctx, void* argsp, grpc_error* err) {
args_struct* args = static_cast<args_struct*>(argsp);
GPR_ASSERT(err != GRPC_ERROR_NONE);
gpr_atm_rel_store(&args->done_atm, 1);
}
static void test_unix_socket(void) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
args_struct args;
- args_init(&args);
+ args_init(&exec_ctx, &args);
poll_pollset_until_request_done(&args);
grpc_resolve_address(
- "unix:/path/name", nullptr, args.pollset_set,
+ &exec_ctx, "unix:/path/name", nullptr, args.pollset_set,
GRPC_CLOSURE_CREATE(must_succeed, &args, grpc_schedule_on_exec_ctx),
&args.addrs);
- args_finish(&args);
+ args_finish(&exec_ctx, &args);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_unix_socket_path_name_too_long(void) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
args_struct args;
- args_init(&args);
+ args_init(&exec_ctx, &args);
const char prefix[] = "unix:/path/name";
size_t path_name_length =
GPR_ARRAY_SIZE(((struct sockaddr_un*)nullptr)->sun_path) + 6;
@@ -146,23 +150,22 @@ static void test_unix_socket_path_name_too_long(void) {
poll_pollset_until_request_done(&args);
grpc_resolve_address(
- path_name, nullptr, args.pollset_set,
+ &exec_ctx, path_name, nullptr, args.pollset_set,
GRPC_CLOSURE_CREATE(must_fail, &args, grpc_schedule_on_exec_ctx),
&args.addrs);
gpr_free(path_name);
- args_finish(&args);
+ args_finish(&exec_ctx, &args);
+ grpc_exec_ctx_finish(&exec_ctx);
}
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
grpc_init();
-
- {
- grpc_core::ExecCtx exec_ctx;
- test_unix_socket();
- test_unix_socket_path_name_too_long();
- }
-
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ test_unix_socket();
+ test_unix_socket_path_name_too_long();
+ grpc_executor_shutdown(&exec_ctx);
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_shutdown();
return 0;
}
diff --git a/test/core/iomgr/resolve_address_test.cc b/test/core/iomgr/resolve_address_test.cc
index a0dc484f3e..1c5aa38a95 100644
--- a/test/core/iomgr/resolve_address_test.cc
+++ b/test/core/iomgr/resolve_address_test.cc
@@ -39,32 +39,32 @@ typedef struct args_struct {
grpc_pollset_set* pollset_set;
} args_struct;
-static void do_nothing(void* arg, grpc_error* error) {}
+static void do_nothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {}
-void args_init(args_struct* args) {
+void args_init(grpc_exec_ctx* exec_ctx, args_struct* args) {
gpr_event_init(&args->ev);
args->pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
grpc_pollset_init(args->pollset, &args->mu);
args->pollset_set = grpc_pollset_set_create();
- grpc_pollset_set_add_pollset(args->pollset_set, args->pollset);
+ grpc_pollset_set_add_pollset(exec_ctx, args->pollset_set, args->pollset);
args->addrs = nullptr;
gpr_atm_rel_store(&args->done_atm, 0);
}
-void args_finish(args_struct* args) {
+void args_finish(grpc_exec_ctx* exec_ctx, args_struct* args) {
GPR_ASSERT(gpr_event_wait(&args->ev, test_deadline()));
grpc_resolved_addresses_destroy(args->addrs);
- grpc_pollset_set_del_pollset(args->pollset_set, args->pollset);
- grpc_pollset_set_destroy(args->pollset_set);
+ grpc_pollset_set_del_pollset(exec_ctx, args->pollset_set, args->pollset);
+ grpc_pollset_set_destroy(exec_ctx, args->pollset_set);
grpc_closure do_nothing_cb;
GRPC_CLOSURE_INIT(&do_nothing_cb, do_nothing, nullptr,
grpc_schedule_on_exec_ctx);
gpr_mu_lock(args->mu);
- grpc_pollset_shutdown(args->pollset, &do_nothing_cb);
+ grpc_pollset_shutdown(exec_ctx, args->pollset, &do_nothing_cb);
gpr_mu_unlock(args->mu);
// exec_ctx needs to be flushed before calling grpc_pollset_destroy()
- grpc_core::ExecCtx::Get()->Flush();
- grpc_pollset_destroy(args->pollset);
+ grpc_exec_ctx_flush(exec_ctx);
+ grpc_pollset_destroy(exec_ctx, args->pollset);
gpr_free(args->pollset);
}
@@ -74,109 +74,119 @@ static grpc_millis n_sec_deadline(int seconds) {
}
static void poll_pollset_until_request_done(args_struct* args) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_millis deadline = n_sec_deadline(10);
while (true) {
bool done = gpr_atm_acq_load(&args->done_atm) != 0;
if (done) {
break;
}
- grpc_millis time_left = deadline - grpc_core::ExecCtx::Get()->Now();
+ grpc_millis time_left = deadline - grpc_exec_ctx_now(&exec_ctx);
gpr_log(GPR_DEBUG, "done=%d, time_left=%" PRIdPTR, done, time_left);
GPR_ASSERT(time_left >= 0);
grpc_pollset_worker* worker = nullptr;
gpr_mu_lock(args->mu);
- GRPC_LOG_IF_ERROR("pollset_work", grpc_pollset_work(args->pollset, &worker,
- n_sec_deadline(1)));
+ GRPC_LOG_IF_ERROR("pollset_work",
+ grpc_pollset_work(&exec_ctx, args->pollset, &worker,
+ n_sec_deadline(1)));
gpr_mu_unlock(args->mu);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
}
gpr_event_set(&args->ev, (void*)1);
+ grpc_exec_ctx_finish(&exec_ctx);
}
-static void must_succeed(void* argsp, grpc_error* err) {
+static void must_succeed(grpc_exec_ctx* exec_ctx, void* argsp,
+ grpc_error* err) {
args_struct* args = static_cast<args_struct*>(argsp);
GPR_ASSERT(err == GRPC_ERROR_NONE);
GPR_ASSERT(args->addrs != nullptr);
GPR_ASSERT(args->addrs->naddrs > 0);
gpr_atm_rel_store(&args->done_atm, 1);
gpr_mu_lock(args->mu);
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
+ GRPC_LOG_IF_ERROR("pollset_kick",
+ grpc_pollset_kick(exec_ctx, args->pollset, nullptr));
gpr_mu_unlock(args->mu);
}
-static void must_fail(void* argsp, grpc_error* err) {
+static void must_fail(grpc_exec_ctx* exec_ctx, void* argsp, grpc_error* err) {
args_struct* args = static_cast<args_struct*>(argsp);
GPR_ASSERT(err != GRPC_ERROR_NONE);
gpr_atm_rel_store(&args->done_atm, 1);
gpr_mu_lock(args->mu);
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
+ GRPC_LOG_IF_ERROR("pollset_kick",
+ grpc_pollset_kick(exec_ctx, args->pollset, nullptr));
gpr_mu_unlock(args->mu);
}
static void test_localhost(void) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
args_struct args;
- args_init(&args);
+ args_init(&exec_ctx, &args);
grpc_resolve_address(
- "localhost:1", nullptr, args.pollset_set,
+ &exec_ctx, "localhost:1", nullptr, args.pollset_set,
GRPC_CLOSURE_CREATE(must_succeed, &args, grpc_schedule_on_exec_ctx),
&args.addrs);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
poll_pollset_until_request_done(&args);
- args_finish(&args);
+ args_finish(&exec_ctx, &args);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_default_port(void) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
args_struct args;
- args_init(&args);
+ args_init(&exec_ctx, &args);
grpc_resolve_address(
- "localhost", "1", args.pollset_set,
+ &exec_ctx, "localhost", "1", args.pollset_set,
GRPC_CLOSURE_CREATE(must_succeed, &args, grpc_schedule_on_exec_ctx),
&args.addrs);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
poll_pollset_until_request_done(&args);
- args_finish(&args);
+ args_finish(&exec_ctx, &args);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_non_numeric_default_port(void) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
args_struct args;
- args_init(&args);
+ args_init(&exec_ctx, &args);
grpc_resolve_address(
- "localhost", "https", args.pollset_set,
+ &exec_ctx, "localhost", "https", args.pollset_set,
GRPC_CLOSURE_CREATE(must_succeed, &args, grpc_schedule_on_exec_ctx),
&args.addrs);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
poll_pollset_until_request_done(&args);
- args_finish(&args);
+ args_finish(&exec_ctx, &args);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_missing_default_port(void) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
args_struct args;
- args_init(&args);
+ args_init(&exec_ctx, &args);
grpc_resolve_address(
- "localhost", nullptr, args.pollset_set,
+ &exec_ctx, "localhost", nullptr, args.pollset_set,
GRPC_CLOSURE_CREATE(must_fail, &args, grpc_schedule_on_exec_ctx),
&args.addrs);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
poll_pollset_until_request_done(&args);
- args_finish(&args);
+ args_finish(&exec_ctx, &args);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_ipv6_with_port(void) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
args_struct args;
- args_init(&args);
+ args_init(&exec_ctx, &args);
grpc_resolve_address(
- "[2001:db8::1]:1", nullptr, args.pollset_set,
+ &exec_ctx, "[2001:db8::1]:1", nullptr, args.pollset_set,
GRPC_CLOSURE_CREATE(must_succeed, &args, grpc_schedule_on_exec_ctx),
&args.addrs);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
poll_pollset_until_request_done(&args);
- args_finish(&args);
+ args_finish(&exec_ctx, &args);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_ipv6_without_port(void) {
@@ -187,16 +197,17 @@ static void test_ipv6_without_port(void) {
};
unsigned i;
for (i = 0; i < sizeof(kCases) / sizeof(*kCases); i++) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
args_struct args;
- args_init(&args);
+ args_init(&exec_ctx, &args);
grpc_resolve_address(
- kCases[i], "80", args.pollset_set,
+ &exec_ctx, kCases[i], "80", args.pollset_set,
GRPC_CLOSURE_CREATE(must_succeed, &args, grpc_schedule_on_exec_ctx),
&args.addrs);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
poll_pollset_until_request_done(&args);
- args_finish(&args);
+ args_finish(&exec_ctx, &args);
+ grpc_exec_ctx_finish(&exec_ctx);
}
}
@@ -207,16 +218,17 @@ static void test_invalid_ip_addresses(void) {
};
unsigned i;
for (i = 0; i < sizeof(kCases) / sizeof(*kCases); i++) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
args_struct args;
- args_init(&args);
+ args_init(&exec_ctx, &args);
grpc_resolve_address(
- kCases[i], nullptr, args.pollset_set,
+ &exec_ctx, kCases[i], nullptr, args.pollset_set,
GRPC_CLOSURE_CREATE(must_fail, &args, grpc_schedule_on_exec_ctx),
&args.addrs);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
poll_pollset_until_request_done(&args);
- args_finish(&args);
+ args_finish(&exec_ctx, &args);
+ grpc_exec_ctx_finish(&exec_ctx);
}
}
@@ -226,35 +238,34 @@ static void test_unparseable_hostports(void) {
};
unsigned i;
for (i = 0; i < sizeof(kCases) / sizeof(*kCases); i++) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
args_struct args;
- args_init(&args);
+ args_init(&exec_ctx, &args);
grpc_resolve_address(
- kCases[i], "1", args.pollset_set,
+ &exec_ctx, kCases[i], "1", args.pollset_set,
GRPC_CLOSURE_CREATE(must_fail, &args, grpc_schedule_on_exec_ctx),
&args.addrs);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
poll_pollset_until_request_done(&args);
- args_finish(&args);
+ args_finish(&exec_ctx, &args);
+ grpc_exec_ctx_finish(&exec_ctx);
}
}
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
grpc_init();
- {
- grpc_core::ExecCtx exec_ctx;
- test_localhost();
- test_default_port();
- test_non_numeric_default_port();
- test_missing_default_port();
- test_ipv6_with_port();
- test_ipv6_without_port();
- test_invalid_ip_addresses();
- test_unparseable_hostports();
- grpc_executor_shutdown();
- }
-
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ test_localhost();
+ test_default_port();
+ test_non_numeric_default_port();
+ test_missing_default_port();
+ test_ipv6_with_port();
+ test_ipv6_without_port();
+ test_invalid_ip_addresses();
+ test_unparseable_hostports();
+ grpc_executor_shutdown(&exec_ctx);
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_shutdown();
return 0;
}
diff --git a/test/core/iomgr/resource_quota_test.cc b/test/core/iomgr/resource_quota_test.cc
index ae26f72701..6851702e67 100644
--- a/test/core/iomgr/resource_quota_test.cc
+++ b/test/core/iomgr/resource_quota_test.cc
@@ -27,7 +27,7 @@
gpr_mu g_mu;
gpr_cv g_cv;
-static void inc_int_cb(void* a, grpc_error* error) {
+static void inc_int_cb(grpc_exec_ctx* exec_ctx, void* a, grpc_error* error) {
gpr_mu_lock(&g_mu);
++*(int*)a;
gpr_cv_signal(&g_cv);
@@ -43,7 +43,7 @@ static void assert_counter_becomes(int* ctr, int value) {
gpr_mu_unlock(&g_mu);
}
-static void set_event_cb(void* a, grpc_error* error) {
+static void set_event_cb(grpc_exec_ctx* exec_ctx, void* a, grpc_error* error) {
gpr_event_set((gpr_event*)a, (void*)1);
}
grpc_closure* set_event(gpr_event* ev) {
@@ -56,12 +56,13 @@ typedef struct {
grpc_closure* then;
} reclaimer_args;
-static void reclaimer_cb(void* args, grpc_error* error) {
+static void reclaimer_cb(grpc_exec_ctx* exec_ctx, void* args,
+ grpc_error* error) {
GPR_ASSERT(error == GRPC_ERROR_NONE);
reclaimer_args* a = static_cast<reclaimer_args*>(args);
- grpc_resource_user_free(a->resource_user, a->size);
- grpc_resource_user_finish_reclamation(a->resource_user);
- GRPC_CLOSURE_RUN(a->then, GRPC_ERROR_NONE);
+ grpc_resource_user_free(exec_ctx, a->resource_user, a->size);
+ grpc_resource_user_finish_reclamation(exec_ctx, a->resource_user);
+ GRPC_CLOSURE_RUN(exec_ctx, a->then, GRPC_ERROR_NONE);
gpr_free(a);
}
@@ -74,9 +75,10 @@ grpc_closure* make_reclaimer(grpc_resource_user* resource_user, size_t size,
return GRPC_CLOSURE_CREATE(reclaimer_cb, a, grpc_schedule_on_exec_ctx);
}
-static void unused_reclaimer_cb(void* arg, grpc_error* error) {
+static void unused_reclaimer_cb(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
GPR_ASSERT(error == GRPC_ERROR_CANCELLED);
- GRPC_CLOSURE_RUN(static_cast<grpc_closure*>(arg), GRPC_ERROR_NONE);
+ GRPC_CLOSURE_RUN(exec_ctx, static_cast<grpc_closure*>(arg), GRPC_ERROR_NONE);
}
grpc_closure* make_unused_reclaimer(grpc_closure* then) {
return GRPC_CLOSURE_CREATE(unused_reclaimer_cb, then,
@@ -84,8 +86,9 @@ grpc_closure* make_unused_reclaimer(grpc_closure* then) {
}
static void destroy_user(grpc_resource_user* usr) {
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_unref(usr);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_unref(&exec_ctx, usr);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_no_op(void) {
@@ -117,12 +120,14 @@ static void test_instant_alloc_then_free(void) {
grpc_resource_quota_resize(q, 1024 * 1024);
grpc_resource_user* usr = grpc_resource_user_create(q, "usr");
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr, 1024, NULL);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr, 1024, nullptr);
+ grpc_exec_ctx_finish(&exec_ctx);
}
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_free(usr, 1024);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_free(&exec_ctx, usr, 1024);
+ grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(usr);
@@ -135,9 +140,10 @@ static void test_instant_alloc_free_pair(void) {
grpc_resource_quota_resize(q, 1024 * 1024);
grpc_resource_user* usr = grpc_resource_user_create(q, "usr");
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr, 1024, NULL);
- grpc_resource_user_free(usr, 1024);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr, 1024, nullptr);
+ grpc_resource_user_free(&exec_ctx, usr, 1024);
+ grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(usr);
@@ -152,15 +158,16 @@ static void test_simple_async_alloc(void) {
{
gpr_event ev;
gpr_event_init(&ev);
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr, 1024, set_event(&ev));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
}
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_free(usr, 1024);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_free(&exec_ctx, usr, 1024);
+ grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(usr);
@@ -175,9 +182,9 @@ static void test_async_alloc_blocked_by_size(void) {
gpr_event ev;
gpr_event_init(&ev);
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr, 1024, set_event(&ev));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(
&ev, grpc_timeout_milliseconds_to_deadline(100)) == nullptr);
}
@@ -186,8 +193,9 @@ static void test_async_alloc_blocked_by_size(void) {
nullptr);
;
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_free(usr, 1024);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_free(&exec_ctx, usr, 1024);
+ grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(usr);
@@ -202,30 +210,32 @@ static void test_scavenge(void) {
{
gpr_event ev;
gpr_event_init(&ev);
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr1, 1024, set_event(&ev));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr1, 1024, set_event(&ev));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
;
}
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_free(usr1, 1024);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_free(&exec_ctx, usr1, 1024);
+ grpc_exec_ctx_finish(&exec_ctx);
}
{
gpr_event ev;
gpr_event_init(&ev);
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr2, 1024, set_event(&ev));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr2, 1024, set_event(&ev));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
;
}
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_free(usr2, 1024);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_free(&exec_ctx, usr2, 1024);
+ grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(usr1);
@@ -241,32 +251,33 @@ static void test_scavenge_blocked(void) {
gpr_event ev;
{
gpr_event_init(&ev);
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr1, 1024, set_event(&ev));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr1, 1024, set_event(&ev));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
;
}
{
gpr_event_init(&ev);
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr2, 1024, set_event(&ev));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr2, 1024, set_event(&ev));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(
&ev, grpc_timeout_milliseconds_to_deadline(100)) == nullptr);
}
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_free(usr1, 1024);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_free(&exec_ctx, usr1, 1024);
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
;
}
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_free(usr2, 1024);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_free(&exec_ctx, usr2, 1024);
+ grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(usr1);
@@ -282,9 +293,9 @@ static void test_blocked_until_scheduled_reclaim(void) {
{
gpr_event ev;
gpr_event_init(&ev);
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr, 1024, set_event(&ev));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
;
@@ -292,16 +303,18 @@ static void test_blocked_until_scheduled_reclaim(void) {
gpr_event reclaim_done;
gpr_event_init(&reclaim_done);
{
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_post_reclaimer(
- usr, false, make_reclaimer(usr, 1024, set_event(&reclaim_done)));
+ &exec_ctx, usr, false,
+ make_reclaimer(usr, 1024, set_event(&reclaim_done)));
+ grpc_exec_ctx_finish(&exec_ctx);
}
{
gpr_event ev;
gpr_event_init(&ev);
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr, 1024, set_event(&ev));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&reclaim_done,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) !=
@@ -309,8 +322,9 @@ static void test_blocked_until_scheduled_reclaim(void) {
;
}
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_free(usr, 1024);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_free(&exec_ctx, usr, 1024);
+ grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(usr);
@@ -326,9 +340,9 @@ static void test_blocked_until_scheduled_reclaim_and_scavenge(void) {
{
gpr_event ev;
gpr_event_init(&ev);
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr1, 1024, set_event(&ev));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr1, 1024, set_event(&ev));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
;
@@ -336,16 +350,18 @@ static void test_blocked_until_scheduled_reclaim_and_scavenge(void) {
gpr_event reclaim_done;
gpr_event_init(&reclaim_done);
{
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_post_reclaimer(
- usr1, false, make_reclaimer(usr1, 1024, set_event(&reclaim_done)));
+ &exec_ctx, usr1, false,
+ make_reclaimer(usr1, 1024, set_event(&reclaim_done)));
+ grpc_exec_ctx_finish(&exec_ctx);
}
{
gpr_event ev;
gpr_event_init(&ev);
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr2, 1024, set_event(&ev));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr2, 1024, set_event(&ev));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&reclaim_done,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) !=
@@ -353,8 +369,9 @@ static void test_blocked_until_scheduled_reclaim_and_scavenge(void) {
;
}
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_free(usr2, 1024);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_free(&exec_ctx, usr2, 1024);
+ grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(usr1);
@@ -370,9 +387,9 @@ static void test_blocked_until_scheduled_destructive_reclaim(void) {
{
gpr_event ev;
gpr_event_init(&ev);
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr, 1024, set_event(&ev));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
;
@@ -380,16 +397,18 @@ static void test_blocked_until_scheduled_destructive_reclaim(void) {
gpr_event reclaim_done;
gpr_event_init(&reclaim_done);
{
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_post_reclaimer(
- usr, true, make_reclaimer(usr, 1024, set_event(&reclaim_done)));
+ &exec_ctx, usr, true,
+ make_reclaimer(usr, 1024, set_event(&reclaim_done)));
+ grpc_exec_ctx_finish(&exec_ctx);
}
{
gpr_event ev;
gpr_event_init(&ev);
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr, 1024, set_event(&ev));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&reclaim_done,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) !=
@@ -397,8 +416,9 @@ static void test_blocked_until_scheduled_destructive_reclaim(void) {
;
}
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_free(usr, 1024);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_free(&exec_ctx, usr, 1024);
+ grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(usr);
@@ -415,12 +435,13 @@ static void test_unused_reclaim_is_cancelled(void) {
gpr_event destructive_done;
gpr_event_init(&destructive_done);
{
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_post_reclaimer(
- usr, false, make_unused_reclaimer(set_event(&benign_done)));
+ &exec_ctx, usr, false, make_unused_reclaimer(set_event(&benign_done)));
grpc_resource_user_post_reclaimer(
- usr, true, make_unused_reclaimer(set_event(&destructive_done)));
- grpc_core::ExecCtx::Get()->Flush();
+ &exec_ctx, usr, true,
+ make_unused_reclaimer(set_event(&destructive_done)));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&benign_done,
grpc_timeout_milliseconds_to_deadline(100)) ==
nullptr);
@@ -449,20 +470,22 @@ static void test_benign_reclaim_is_preferred(void) {
{
gpr_event ev;
gpr_event_init(&ev);
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr, 1024, set_event(&ev));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
;
}
{
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_post_reclaimer(
- usr, false, make_reclaimer(usr, 1024, set_event(&benign_done)));
+ &exec_ctx, usr, false,
+ make_reclaimer(usr, 1024, set_event(&benign_done)));
grpc_resource_user_post_reclaimer(
- usr, true, make_unused_reclaimer(set_event(&destructive_done)));
- grpc_core::ExecCtx::Get()->Flush();
+ &exec_ctx, usr, true,
+ make_unused_reclaimer(set_event(&destructive_done)));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&benign_done,
grpc_timeout_milliseconds_to_deadline(100)) ==
nullptr);
@@ -473,9 +496,9 @@ static void test_benign_reclaim_is_preferred(void) {
{
gpr_event ev;
gpr_event_init(&ev);
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr, 1024, set_event(&ev));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&benign_done,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
GPR_ASSERT(gpr_event_wait(&destructive_done,
@@ -485,8 +508,9 @@ static void test_benign_reclaim_is_preferred(void) {
nullptr);
}
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_free(usr, 1024);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_free(&exec_ctx, usr, 1024);
+ grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(usr);
@@ -509,20 +533,22 @@ static void test_multiple_reclaims_can_be_triggered(void) {
{
gpr_event ev;
gpr_event_init(&ev);
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr, 1024, set_event(&ev));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&ev, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
;
}
{
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_post_reclaimer(
- usr, false, make_reclaimer(usr, 512, set_event(&benign_done)));
+ &exec_ctx, usr, false,
+ make_reclaimer(usr, 512, set_event(&benign_done)));
grpc_resource_user_post_reclaimer(
- usr, true, make_reclaimer(usr, 512, set_event(&destructive_done)));
- grpc_core::ExecCtx::Get()->Flush();
+ &exec_ctx, usr, true,
+ make_reclaimer(usr, 512, set_event(&destructive_done)));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&benign_done,
grpc_timeout_milliseconds_to_deadline(100)) ==
nullptr);
@@ -533,9 +559,9 @@ static void test_multiple_reclaims_can_be_triggered(void) {
{
gpr_event ev;
gpr_event_init(&ev);
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr, 1024, set_event(&ev));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&ev));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&benign_done,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
GPR_ASSERT(gpr_event_wait(&destructive_done,
@@ -545,8 +571,9 @@ static void test_multiple_reclaims_can_be_triggered(void) {
;
}
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_free(usr, 1024);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_free(&exec_ctx, usr, 1024);
+ grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
destroy_user(usr);
@@ -564,17 +591,20 @@ static void test_resource_user_stays_allocated_until_memory_released(void) {
grpc_resource_quota_resize(q, 1024 * 1024);
grpc_resource_user* usr = grpc_resource_user_create(q, "usr");
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr, 1024, NULL);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr, 1024, nullptr);
+ grpc_exec_ctx_finish(&exec_ctx);
}
{
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_quota_unref(q);
- grpc_resource_user_unref(usr);
+ grpc_resource_user_unref(&exec_ctx, usr);
+ grpc_exec_ctx_finish(&exec_ctx);
}
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_free(usr, 1024);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_free(&exec_ctx, usr, 1024);
+ grpc_exec_ctx_finish(&exec_ctx);
}
}
@@ -594,10 +624,11 @@ test_resource_user_stays_allocated_and_reclaimers_unrun_until_memory_released(
gpr_event reclaimer_cancelled;
gpr_event_init(&reclaimer_cancelled);
{
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_post_reclaimer(
- usr, false, make_unused_reclaimer(set_event(&reclaimer_cancelled)));
- grpc_core::ExecCtx::Get()->Flush();
+ &exec_ctx, usr, false,
+ make_unused_reclaimer(set_event(&reclaimer_cancelled)));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&reclaimer_cancelled,
grpc_timeout_milliseconds_to_deadline(100)) ==
nullptr);
@@ -605,27 +636,27 @@ test_resource_user_stays_allocated_and_reclaimers_unrun_until_memory_released(
{
gpr_event allocated;
gpr_event_init(&allocated);
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr, 1024, set_event(&allocated));
- grpc_core::ExecCtx::Get()->Flush();
- GPR_ASSERT(gpr_event_wait(&allocated,
- grpc_timeout_seconds_to_deadline(5)) != NULL);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&allocated));
+ grpc_exec_ctx_finish(&exec_ctx);
+ GPR_ASSERT(gpr_event_wait(&allocated, grpc_timeout_seconds_to_deadline(
+ 5)) != nullptr);
GPR_ASSERT(gpr_event_wait(&reclaimer_cancelled,
grpc_timeout_milliseconds_to_deadline(100)) ==
nullptr);
}
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_unref(usr);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_unref(&exec_ctx, usr);
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&reclaimer_cancelled,
grpc_timeout_milliseconds_to_deadline(100)) ==
nullptr);
}
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_free(usr, 1024);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_free(&exec_ctx, usr, 1024);
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&reclaimer_cancelled,
grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
@@ -643,9 +674,9 @@ static void test_reclaimers_can_be_posted_repeatedly(void) {
{
gpr_event allocated;
gpr_event_init(&allocated);
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr, 1024, set_event(&allocated));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&allocated));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&allocated,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
}
@@ -653,10 +684,11 @@ static void test_reclaimers_can_be_posted_repeatedly(void) {
gpr_event reclaimer_done;
gpr_event_init(&reclaimer_done);
{
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resource_user_post_reclaimer(
- usr, false, make_reclaimer(usr, 1024, set_event(&reclaimer_done)));
- grpc_core::ExecCtx::Get()->Flush();
+ &exec_ctx, usr, false,
+ make_reclaimer(usr, 1024, set_event(&reclaimer_done)));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(gpr_event_wait(&reclaimer_done,
grpc_timeout_milliseconds_to_deadline(100)) ==
nullptr);
@@ -664,19 +696,20 @@ static void test_reclaimers_can_be_posted_repeatedly(void) {
{
gpr_event allocated;
gpr_event_init(&allocated);
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc(usr, 1024, set_event(&allocated));
- grpc_core::ExecCtx::Get()->Flush();
- GPR_ASSERT(gpr_event_wait(&allocated,
- grpc_timeout_seconds_to_deadline(5)) != NULL);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc(&exec_ctx, usr, 1024, set_event(&allocated));
+ grpc_exec_ctx_finish(&exec_ctx);
+ GPR_ASSERT(gpr_event_wait(&allocated, grpc_timeout_seconds_to_deadline(
+ 5)) != nullptr);
GPR_ASSERT(gpr_event_wait(&reclaimer_done,
grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
}
}
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_free(usr, 1024);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_free(&exec_ctx, usr, 1024);
+ grpc_exec_ctx_finish(&exec_ctx);
}
destroy_user(usr);
grpc_resource_quota_unref(q);
@@ -699,15 +732,16 @@ static void test_one_slice(void) {
{
const int start_allocs = num_allocs;
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc_slices(&alloc, 1024, 1, &buffer);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc_slices(&exec_ctx, &alloc, 1024, 1, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
assert_counter_becomes(&num_allocs, start_allocs + 1);
}
{
- grpc_core::ExecCtx exec_ctx;
- grpc_slice_buffer_destroy_internal(&buffer);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
}
destroy_user(usr);
grpc_resource_quota_unref(q);
@@ -731,21 +765,23 @@ static void test_one_slice_deleted_late(void) {
{
const int start_allocs = num_allocs;
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc_slices(&alloc, 1024, 1, &buffer);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc_slices(&exec_ctx, &alloc, 1024, 1, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
assert_counter_becomes(&num_allocs, start_allocs + 1);
}
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_unref(usr);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_unref(&exec_ctx, usr);
+ grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
{
- grpc_core::ExecCtx exec_ctx;
- grpc_slice_buffer_destroy_internal(&buffer);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
}
}
@@ -773,9 +809,9 @@ static void test_negative_rq_free_pool(void) {
{
const int start_allocs = num_allocs;
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_alloc_slices(&alloc, 1024, 1, &buffer);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_alloc_slices(&exec_ctx, &alloc, 1024, 1, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
assert_counter_becomes(&num_allocs, start_allocs + 1);
}
@@ -786,14 +822,16 @@ static void test_negative_rq_free_pool(void) {
GPR_ASSERT(grpc_resource_quota_get_memory_pressure(q) > 1 - eps);
{
- grpc_core::ExecCtx exec_ctx;
- grpc_resource_user_unref(usr);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resource_user_unref(&exec_ctx, usr);
+ grpc_exec_ctx_finish(&exec_ctx);
}
grpc_resource_quota_unref(q);
{
- grpc_core::ExecCtx exec_ctx;
- grpc_slice_buffer_destroy_internal(&buffer);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
}
}
diff --git a/test/core/iomgr/tcp_client_posix_test.cc b/test/core/iomgr/tcp_client_posix_test.cc
index 40a050ed9f..9fb1a2d770 100644
--- a/test/core/iomgr/tcp_client_posix_test.cc
+++ b/test/core/iomgr/tcp_client_posix_test.cc
@@ -53,24 +53,26 @@ static grpc_millis test_deadline(void) {
static void finish_connection() {
gpr_mu_lock(g_mu);
g_connections_complete++;
- grpc_core::ExecCtx exec_ctx;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
-
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_kick", grpc_pollset_kick(&exec_ctx, g_pollset, nullptr)));
+ grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_unlock(g_mu);
}
-static void must_succeed(void* arg, grpc_error* error) {
+static void must_succeed(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
GPR_ASSERT(g_connecting != nullptr);
GPR_ASSERT(error == GRPC_ERROR_NONE);
- grpc_endpoint_shutdown(g_connecting, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "must_succeed called"));
- grpc_endpoint_destroy(g_connecting);
+ grpc_endpoint_shutdown(
+ exec_ctx, g_connecting,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("must_succeed called"));
+ grpc_endpoint_destroy(exec_ctx, g_connecting);
g_connecting = nullptr;
finish_connection();
}
-static void must_fail(void* arg, grpc_error* error) {
+static void must_fail(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
GPR_ASSERT(g_connecting == nullptr);
GPR_ASSERT(error != GRPC_ERROR_NONE);
finish_connection();
@@ -83,7 +85,7 @@ void test_succeeds(void) {
int r;
int connections_complete_before;
grpc_closure done;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_DEBUG, "test_succeeds");
@@ -106,8 +108,8 @@ void test_succeeds(void) {
GPR_ASSERT(getsockname(svr_fd, (struct sockaddr*)addr,
(socklen_t*)&resolved_addr.len) == 0);
GRPC_CLOSURE_INIT(&done, must_succeed, nullptr, grpc_schedule_on_exec_ctx);
- grpc_tcp_client_connect(&done, &g_connecting, g_pollset_set, nullptr,
- &resolved_addr, GRPC_MILLIS_INF_FUTURE);
+ grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, g_pollset_set,
+ nullptr, &resolved_addr, GRPC_MILLIS_INF_FUTURE);
/* await the connection */
do {
@@ -123,15 +125,17 @@ void test_succeeds(void) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work",
- grpc_pollset_work(g_pollset, &worker,
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker,
grpc_timespec_to_millis_round_up(
grpc_timeout_seconds_to_deadline(5)))));
gpr_mu_unlock(g_mu);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(g_mu);
}
gpr_mu_unlock(g_mu);
+
+ grpc_exec_ctx_finish(&exec_ctx);
}
void test_fails(void) {
@@ -139,7 +143,7 @@ void test_fails(void) {
struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr;
int connections_complete_before;
grpc_closure done;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_DEBUG, "test_fails");
@@ -153,8 +157,8 @@ void test_fails(void) {
/* connect to a broken address */
GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx);
- grpc_tcp_client_connect(&done, &g_connecting, g_pollset_set, nullptr,
- &resolved_addr, GRPC_MILLIS_INF_FUTURE);
+ grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, g_pollset_set,
+ nullptr, &resolved_addr, GRPC_MILLIS_INF_FUTURE);
gpr_mu_lock(g_mu);
@@ -162,7 +166,7 @@ void test_fails(void) {
while (g_connections_complete == connections_complete_before) {
grpc_pollset_worker* worker = nullptr;
grpc_millis polling_deadline = test_deadline();
- switch (grpc_timer_check(&polling_deadline)) {
+ switch (grpc_timer_check(&exec_ctx, &polling_deadline)) {
case GRPC_TIMERS_FIRED:
break;
case GRPC_TIMERS_NOT_CHECKED:
@@ -170,43 +174,42 @@ void test_fails(void) {
/* fall through */
case GRPC_TIMERS_CHECKED_AND_EMPTY:
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work",
- grpc_pollset_work(g_pollset, &worker, polling_deadline)));
+ "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker,
+ polling_deadline)));
break;
}
gpr_mu_unlock(g_mu);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(g_mu);
}
gpr_mu_unlock(g_mu);
+ grpc_exec_ctx_finish(&exec_ctx);
}
-static void destroy_pollset(void* p, grpc_error* error) {
- grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
+static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p,
+ grpc_error* error) {
+ grpc_pollset_destroy(exec_ctx, static_cast<grpc_pollset*>(p));
}
int main(int argc, char** argv) {
grpc_closure destroyed;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_test_init(argc, argv);
grpc_init();
-
- {
- grpc_core::ExecCtx exec_ctx;
- g_pollset_set = grpc_pollset_set_create();
- g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
- grpc_pollset_init(g_pollset, &g_mu);
- grpc_pollset_set_add_pollset(g_pollset_set, g_pollset);
-
- test_succeeds();
- gpr_log(GPR_ERROR, "End of first test");
- test_fails();
- grpc_pollset_set_destroy(g_pollset_set);
- GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
- grpc_schedule_on_exec_ctx);
- grpc_pollset_shutdown(g_pollset, &destroyed);
- }
-
+ g_pollset_set = grpc_pollset_set_create();
+ g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
+ grpc_pollset_init(g_pollset, &g_mu);
+ grpc_pollset_set_add_pollset(&exec_ctx, g_pollset_set, g_pollset);
+ grpc_exec_ctx_finish(&exec_ctx);
+ test_succeeds();
+ gpr_log(GPR_ERROR, "End of first test");
+ test_fails();
+ grpc_pollset_set_destroy(&exec_ctx, g_pollset_set);
+ GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
+ grpc_schedule_on_exec_ctx);
+ grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_shutdown();
gpr_free(g_pollset);
return 0;
diff --git a/test/core/iomgr/tcp_client_uv_test.cc b/test/core/iomgr/tcp_client_uv_test.cc
index 0c6250ed7f..101d7bf6b5 100644
--- a/test/core/iomgr/tcp_client_uv_test.cc
+++ b/test/core/iomgr/tcp_client_uv_test.cc
@@ -46,28 +46,30 @@ static grpc_millis test_deadline(void) {
return grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
}
-static void finish_connection() {
+static void finish_connection(grpc_exec_ctx* exec_ctx) {
gpr_mu_lock(g_mu);
g_connections_complete++;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick",
+ grpc_pollset_kick(exec_ctx, g_pollset, NULL)));
gpr_mu_unlock(g_mu);
}
-static void must_succeed(void* arg, grpc_error* error) {
+static void must_succeed(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
GPR_ASSERT(g_connecting != NULL);
GPR_ASSERT(error == GRPC_ERROR_NONE);
- grpc_endpoint_shutdown(g_connecting, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "must_succeed called"));
- grpc_endpoint_destroy(g_connecting);
+ grpc_endpoint_shutdown(
+ exec_ctx, g_connecting,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("must_succeed called"));
+ grpc_endpoint_destroy(exec_ctx, g_connecting);
g_connecting = NULL;
- finish_connection();
+ finish_connection(exec_ctx);
}
-static void must_fail(void* arg, grpc_error* error) {
+static void must_fail(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
GPR_ASSERT(g_connecting == NULL);
GPR_ASSERT(error != GRPC_ERROR_NONE);
- finish_connection();
+ finish_connection(exec_ctx);
}
static void close_cb(uv_handle_t* handle) { gpr_free(handle); }
@@ -87,7 +89,7 @@ void test_succeeds(void) {
uv_tcp_t* svr_handle = static_cast<uv_tcp_t*>(gpr_malloc(sizeof(uv_tcp_t)));
int connections_complete_before;
grpc_closure done;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_DEBUG, "test_succeeds");
@@ -108,8 +110,8 @@ void test_succeeds(void) {
GPR_ASSERT(uv_tcp_getsockname(svr_handle, (struct sockaddr*)addr,
(int*)&resolved_addr.len) == 0);
GRPC_CLOSURE_INIT(&done, must_succeed, NULL, grpc_schedule_on_exec_ctx);
- grpc_tcp_client_connect(&done, &g_connecting, NULL, NULL, &resolved_addr,
- GRPC_MILLIS_INF_FUTURE);
+ grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, NULL, NULL,
+ &resolved_addr, GRPC_MILLIS_INF_FUTURE);
gpr_mu_lock(g_mu);
@@ -117,11 +119,11 @@ void test_succeeds(void) {
grpc_pollset_worker* worker = NULL;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work",
- grpc_pollset_work(g_pollset, &worker,
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker,
grpc_timespec_to_millis_round_up(
grpc_timeout_seconds_to_deadline(5)))));
gpr_mu_unlock(g_mu);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(g_mu);
}
@@ -129,6 +131,8 @@ void test_succeeds(void) {
uv_close((uv_handle_t*)svr_handle, close_cb);
gpr_mu_unlock(g_mu);
+
+ grpc_exec_ctx_finish(&exec_ctx);
}
void test_fails(void) {
@@ -136,7 +140,7 @@ void test_fails(void) {
struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr;
int connections_complete_before;
grpc_closure done;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_DEBUG, "test_fails");
@@ -150,8 +154,8 @@ void test_fails(void) {
/* connect to a broken address */
GRPC_CLOSURE_INIT(&done, must_fail, NULL, grpc_schedule_on_exec_ctx);
- grpc_tcp_client_connect(&done, &g_connecting, NULL, NULL, &resolved_addr,
- GRPC_MILLIS_INF_FUTURE);
+ grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, NULL, NULL,
+ &resolved_addr, GRPC_MILLIS_INF_FUTURE);
gpr_mu_lock(g_mu);
@@ -160,7 +164,7 @@ void test_fails(void) {
grpc_pollset_worker* worker = NULL;
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
grpc_millis polling_deadline = test_deadline();
- switch (grpc_timer_check(&polling_deadline)) {
+ switch (grpc_timer_check(&exec_ctx, &polling_deadline)) {
case GRPC_TIMERS_FIRED:
break;
case GRPC_TIMERS_NOT_CHECKED:
@@ -168,37 +172,39 @@ void test_fails(void) {
/* fall through */
case GRPC_TIMERS_CHECKED_AND_EMPTY:
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work",
- grpc_pollset_work(g_pollset, &worker, polling_deadline)));
+ "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker,
+ polling_deadline)));
break;
}
gpr_mu_unlock(g_mu);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(g_mu);
}
gpr_mu_unlock(g_mu);
+ grpc_exec_ctx_finish(&exec_ctx);
}
-static void destroy_pollset(void* p, grpc_error* error) {
- grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
+static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p,
+ grpc_error* error) {
+ grpc_pollset_destroy(exec_ctx, static_cast<grpc_pollset*>(p));
}
int main(int argc, char** argv) {
grpc_closure destroyed;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_test_init(argc, argv);
grpc_init();
g_pollset = static_cast<grpc_pollset*>(gpr_malloc(grpc_pollset_size()));
grpc_pollset_init(g_pollset, &g_mu);
-
+ grpc_exec_ctx_finish(&exec_ctx);
test_succeeds();
gpr_log(GPR_ERROR, "End of first test");
test_fails();
GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
grpc_schedule_on_exec_ctx);
- grpc_pollset_shutdown(g_pollset, &destroyed);
-
+ grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_shutdown();
gpr_free(g_pollset);
return 0;
diff --git a/test/core/iomgr/tcp_posix_test.cc b/test/core/iomgr/tcp_posix_test.cc
index f4acba8302..7986dc2b19 100644
--- a/test/core/iomgr/tcp_posix_test.cc
+++ b/test/core/iomgr/tcp_posix_test.cc
@@ -131,7 +131,8 @@ static size_t count_slices(grpc_slice* slices, size_t nslices,
return num_bytes;
}
-static void read_cb(void* user_data, grpc_error* error) {
+static void read_cb(grpc_exec_ctx* exec_ctx, void* user_data,
+ grpc_error* error) {
struct read_socket_state* state = (struct read_socket_state*)user_data;
size_t read_bytes;
int current_data;
@@ -146,11 +147,11 @@ static void read_cb(void* user_data, grpc_error* error) {
gpr_log(GPR_INFO, "Read %" PRIuPTR " bytes of %" PRIuPTR, read_bytes,
state->target_read_bytes);
if (state->read_bytes >= state->target_read_bytes) {
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr)));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
} else {
- grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb);
+ grpc_endpoint_read(exec_ctx, state->ep, &state->incoming, &state->read_cb);
gpr_mu_unlock(g_mu);
}
}
@@ -163,7 +164,7 @@ static void read_test(size_t num_bytes, size_t slice_size) {
size_t written_bytes;
grpc_millis deadline =
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_INFO, "Read test of size %" PRIuPTR ", slice size %" PRIuPTR,
num_bytes, slice_size);
@@ -174,8 +175,9 @@ static void read_test(size_t num_bytes, size_t slice_size) {
a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
a[0].type = GRPC_ARG_INTEGER, a[0].value.integer = (int)slice_size;
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), &args, "test");
- grpc_endpoint_add_to_pollset(ep, g_pollset);
+ ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "read_test"), &args,
+ "test");
+ grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
written_bytes = fill_socket_partial(sv[0], num_bytes);
gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
@@ -186,22 +188,24 @@ static void read_test(size_t num_bytes, size_t slice_size) {
grpc_slice_buffer_init(&state.incoming);
GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
- grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
+ grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb);
gpr_mu_lock(g_mu);
while (state.read_bytes < state.target_read_bytes) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline)));
gpr_mu_unlock(g_mu);
-
+ grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(g_mu);
}
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(g_mu);
- grpc_slice_buffer_destroy_internal(&state.incoming);
- grpc_endpoint_destroy(ep);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &state.incoming);
+ grpc_endpoint_destroy(&exec_ctx, ep);
+ grpc_exec_ctx_finish(&exec_ctx);
}
/* Write to a socket until it fills up, then read from it using the grpc_tcp
@@ -213,7 +217,7 @@ static void large_read_test(size_t slice_size) {
ssize_t written_bytes;
grpc_millis deadline =
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_INFO, "Start large read test, slice size %" PRIuPTR, slice_size);
@@ -224,8 +228,9 @@ static void large_read_test(size_t slice_size) {
a[0].type = GRPC_ARG_INTEGER;
a[0].value.integer = (int)slice_size;
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), &args, "test");
- grpc_endpoint_add_to_pollset(ep, g_pollset);
+ ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "large_read_test"),
+ &args, "test");
+ grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
written_bytes = fill_socket(sv[0]);
gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
@@ -236,22 +241,24 @@ static void large_read_test(size_t slice_size) {
grpc_slice_buffer_init(&state.incoming);
GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
- grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
+ grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb);
gpr_mu_lock(g_mu);
while (state.read_bytes < state.target_read_bytes) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline)));
gpr_mu_unlock(g_mu);
-
+ grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(g_mu);
}
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(g_mu);
- grpc_slice_buffer_destroy_internal(&state.incoming);
- grpc_endpoint_destroy(ep);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &state.incoming);
+ grpc_endpoint_destroy(&exec_ctx, ep);
+ grpc_exec_ctx_finish(&exec_ctx);
}
struct write_socket_state {
@@ -282,15 +289,16 @@ static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
return slices;
}
-static void write_done(void* user_data /* write_socket_state */,
+static void write_done(grpc_exec_ctx* exec_ctx,
+ void* user_data /* write_socket_state */,
grpc_error* error) {
struct write_socket_state* state = (struct write_socket_state*)user_data;
gpr_log(GPR_INFO, "Write done callback called");
gpr_mu_lock(g_mu);
gpr_log(GPR_INFO, "Signalling write done");
state->write_done = 1;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
}
@@ -301,7 +309,7 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
int flags;
int current = 0;
int i;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
flags = fcntl(fd, F_GETFL, 0);
GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0);
@@ -311,11 +319,11 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
gpr_mu_lock(g_mu);
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work",
- grpc_pollset_work(g_pollset, &worker,
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker,
grpc_timespec_to_millis_round_up(
grpc_timeout_milliseconds_to_deadline(10)))));
gpr_mu_unlock(g_mu);
-
+ grpc_exec_ctx_finish(&exec_ctx);
do {
bytes_read =
read(fd, buf, bytes_left > read_size ? read_size : bytes_left);
@@ -348,7 +356,7 @@ static void write_test(size_t num_bytes, size_t slice_size) {
grpc_closure write_done_closure;
grpc_millis deadline =
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_INFO,
"Start write test with %" PRIuPTR " bytes, slice size %" PRIuPTR,
@@ -360,8 +368,9 @@ static void write_test(size_t num_bytes, size_t slice_size) {
a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
a[0].type = GRPC_ARG_INTEGER, a[0].value.integer = (int)slice_size;
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), &args, "test");
- grpc_endpoint_add_to_pollset(ep, g_pollset);
+ ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "write_test"), &args,
+ "test");
+ grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
state.ep = ep;
state.write_done = 0;
@@ -373,7 +382,7 @@ static void write_test(size_t num_bytes, size_t slice_size) {
GRPC_CLOSURE_INIT(&write_done_closure, write_done, &state,
grpc_schedule_on_exec_ctx);
- grpc_endpoint_write(ep, &outgoing, &write_done_closure);
+ grpc_endpoint_write(&exec_ctx, ep, &outgoing, &write_done_closure);
drain_socket_blocking(sv[0], num_bytes, num_bytes);
gpr_mu_lock(g_mu);
for (;;) {
@@ -382,23 +391,25 @@ static void write_test(size_t num_bytes, size_t slice_size) {
break;
}
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline)));
gpr_mu_unlock(g_mu);
-
+ grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(g_mu);
}
gpr_mu_unlock(g_mu);
- grpc_slice_buffer_destroy_internal(&outgoing);
- grpc_endpoint_destroy(ep);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &outgoing);
+ grpc_endpoint_destroy(&exec_ctx, ep);
gpr_free(slices);
+ grpc_exec_ctx_finish(&exec_ctx);
}
-void on_fd_released(void* arg, grpc_error* errors) {
+void on_fd_released(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* errors) {
int* done = (int*)arg;
*done = 1;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
}
/* Do a read_test, then release fd and try to read/write again. Verify that
@@ -411,7 +422,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
int fd;
grpc_millis deadline =
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_closure fd_released_cb;
int fd_released_done = 0;
GRPC_CLOSURE_INIT(&fd_released_cb, &on_fd_released, &fd_released_done,
@@ -428,9 +439,10 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
a[0].type = GRPC_ARG_INTEGER;
a[0].value.integer = (int)slice_size;
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), &args, "test");
+ ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "read_test"), &args,
+ "test");
GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0);
- grpc_endpoint_add_to_pollset(ep, g_pollset);
+ grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
written_bytes = fill_socket_partial(sv[0], num_bytes);
gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
@@ -441,35 +453,38 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
grpc_slice_buffer_init(&state.incoming);
GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
- grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
+ grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb);
gpr_mu_lock(g_mu);
while (state.read_bytes < state.target_read_bytes) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline)));
gpr_log(GPR_DEBUG, "wakeup: read=%" PRIdPTR " target=%" PRIdPTR,
state.read_bytes, state.target_read_bytes);
gpr_mu_unlock(g_mu);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(g_mu);
}
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(g_mu);
- grpc_slice_buffer_destroy_internal(&state.incoming);
- grpc_tcp_destroy_and_release_fd(ep, &fd, &fd_released_cb);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &state.incoming);
+ grpc_tcp_destroy_and_release_fd(&exec_ctx, ep, &fd, &fd_released_cb);
+ grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(g_mu);
while (!fd_released_done) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline)));
gpr_log(GPR_DEBUG, "wakeup: fd_released_done=%d", fd_released_done);
}
gpr_mu_unlock(g_mu);
GPR_ASSERT(fd_released_done == 1);
GPR_ASSERT(fd == sv[1]);
+ grpc_exec_ctx_finish(&exec_ctx);
written_bytes = fill_socket_partial(sv[0], num_bytes);
drain_socket_blocking(fd, written_bytes, written_bytes);
@@ -507,7 +522,7 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
size_t slice_size) {
int sv[2];
grpc_endpoint_test_fixture f;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
create_sockets(sv);
grpc_resource_quota* resource_quota =
@@ -517,13 +532,15 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
a[0].type = GRPC_ARG_INTEGER;
a[0].value.integer = (int)slice_size;
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
- f.client_ep =
- grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"), &args, "test");
- f.server_ep =
- grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), &args, "test");
- grpc_resource_quota_unref_internal(resource_quota);
- grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
- grpc_endpoint_add_to_pollset(f.server_ep, g_pollset);
+ f.client_ep = grpc_tcp_create(
+ &exec_ctx, grpc_fd_create(sv[0], "fixture:client"), &args, "test");
+ f.server_ep = grpc_tcp_create(
+ &exec_ctx, grpc_fd_create(sv[1], "fixture:server"), &args, "test");
+ grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
+ grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset);
+ grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, g_pollset);
+
+ grpc_exec_ctx_finish(&exec_ctx);
return f;
}
@@ -532,26 +549,24 @@ static grpc_endpoint_test_config configs[] = {
{"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up},
};
-static void destroy_pollset(void* p, grpc_error* error) {
- grpc_pollset_destroy((grpc_pollset*)p);
+static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p,
+ grpc_error* error) {
+ grpc_pollset_destroy(exec_ctx, (grpc_pollset*)p);
}
int main(int argc, char** argv) {
grpc_closure destroyed;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_test_init(argc, argv);
grpc_init();
- {
- grpc_core::ExecCtx exec_ctx;
- g_pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size());
- grpc_pollset_init(g_pollset, &g_mu);
- grpc_endpoint_tests(configs[0], g_pollset, g_mu);
- run_tests();
- GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
- grpc_schedule_on_exec_ctx);
- grpc_pollset_shutdown(g_pollset, &destroyed);
-
- grpc_core::ExecCtx::Get()->Flush();
- }
+ g_pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size());
+ grpc_pollset_init(g_pollset, &g_mu);
+ grpc_endpoint_tests(configs[0], g_pollset, g_mu);
+ run_tests();
+ GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
+ grpc_schedule_on_exec_ctx);
+ grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_shutdown();
gpr_free(g_pollset);
diff --git a/test/core/iomgr/tcp_server_posix_test.cc b/test/core/iomgr/tcp_server_posix_test.cc
index 3c9ca2109e..48d8d425a5 100644
--- a/test/core/iomgr/tcp_server_posix_test.cc
+++ b/test/core/iomgr/tcp_server_posix_test.cc
@@ -110,7 +110,8 @@ static void on_connect_result_set(on_connect_result* result,
result->server, acceptor->port_index, acceptor->fd_index);
}
-static void server_weak_ref_shutdown(void* arg, grpc_error* error) {
+static void server_weak_ref_shutdown(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
server_weak_ref* weak_ref = static_cast<server_weak_ref*>(arg);
weak_ref->server = nullptr;
}
@@ -144,11 +145,12 @@ static void test_addr_init_str(test_addr* addr) {
}
}
-static void on_connect(void* arg, grpc_endpoint* tcp, grpc_pollset* pollset,
+static void on_connect(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* tcp,
+ grpc_pollset* pollset,
grpc_tcp_server_acceptor* acceptor) {
- grpc_endpoint_shutdown(tcp,
+ grpc_endpoint_shutdown(exec_ctx, tcp,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connected"));
- grpc_endpoint_destroy(tcp);
+ grpc_endpoint_destroy(exec_ctx, tcp);
on_connect_result temp_result;
on_connect_result_set(&temp_result, acceptor);
@@ -157,33 +159,38 @@ static void on_connect(void* arg, grpc_endpoint* tcp, grpc_pollset* pollset,
gpr_mu_lock(g_mu);
g_result = temp_result;
g_nconnects++;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
}
static void test_no_op(void) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp_server* s;
- GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, nullptr, &s));
- grpc_tcp_server_unref(s);
+ GPR_ASSERT(GRPC_ERROR_NONE ==
+ grpc_tcp_server_create(&exec_ctx, nullptr, nullptr, &s));
+ grpc_tcp_server_unref(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_no_op_with_start(void) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp_server* s;
- GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, nullptr, &s));
+ GPR_ASSERT(GRPC_ERROR_NONE ==
+ grpc_tcp_server_create(&exec_ctx, nullptr, nullptr, &s));
LOG_TEST("test_no_op_with_start");
- grpc_tcp_server_start(s, nullptr, 0, on_connect, nullptr);
- grpc_tcp_server_unref(s);
+ grpc_tcp_server_start(&exec_ctx, s, nullptr, 0, on_connect, nullptr);
+ grpc_tcp_server_unref(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_no_op_with_port(void) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resolved_address resolved_addr;
struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr;
grpc_tcp_server* s;
- GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, nullptr, &s));
+ GPR_ASSERT(GRPC_ERROR_NONE ==
+ grpc_tcp_server_create(&exec_ctx, nullptr, nullptr, &s));
LOG_TEST("test_no_op_with_port");
memset(&resolved_addr, 0, sizeof(resolved_addr));
@@ -194,15 +201,17 @@ static void test_no_op_with_port(void) {
GRPC_ERROR_NONE &&
port > 0);
- grpc_tcp_server_unref(s);
+ grpc_tcp_server_unref(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_no_op_with_port_and_start(void) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resolved_address resolved_addr;
struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr;
grpc_tcp_server* s;
- GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, nullptr, &s));
+ GPR_ASSERT(GRPC_ERROR_NONE ==
+ grpc_tcp_server_create(&exec_ctx, nullptr, nullptr, &s));
LOG_TEST("test_no_op_with_port_and_start");
int port = -1;
@@ -213,12 +222,13 @@ static void test_no_op_with_port_and_start(void) {
GRPC_ERROR_NONE &&
port > 0);
- grpc_tcp_server_start(s, nullptr, 0, on_connect, nullptr);
+ grpc_tcp_server_start(&exec_ctx, s, nullptr, 0, on_connect, nullptr);
- grpc_tcp_server_unref(s);
+ grpc_tcp_server_unref(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
}
-static grpc_error* tcp_connect(const test_addr* remote,
+static grpc_error* tcp_connect(grpc_exec_ctx* exec_ctx, const test_addr* remote,
on_connect_result* result) {
grpc_millis deadline =
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
@@ -244,17 +254,17 @@ static grpc_error* tcp_connect(const test_addr* remote,
}
gpr_log(GPR_DEBUG, "wait");
while (g_nconnects == nconnects_before &&
- deadline > grpc_core::ExecCtx::Get()->Now()) {
+ deadline > grpc_exec_ctx_now(exec_ctx)) {
grpc_pollset_worker* worker = nullptr;
grpc_error* err;
- if ((err = grpc_pollset_work(g_pollset, &worker, deadline)) !=
+ if ((err = grpc_pollset_work(exec_ctx, g_pollset, &worker, deadline)) !=
GRPC_ERROR_NONE) {
gpr_mu_unlock(g_mu);
close(clifd);
return err;
}
gpr_mu_unlock(g_mu);
-
+ grpc_exec_ctx_finish(exec_ctx);
gpr_mu_lock(g_mu);
}
gpr_log(GPR_DEBUG, "wait done");
@@ -269,7 +279,7 @@ static grpc_error* tcp_connect(const test_addr* remote,
gpr_mu_unlock(g_mu);
gpr_log(GPR_INFO, "Result (%d, %d) fd %d", result->port_index,
result->fd_index, result->server_fd);
- grpc_tcp_server_unref(result->server);
+ grpc_tcp_server_unref(exec_ctx, result->server);
return GRPC_ERROR_NONE;
}
@@ -282,7 +292,7 @@ static grpc_error* tcp_connect(const test_addr* remote,
static void test_connect(size_t num_connects,
const grpc_channel_args* channel_args,
test_addrs* dst_addrs, bool test_dst_addrs) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resolved_address resolved_addr;
grpc_resolved_address resolved_addr1;
struct sockaddr_storage* const addr =
@@ -297,7 +307,7 @@ static void test_connect(size_t num_connects,
grpc_tcp_server* s;
const unsigned num_ports = 2;
GPR_ASSERT(GRPC_ERROR_NONE ==
- grpc_tcp_server_create(nullptr, channel_args, &s));
+ grpc_tcp_server_create(&exec_ctx, nullptr, channel_args, &s));
unsigned port_num;
server_weak_ref weak_ref;
server_weak_ref_init(&weak_ref);
@@ -342,7 +352,7 @@ static void test_connect(size_t num_connects,
svr1_fd_count = grpc_tcp_server_port_fd_count(s, 1);
GPR_ASSERT(svr1_fd_count >= 1);
- grpc_tcp_server_start(s, &g_pollset, 1, on_connect, nullptr);
+ grpc_tcp_server_start(&exec_ctx, s, &g_pollset, 1, on_connect, nullptr);
if (dst_addrs != nullptr) {
int ports[] = {svr_port, svr1_port};
@@ -362,7 +372,7 @@ static void test_connect(size_t num_connects,
test_addr_init_str(&dst);
++num_tested;
on_connect_result_init(&result);
- if ((err = tcp_connect(&dst, &result)) == GRPC_ERROR_NONE &&
+ if ((err = tcp_connect(&exec_ctx, &dst, &result)) == GRPC_ERROR_NONE &&
result.server_fd >= 0 && result.server == s) {
continue;
}
@@ -393,8 +403,8 @@ static void test_connect(size_t num_connects,
for (connect_num = 0; connect_num < num_connects; ++connect_num) {
on_connect_result result;
on_connect_result_init(&result);
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("tcp_connect", tcp_connect(&dst, &result)));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR("tcp_connect",
+ tcp_connect(&exec_ctx, &dst, &result)));
GPR_ASSERT(result.server_fd == fd);
GPR_ASSERT(result.port_index == port_num);
GPR_ASSERT(result.fd_index == fd_num);
@@ -410,19 +420,21 @@ static void test_connect(size_t num_connects,
GPR_ASSERT(weak_ref.server != nullptr);
GPR_ASSERT(grpc_tcp_server_port_fd(s, 0, 0) >= 0);
- grpc_tcp_server_unref(s);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_tcp_server_unref(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
/* Weak ref lost. */
GPR_ASSERT(weak_ref.server == nullptr);
}
-static void destroy_pollset(void* p, grpc_error* error) {
- grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
+static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p,
+ grpc_error* error) {
+ grpc_pollset_destroy(exec_ctx, static_cast<grpc_pollset*>(p));
}
int main(int argc, char** argv) {
grpc_closure destroyed;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_arg chan_args[1];
chan_args[0].type = GRPC_ARG_INTEGER;
chan_args[0].key = const_cast<char*>(GRPC_ARG_EXPAND_WILDCARD_ADDRS);
@@ -435,61 +447,58 @@ int main(int argc, char** argv) {
static_cast<test_addrs*>(gpr_zalloc(sizeof(*dst_addrs)));
grpc_test_init(argc, argv);
grpc_init();
- {
- grpc_core::ExecCtx exec_ctx;
- g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
- grpc_pollset_init(g_pollset, &g_mu);
-
- test_no_op();
- test_no_op_with_start();
- test_no_op_with_port();
- test_no_op_with_port_and_start();
-
- if (getifaddrs(&ifa) != 0 || ifa == nullptr) {
- gpr_log(GPR_ERROR, "getifaddrs: %s", strerror(errno));
- return EXIT_FAILURE;
- }
- dst_addrs->naddrs = 0;
- for (ifa_it = ifa; ifa_it != nullptr && dst_addrs->naddrs < MAX_ADDRS;
- ifa_it = ifa_it->ifa_next) {
- if (ifa_it->ifa_addr == nullptr) {
- continue;
- } else if (ifa_it->ifa_addr->sa_family == AF_INET) {
- dst_addrs->addrs[dst_addrs->naddrs].addr.len =
- sizeof(struct sockaddr_in);
- } else if (ifa_it->ifa_addr->sa_family == AF_INET6) {
- dst_addrs->addrs[dst_addrs->naddrs].addr.len =
- sizeof(struct sockaddr_in6);
- } else {
- continue;
- }
- memcpy(dst_addrs->addrs[dst_addrs->naddrs].addr.addr, ifa_it->ifa_addr,
- dst_addrs->addrs[dst_addrs->naddrs].addr.len);
- GPR_ASSERT(
- grpc_sockaddr_set_port(&dst_addrs->addrs[dst_addrs->naddrs].addr, 0));
- test_addr_init_str(&dst_addrs->addrs[dst_addrs->naddrs]);
- ++dst_addrs->naddrs;
- }
- freeifaddrs(ifa);
- ifa = nullptr;
-
- /* Connect to same addresses as listeners. */
- test_connect(1, nullptr, nullptr, false);
- test_connect(10, nullptr, nullptr, false);
+ g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
+ grpc_pollset_init(g_pollset, &g_mu);
- /* Set dst_addrs->addrs[i].len=0 for dst_addrs that are unreachable with a
- "::" listener. */
- test_connect(1, nullptr, dst_addrs, true);
+ test_no_op();
+ test_no_op_with_start();
+ test_no_op_with_port();
+ test_no_op_with_port_and_start();
- /* Test connect(2) with dst_addrs. */
- test_connect(1, &channel_args, dst_addrs, false);
- /* Test connect(2) with dst_addrs. */
- test_connect(10, &channel_args, dst_addrs, false);
-
- GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
- grpc_schedule_on_exec_ctx);
- grpc_pollset_shutdown(g_pollset, &destroyed);
+ if (getifaddrs(&ifa) != 0 || ifa == nullptr) {
+ gpr_log(GPR_ERROR, "getifaddrs: %s", strerror(errno));
+ return EXIT_FAILURE;
+ }
+ dst_addrs->naddrs = 0;
+ for (ifa_it = ifa; ifa_it != nullptr && dst_addrs->naddrs < MAX_ADDRS;
+ ifa_it = ifa_it->ifa_next) {
+ if (ifa_it->ifa_addr == nullptr) {
+ continue;
+ } else if (ifa_it->ifa_addr->sa_family == AF_INET) {
+ dst_addrs->addrs[dst_addrs->naddrs].addr.len = sizeof(struct sockaddr_in);
+ } else if (ifa_it->ifa_addr->sa_family == AF_INET6) {
+ dst_addrs->addrs[dst_addrs->naddrs].addr.len =
+ sizeof(struct sockaddr_in6);
+ } else {
+ continue;
+ }
+ memcpy(dst_addrs->addrs[dst_addrs->naddrs].addr.addr, ifa_it->ifa_addr,
+ dst_addrs->addrs[dst_addrs->naddrs].addr.len);
+ GPR_ASSERT(
+ grpc_sockaddr_set_port(&dst_addrs->addrs[dst_addrs->naddrs].addr, 0));
+ test_addr_init_str(&dst_addrs->addrs[dst_addrs->naddrs]);
+ ++dst_addrs->naddrs;
}
+ freeifaddrs(ifa);
+ ifa = nullptr;
+
+ /* Connect to same addresses as listeners. */
+ test_connect(1, nullptr, nullptr, false);
+ test_connect(10, nullptr, nullptr, false);
+
+ /* Set dst_addrs->addrs[i].len=0 for dst_addrs that are unreachable with a
+ "::" listener. */
+ test_connect(1, nullptr, dst_addrs, true);
+
+ /* Test connect(2) with dst_addrs. */
+ test_connect(1, &channel_args, dst_addrs, false);
+ /* Test connect(2) with dst_addrs. */
+ test_connect(10, &channel_args, dst_addrs, false);
+
+ GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
+ grpc_schedule_on_exec_ctx);
+ grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_shutdown();
gpr_free(dst_addrs);
gpr_free(g_pollset);
diff --git a/test/core/iomgr/tcp_server_uv_test.cc b/test/core/iomgr/tcp_server_uv_test.cc
index 35d62b51b7..dd047a0498 100644
--- a/test/core/iomgr/tcp_server_uv_test.cc
+++ b/test/core/iomgr/tcp_server_uv_test.cc
@@ -74,7 +74,8 @@ static void on_connect_result_set(on_connect_result* result,
result->fd_index = acceptor->fd_index;
}
-static void server_weak_ref_shutdown(void* arg, grpc_error* error) {
+static void server_weak_ref_shutdown(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
server_weak_ref* weak_ref = static_cast<server_weak_ref*>(arg);
weak_ref->server = NULL;
}
@@ -96,11 +97,12 @@ static void server_weak_ref_set(server_weak_ref* weak_ref,
weak_ref->server = server;
}
-static void on_connect(void* arg, grpc_endpoint* tcp, grpc_pollset* pollset,
+static void on_connect(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* tcp,
+ grpc_pollset* pollset,
grpc_tcp_server_acceptor* acceptor) {
- grpc_endpoint_shutdown(tcp,
+ grpc_endpoint_shutdown(exec_ctx, tcp,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connected"));
- grpc_endpoint_destroy(tcp);
+ grpc_endpoint_destroy(exec_ctx, tcp);
on_connect_result temp_result;
on_connect_result_set(&temp_result, acceptor);
@@ -109,33 +111,38 @@ static void on_connect(void* arg, grpc_endpoint* tcp, grpc_pollset* pollset,
gpr_mu_lock(g_mu);
g_result = temp_result;
g_nconnects++;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick",
+ grpc_pollset_kick(exec_ctx, g_pollset, NULL)));
gpr_mu_unlock(g_mu);
}
static void test_no_op(void) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp_server* s;
- GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
- grpc_tcp_server_unref(s);
+ GPR_ASSERT(GRPC_ERROR_NONE ==
+ grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s));
+ grpc_tcp_server_unref(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_no_op_with_start(void) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp_server* s;
- GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
+ GPR_ASSERT(GRPC_ERROR_NONE ==
+ grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s));
LOG_TEST("test_no_op_with_start");
- grpc_tcp_server_start(s, NULL, 0, on_connect, NULL);
- grpc_tcp_server_unref(s);
+ grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL);
+ grpc_tcp_server_unref(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_no_op_with_port(void) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resolved_address resolved_addr;
struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr;
grpc_tcp_server* s;
- GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
+ GPR_ASSERT(GRPC_ERROR_NONE ==
+ grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s));
LOG_TEST("test_no_op_with_port");
memset(&resolved_addr, 0, sizeof(resolved_addr));
@@ -146,15 +153,17 @@ static void test_no_op_with_port(void) {
GRPC_ERROR_NONE &&
port > 0);
- grpc_tcp_server_unref(s);
+ grpc_tcp_server_unref(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_no_op_with_port_and_start(void) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resolved_address resolved_addr;
struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr;
grpc_tcp_server* s;
- GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
+ GPR_ASSERT(GRPC_ERROR_NONE ==
+ grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s));
LOG_TEST("test_no_op_with_port_and_start");
int port;
@@ -165,9 +174,10 @@ static void test_no_op_with_port_and_start(void) {
GRPC_ERROR_NONE &&
port > 0);
- grpc_tcp_server_start(s, NULL, 0, on_connect, NULL);
+ grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL);
- grpc_tcp_server_unref(s);
+ grpc_tcp_server_unref(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void connect_cb(uv_connect_t* req, int status) {
@@ -177,8 +187,8 @@ static void connect_cb(uv_connect_t* req, int status) {
static void close_cb(uv_handle_t* handle) { gpr_free(handle); }
-static void tcp_connect(const struct sockaddr* remote, socklen_t remote_len,
- on_connect_result* result) {
+static void tcp_connect(grpc_exec_ctx* exec_ctx, const struct sockaddr* remote,
+ socklen_t remote_len, on_connect_result* result) {
gpr_timespec deadline = grpc_timeout_seconds_to_deadline(10);
uv_tcp_t* client_handle =
static_cast<uv_tcp_t*>(gpr_malloc(sizeof(uv_tcp_t)));
@@ -198,10 +208,10 @@ static void tcp_connect(const struct sockaddr* remote, socklen_t remote_len,
grpc_pollset_worker* worker = NULL;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work",
- grpc_pollset_work(g_pollset, &worker,
+ grpc_pollset_work(exec_ctx, g_pollset, &worker,
grpc_timespec_to_millis_round_up(deadline))));
gpr_mu_unlock(g_mu);
-
+ grpc_exec_ctx_finish(exec_ctx);
gpr_mu_lock(g_mu);
}
gpr_log(GPR_DEBUG, "wait done");
@@ -214,7 +224,7 @@ static void tcp_connect(const struct sockaddr* remote, socklen_t remote_len,
/* Tests a tcp server with multiple ports. */
static void test_connect(unsigned n) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resolved_address resolved_addr;
grpc_resolved_address resolved_addr1;
struct sockaddr_storage* addr = (struct sockaddr_storage*)resolved_addr.addr;
@@ -223,7 +233,8 @@ static void test_connect(unsigned n) {
int svr_port;
int svr1_port;
grpc_tcp_server* s;
- GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
+ GPR_ASSERT(GRPC_ERROR_NONE ==
+ grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s));
unsigned i;
server_weak_ref weak_ref;
server_weak_ref_init(&weak_ref);
@@ -246,45 +257,48 @@ static void test_connect(unsigned n) {
GRPC_ERROR_NONE &&
svr_port == svr1_port);
- grpc_tcp_server_start(s, &g_pollset, 1, on_connect, NULL);
+ grpc_tcp_server_start(&exec_ctx, s, &g_pollset, 1, on_connect, NULL);
GPR_ASSERT(uv_ip6_addr("::", svr_port, (struct sockaddr_in6*)addr1) == 0);
for (i = 0; i < n; i++) {
on_connect_result result;
on_connect_result_init(&result);
- tcp_connect((struct sockaddr*)addr, (socklen_t)resolved_addr.len, &result);
+ tcp_connect(&exec_ctx, (struct sockaddr*)addr, (socklen_t)resolved_addr.len,
+ &result);
GPR_ASSERT(result.port_index == 0);
GPR_ASSERT(result.server == s);
if (weak_ref.server == NULL) {
server_weak_ref_set(&weak_ref, result.server);
}
- grpc_tcp_server_unref(result.server);
+ grpc_tcp_server_unref(&exec_ctx, result.server);
on_connect_result_init(&result);
- tcp_connect((struct sockaddr*)addr1, (socklen_t)resolved_addr1.len,
- &result);
+ tcp_connect(&exec_ctx, (struct sockaddr*)addr1,
+ (socklen_t)resolved_addr1.len, &result);
GPR_ASSERT(result.port_index == 1);
GPR_ASSERT(result.server == s);
- grpc_tcp_server_unref(result.server);
+ grpc_tcp_server_unref(&exec_ctx, result.server);
}
/* Weak ref to server valid until final unref. */
GPR_ASSERT(weak_ref.server != NULL);
- grpc_tcp_server_unref(s);
+ grpc_tcp_server_unref(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
/* Weak ref lost. */
GPR_ASSERT(weak_ref.server == NULL);
}
-static void destroy_pollset(void* p, grpc_error* error) {
- grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
+static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p,
+ grpc_error* error) {
+ grpc_pollset_destroy(exec_ctx, static_cast<grpc_pollset*>(p));
}
int main(int argc, char** argv) {
grpc_closure destroyed;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_test_init(argc, argv);
grpc_init();
g_pollset = static_cast<grpc_pollset*>(gpr_malloc(grpc_pollset_size()));
@@ -299,8 +313,8 @@ int main(int argc, char** argv) {
GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
grpc_schedule_on_exec_ctx);
- grpc_pollset_shutdown(g_pollset, &destroyed);
-
+ grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_shutdown();
gpr_free(g_pollset);
return 0;
diff --git a/test/core/iomgr/timer_list_test.cc b/test/core/iomgr/timer_list_test.cc
index deb8c4d87e..d74ea4fc96 100644
--- a/test/core/iomgr/timer_list_test.cc
+++ b/test/core/iomgr/timer_list_test.cc
@@ -25,7 +25,6 @@
#include <string.h>
-#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h"
#include "test/core/util/test_config.h"
@@ -38,125 +37,127 @@ extern grpc_core::TraceFlag grpc_timer_check_trace;
static int cb_called[MAX_CB][2];
-static void cb(void* arg, grpc_error* error) {
+static void cb(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
cb_called[(intptr_t)arg][error == GRPC_ERROR_NONE]++;
}
static void add_test(void) {
int i;
grpc_timer timers[20];
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_INFO, "add_test");
- grpc_timer_list_init();
+ grpc_timer_list_init(&exec_ctx);
grpc_core::testing::grpc_tracer_enable_flag(&grpc_timer_trace);
grpc_core::testing::grpc_tracer_enable_flag(&grpc_timer_check_trace);
memset(cb_called, 0, sizeof(cb_called));
- grpc_millis start = grpc_core::ExecCtx::Get()->Now();
+ grpc_millis start = grpc_exec_ctx_now(&exec_ctx);
/* 10 ms timers. will expire in the current epoch */
for (i = 0; i < 10; i++) {
grpc_timer_init(
- &timers[i], start + 10,
+ &exec_ctx, &timers[i], start + 10,
GRPC_CLOSURE_CREATE(cb, (void*)(intptr_t)i, grpc_schedule_on_exec_ctx));
}
/* 1010 ms timers. will expire in the next epoch */
for (i = 10; i < 20; i++) {
grpc_timer_init(
- &timers[i], start + 1010,
+ &exec_ctx, &timers[i], start + 1010,
GRPC_CLOSURE_CREATE(cb, (void*)(intptr_t)i, grpc_schedule_on_exec_ctx));
}
/* collect timers. Only the first batch should be ready. */
- grpc_core::ExecCtx::Get()->TestOnlySetNow(start + 500);
- GPR_ASSERT(grpc_timer_check(nullptr) == GRPC_TIMERS_FIRED);
- grpc_core::ExecCtx::Get()->Flush();
+ exec_ctx.now = start + 500;
+ GPR_ASSERT(grpc_timer_check(&exec_ctx, nullptr) == GRPC_TIMERS_FIRED);
+ grpc_exec_ctx_finish(&exec_ctx);
for (i = 0; i < 20; i++) {
GPR_ASSERT(cb_called[i][1] == (i < 10));
GPR_ASSERT(cb_called[i][0] == 0);
}
- grpc_core::ExecCtx::Get()->TestOnlySetNow(start + 600);
- GPR_ASSERT(grpc_timer_check(nullptr) == GRPC_TIMERS_CHECKED_AND_EMPTY);
- grpc_core::ExecCtx::Get()->Flush();
+ exec_ctx.now = start + 600;
+ GPR_ASSERT(grpc_timer_check(&exec_ctx, nullptr) ==
+ GRPC_TIMERS_CHECKED_AND_EMPTY);
+ grpc_exec_ctx_finish(&exec_ctx);
for (i = 0; i < 30; i++) {
GPR_ASSERT(cb_called[i][1] == (i < 10));
GPR_ASSERT(cb_called[i][0] == 0);
}
/* collect the rest of the timers */
- grpc_core::ExecCtx::Get()->TestOnlySetNow(start + 1500);
- GPR_ASSERT(grpc_timer_check(nullptr) == GRPC_TIMERS_FIRED);
- grpc_core::ExecCtx::Get()->Flush();
+ exec_ctx.now = start + 1500;
+ GPR_ASSERT(grpc_timer_check(&exec_ctx, nullptr) == GRPC_TIMERS_FIRED);
+ grpc_exec_ctx_finish(&exec_ctx);
for (i = 0; i < 30; i++) {
GPR_ASSERT(cb_called[i][1] == (i < 20));
GPR_ASSERT(cb_called[i][0] == 0);
}
- grpc_core::ExecCtx::Get()->TestOnlySetNow(start + 1600);
- GPR_ASSERT(grpc_timer_check(nullptr) == GRPC_TIMERS_CHECKED_AND_EMPTY);
+ exec_ctx.now = start + 1600;
+ GPR_ASSERT(grpc_timer_check(&exec_ctx, nullptr) ==
+ GRPC_TIMERS_CHECKED_AND_EMPTY);
for (i = 0; i < 30; i++) {
GPR_ASSERT(cb_called[i][1] == (i < 20));
GPR_ASSERT(cb_called[i][0] == 0);
}
- grpc_timer_list_shutdown();
+ grpc_timer_list_shutdown(&exec_ctx);
+ grpc_exec_ctx_finish(&exec_ctx);
}
/* Cleaning up a list with pending timers. */
void destruction_test(void) {
grpc_timer timers[5];
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_INFO, "destruction_test");
- grpc_core::ExecCtx::Get()->TestOnlySetNow(0);
- grpc_timer_list_init();
+ exec_ctx.now_is_valid = true;
+ exec_ctx.now = 0;
+ grpc_timer_list_init(&exec_ctx);
grpc_core::testing::grpc_tracer_enable_flag(&grpc_timer_trace);
grpc_core::testing::grpc_tracer_enable_flag(&grpc_timer_check_trace);
memset(cb_called, 0, sizeof(cb_called));
grpc_timer_init(
- &timers[0], 100,
+ &exec_ctx, &timers[0], 100,
GRPC_CLOSURE_CREATE(cb, (void*)(intptr_t)0, grpc_schedule_on_exec_ctx));
grpc_timer_init(
- &timers[1], 3,
+ &exec_ctx, &timers[1], 3,
GRPC_CLOSURE_CREATE(cb, (void*)(intptr_t)1, grpc_schedule_on_exec_ctx));
grpc_timer_init(
- &timers[2], 100,
+ &exec_ctx, &timers[2], 100,
GRPC_CLOSURE_CREATE(cb, (void*)(intptr_t)2, grpc_schedule_on_exec_ctx));
grpc_timer_init(
- &timers[3], 3,
+ &exec_ctx, &timers[3], 3,
GRPC_CLOSURE_CREATE(cb, (void*)(intptr_t)3, grpc_schedule_on_exec_ctx));
grpc_timer_init(
- &timers[4], 1,
+ &exec_ctx, &timers[4], 1,
GRPC_CLOSURE_CREATE(cb, (void*)(intptr_t)4, grpc_schedule_on_exec_ctx));
- grpc_core::ExecCtx::Get()->TestOnlySetNow(2);
- GPR_ASSERT(grpc_timer_check(nullptr) == GRPC_TIMERS_FIRED);
- grpc_core::ExecCtx::Get()->Flush();
+ exec_ctx.now = 2;
+ GPR_ASSERT(grpc_timer_check(&exec_ctx, nullptr) == GRPC_TIMERS_FIRED);
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(1 == cb_called[4][1]);
- grpc_timer_cancel(&timers[0]);
- grpc_timer_cancel(&timers[3]);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_timer_cancel(&exec_ctx, &timers[0]);
+ grpc_timer_cancel(&exec_ctx, &timers[3]);
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(1 == cb_called[0][0]);
GPR_ASSERT(1 == cb_called[3][0]);
- grpc_timer_list_shutdown();
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_timer_list_shutdown(&exec_ctx);
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(1 == cb_called[1][0]);
GPR_ASSERT(1 == cb_called[2][0]);
}
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
- grpc_core::ExecCtx::GlobalInit();
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
add_test();
destruction_test();
- grpc_core::ExecCtx::GlobalShutdown();
return 0;
}
diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc
index 0deb534abd..6e17be9cd6 100644
--- a/test/core/iomgr/udp_server_test.cc
+++ b/test/core/iomgr/udp_server_test.cc
@@ -50,7 +50,7 @@ static int g_number_of_writes = 0;
static int g_number_of_bytes_read = 0;
static int g_number_of_orphan_calls = 0;
-static bool on_read(grpc_fd* emfd, void* user_data) {
+static bool on_read(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) {
char read_buffer[512];
ssize_t byte_count;
@@ -61,27 +61,27 @@ static bool on_read(grpc_fd* emfd, void* user_data) {
g_number_of_reads++;
g_number_of_bytes_read += (int)byte_count;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
return false;
}
-static void on_write(grpc_fd* emfd, void* user_data,
+static void on_write(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data,
grpc_closure* notify_on_write_closure) {
gpr_mu_lock(g_mu);
g_number_of_writes++;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
}
-static void on_fd_orphaned(grpc_fd* emfd, grpc_closure* closure,
- void* user_data) {
+static void on_fd_orphaned(grpc_exec_ctx* exec_ctx, grpc_fd* emfd,
+ grpc_closure* closure, void* user_data) {
gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
grpc_fd_wrapped_fd(emfd));
- GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
g_number_of_orphan_calls++;
}
@@ -130,22 +130,24 @@ static test_socket_factory* test_socket_factory_create(void) {
}
static void test_no_op(void) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_udp_server* s = grpc_udp_server_create(nullptr);
- grpc_udp_server_destroy(s, nullptr);
+ grpc_udp_server_destroy(&exec_ctx, s, nullptr);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_no_op_with_start(void) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_udp_server* s = grpc_udp_server_create(nullptr);
LOG_TEST("test_no_op_with_start");
- grpc_udp_server_start(s, nullptr, 0, nullptr);
- grpc_udp_server_destroy(s, nullptr);
+ grpc_udp_server_start(&exec_ctx, s, nullptr, 0, nullptr);
+ grpc_udp_server_destroy(&exec_ctx, s, nullptr);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_no_op_with_port(void) {
g_number_of_orphan_calls = 0;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resolved_address resolved_addr;
struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr;
grpc_udp_server* s = grpc_udp_server_create(nullptr);
@@ -157,7 +159,8 @@ static void test_no_op_with_port(void) {
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write,
on_fd_orphaned));
- grpc_udp_server_destroy(s, nullptr);
+ grpc_udp_server_destroy(&exec_ctx, s, nullptr);
+ grpc_exec_ctx_finish(&exec_ctx);
/* The server had a single FD, which should have been orphaned. */
GPR_ASSERT(g_number_of_orphan_calls == 1);
@@ -165,7 +168,7 @@ static void test_no_op_with_port(void) {
static void test_no_op_with_port_and_socket_factory(void) {
g_number_of_orphan_calls = 0;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resolved_address resolved_addr;
struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr;
@@ -175,7 +178,7 @@ static void test_no_op_with_port_and_socket_factory(void) {
grpc_channel_args* channel_args =
grpc_channel_args_copy_and_add(nullptr, &socket_factory_arg, 1);
grpc_udp_server* s = grpc_udp_server_create(channel_args);
- grpc_channel_args_destroy(channel_args);
+ grpc_channel_args_destroy(&exec_ctx, channel_args);
LOG_TEST("test_no_op_with_port_and_socket_factory");
@@ -187,8 +190,8 @@ static void test_no_op_with_port_and_socket_factory(void) {
GPR_ASSERT(socket_factory->number_of_socket_calls == 1);
GPR_ASSERT(socket_factory->number_of_bind_calls == 1);
- grpc_udp_server_destroy(s, nullptr);
-
+ grpc_udp_server_destroy(&exec_ctx, s, nullptr);
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_socket_factory_unref(&socket_factory->base);
/* The server had a single FD, which should have been orphaned. */
@@ -197,7 +200,7 @@ static void test_no_op_with_port_and_socket_factory(void) {
static void test_no_op_with_port_and_start(void) {
g_number_of_orphan_calls = 0;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resolved_address resolved_addr;
struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr;
grpc_udp_server* s = grpc_udp_server_create(nullptr);
@@ -209,9 +212,10 @@ static void test_no_op_with_port_and_start(void) {
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write,
on_fd_orphaned));
- grpc_udp_server_start(s, nullptr, 0, nullptr);
+ grpc_udp_server_start(&exec_ctx, s, nullptr, 0, nullptr);
- grpc_udp_server_destroy(s, nullptr);
+ grpc_udp_server_destroy(&exec_ctx, s, nullptr);
+ grpc_exec_ctx_finish(&exec_ctx);
/* The server had a single FD, which is orphaned exactly once in *
* grpc_udp_server_destroy. */
@@ -219,7 +223,7 @@ static void test_no_op_with_port_and_start(void) {
}
static void test_receive(int number_of_clients) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resolved_address resolved_addr;
struct sockaddr_storage* addr = (struct sockaddr_storage*)resolved_addr.addr;
int clifd, svrfd;
@@ -246,7 +250,7 @@ static void test_receive(int number_of_clients) {
GPR_ASSERT(resolved_addr.len <= sizeof(struct sockaddr_storage));
pollsets[0] = g_pollset;
- grpc_udp_server_start(s, pollsets, 1, nullptr);
+ grpc_udp_server_start(&exec_ctx, s, pollsets, 1, nullptr);
gpr_mu_lock(g_mu);
@@ -262,12 +266,13 @@ static void test_receive(int number_of_clients) {
(socklen_t)resolved_addr.len) == 0);
GPR_ASSERT(5 == write(clifd, "hello", 5));
while (g_number_of_bytes_read < (number_of_bytes_read_before + 5) &&
- deadline > grpc_core::ExecCtx::Get()->Now()) {
+ deadline > grpc_exec_ctx_now(&exec_ctx)) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline)));
gpr_mu_unlock(g_mu);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(g_mu);
}
close(clifd);
@@ -276,40 +281,40 @@ static void test_receive(int number_of_clients) {
gpr_mu_unlock(g_mu);
- grpc_udp_server_destroy(s, nullptr);
+ grpc_udp_server_destroy(&exec_ctx, s, nullptr);
+ grpc_exec_ctx_finish(&exec_ctx);
/* The server had a single FD, which is orphaned exactly once in *
* grpc_udp_server_destroy. */
GPR_ASSERT(g_number_of_orphan_calls == 1);
}
-static void destroy_pollset(void* p, grpc_error* error) {
- grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
+static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p,
+ grpc_error* error) {
+ grpc_pollset_destroy(exec_ctx, static_cast<grpc_pollset*>(p));
}
int main(int argc, char** argv) {
grpc_closure destroyed;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_test_init(argc, argv);
grpc_init();
- {
- grpc_core::ExecCtx exec_ctx;
- g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
- grpc_pollset_init(g_pollset, &g_mu);
-
- test_no_op();
- test_no_op_with_start();
- test_no_op_with_port();
- test_no_op_with_port_and_socket_factory();
- test_no_op_with_port_and_start();
- test_receive(1);
- test_receive(10);
-
- GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
- grpc_schedule_on_exec_ctx);
- grpc_pollset_shutdown(g_pollset, &destroyed);
- grpc_core::ExecCtx::Get()->Flush();
- gpr_free(g_pollset);
- }
+ g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
+ grpc_pollset_init(g_pollset, &g_mu);
+
+ test_no_op();
+ test_no_op_with_start();
+ test_no_op_with_port();
+ test_no_op_with_port_and_socket_factory();
+ test_no_op_with_port_and_start();
+ test_receive(1);
+ test_receive(10);
+
+ GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
+ grpc_schedule_on_exec_ctx);
+ grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_free(g_pollset);
grpc_shutdown();
return 0;
}