diff options
author | Yash Tibrewal <yashkt@google.com> | 2017-12-06 09:47:54 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-12-06 09:47:54 -0800 |
commit | 8cf1470a51ea276ca84825e7495d4ee24743540d (patch) | |
tree | 72385cc865094115bc08cb813201d48cb09840bb /test/core/iomgr/fd_posix_test.cc | |
parent | 1d4e99508409be052bd129ba507bae1fbe7eb7fa (diff) |
Revert "Revert "All instances of exec_ctx being passed around in src/core removed""
Diffstat (limited to 'test/core/iomgr/fd_posix_test.cc')
-rw-r--r-- | test/core/iomgr/fd_posix_test.cc | 156 |
1 files changed, 74 insertions, 82 deletions
diff --git a/test/core/iomgr/fd_posix_test.cc b/test/core/iomgr/fd_posix_test.cc index a03d841ecd..cf75517538 100644 --- a/test/core/iomgr/fd_posix_test.cc +++ b/test/core/iomgr/fd_posix_test.cc @@ -111,20 +111,19 @@ typedef struct { /* Called when an upload session can be safely shutdown. Close session FD and start to shutdown listen FD. */ -static void session_shutdown_cb(grpc_exec_ctx* exec_ctx, void* arg, /*session */ +static void session_shutdown_cb(void* arg, /*session */ bool success) { session* se = static_cast<session*>(arg); server* sv = se->sv; - grpc_fd_orphan(exec_ctx, se->em_fd, nullptr, nullptr, - false /* already_closed */, "a"); + grpc_fd_orphan(se->em_fd, nullptr, nullptr, false /* already_closed */, "a"); gpr_free(se); /* Start to shutdown listen fd. */ - grpc_fd_shutdown(exec_ctx, sv->em_fd, + grpc_fd_shutdown(sv->em_fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("session_shutdown_cb")); } /* Called when data become readable in a session. */ -static void session_read_cb(grpc_exec_ctx* exec_ctx, void* arg, /*session */ +static void session_read_cb(void* arg, /*session */ grpc_error* error) { session* se = static_cast<session*>(arg); int fd = grpc_fd_wrapped_fd(se->em_fd); @@ -133,7 +132,7 @@ static void session_read_cb(grpc_exec_ctx* exec_ctx, void* arg, /*session */ ssize_t read_total = 0; if (error != GRPC_ERROR_NONE) { - session_shutdown_cb(exec_ctx, arg, 1); + session_shutdown_cb(arg, 1); return; } @@ -148,7 +147,7 @@ static void session_read_cb(grpc_exec_ctx* exec_ctx, void* arg, /*session */ It is possible to read nothing due to spurious edge event or data has been drained, In such a case, read() returns -1 and set errno to EAGAIN. */ if (read_once == 0) { - session_shutdown_cb(exec_ctx, arg, 1); + session_shutdown_cb(arg, 1); } else if (read_once == -1) { if (errno == EAGAIN) { /* An edge triggered event is cached in the kernel until next poll. @@ -159,7 +158,7 @@ static void session_read_cb(grpc_exec_ctx* exec_ctx, void* arg, /*session */ TODO(chenw): in multi-threaded version, callback and polling can be run in different threads. polling may catch a persist read edge event before notify_on_read is called. */ - grpc_fd_notify_on_read(exec_ctx, se->em_fd, &se->session_read_closure); + grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure); } else { gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno)); abort(); @@ -169,22 +168,20 @@ static void session_read_cb(grpc_exec_ctx* exec_ctx, void* arg, /*session */ /* Called when the listen FD can be safely shutdown. Close listen FD and signal that server can be shutdown. */ -static void listen_shutdown_cb(grpc_exec_ctx* exec_ctx, void* arg /*server */, - int success) { +static void listen_shutdown_cb(void* arg /*server */, int success) { server* sv = static_cast<server*>(arg); - grpc_fd_orphan(exec_ctx, sv->em_fd, nullptr, nullptr, - false /* already_closed */, "b"); + grpc_fd_orphan(sv->em_fd, nullptr, nullptr, false /* already_closed */, "b"); gpr_mu_lock(g_mu); sv->done = 1; - GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); gpr_mu_unlock(g_mu); } /* Called when a new TCP connection request arrives in the listening port. */ -static void listen_cb(grpc_exec_ctx* exec_ctx, void* arg, /*=sv_arg*/ +static void listen_cb(void* arg, /*=sv_arg*/ grpc_error* error) { server* sv = static_cast<server*>(arg); int fd; @@ -195,7 +192,7 @@ static void listen_cb(grpc_exec_ctx* exec_ctx, void* arg, /*=sv_arg*/ grpc_fd* listen_em_fd = sv->em_fd; if (error != GRPC_ERROR_NONE) { - listen_shutdown_cb(exec_ctx, arg, 1); + listen_shutdown_cb(arg, 1); return; } @@ -207,12 +204,12 @@ static void listen_cb(grpc_exec_ctx* exec_ctx, void* arg, /*=sv_arg*/ se = static_cast<session*>(gpr_malloc(sizeof(*se))); se->sv = sv; se->em_fd = grpc_fd_create(fd, "listener"); - grpc_pollset_add_fd(exec_ctx, g_pollset, se->em_fd); + grpc_pollset_add_fd(g_pollset, se->em_fd); GRPC_CLOSURE_INIT(&se->session_read_closure, session_read_cb, se, grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_read(exec_ctx, se->em_fd, &se->session_read_closure); + grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure); - grpc_fd_notify_on_read(exec_ctx, listen_em_fd, &sv->listen_closure); + grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure); } /* Max number of connections pending to be accepted by listen(). */ @@ -222,7 +219,7 @@ static void listen_cb(grpc_exec_ctx* exec_ctx, void* arg, /*=sv_arg*/ listen_cb() is registered to be interested in reading from listen_fd. When connection request arrives, listen_cb() is called to accept the connection request. */ -static int server_start(grpc_exec_ctx* exec_ctx, server* sv) { +static int server_start(server* sv) { int port = 0; int fd; struct sockaddr_in sin; @@ -236,11 +233,11 @@ static int server_start(grpc_exec_ctx* exec_ctx, server* sv) { GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0); sv->em_fd = grpc_fd_create(fd, "server"); - grpc_pollset_add_fd(exec_ctx, g_pollset, sv->em_fd); + grpc_pollset_add_fd(g_pollset, sv->em_fd); /* Register to be interested in reading from listen_fd. */ GRPC_CLOSURE_INIT(&sv->listen_closure, listen_cb, sv, grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_read(exec_ctx, sv->em_fd, &sv->listen_closure); + grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure); return port; } @@ -249,13 +246,13 @@ static int server_start(grpc_exec_ctx* exec_ctx, server* sv) { static void server_wait_and_shutdown(server* sv) { gpr_mu_lock(g_mu); while (!sv->done) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_core::ExecCtx exec_ctx; grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker, - GRPC_MILLIS_INF_FUTURE))); + "pollset_work", + grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE))); gpr_mu_unlock(g_mu); - grpc_exec_ctx_finish(&exec_ctx); + gpr_mu_lock(g_mu); } gpr_mu_unlock(g_mu); @@ -289,18 +286,16 @@ static void client_init(client* cl) { } /* Called when a client upload session is ready to shutdown. */ -static void client_session_shutdown_cb(grpc_exec_ctx* exec_ctx, - void* arg /*client */, int success) { +static void client_session_shutdown_cb(void* arg /*client */, int success) { client* cl = static_cast<client*>(arg); - grpc_fd_orphan(exec_ctx, cl->em_fd, nullptr, nullptr, - false /* already_closed */, "c"); + grpc_fd_orphan(cl->em_fd, nullptr, nullptr, false /* already_closed */, "c"); cl->done = 1; - GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); } /* Write as much as possible, then register notify_on_write. */ -static void client_session_write(grpc_exec_ctx* exec_ctx, void* arg, /*client */ +static void client_session_write(void* arg, /*client */ grpc_error* error) { client* cl = static_cast<client*>(arg); int fd = grpc_fd_wrapped_fd(cl->em_fd); @@ -308,7 +303,7 @@ static void client_session_write(grpc_exec_ctx* exec_ctx, void* arg, /*client */ if (error != GRPC_ERROR_NONE) { gpr_mu_lock(g_mu); - client_session_shutdown_cb(exec_ctx, arg, 1); + client_session_shutdown_cb(arg, 1); gpr_mu_unlock(g_mu); return; } @@ -323,10 +318,10 @@ static void client_session_write(grpc_exec_ctx* exec_ctx, void* arg, /*client */ if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) { GRPC_CLOSURE_INIT(&cl->write_closure, client_session_write, cl, grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_write(exec_ctx, cl->em_fd, &cl->write_closure); + grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure); cl->client_write_cnt++; } else { - client_session_shutdown_cb(exec_ctx, arg, 1); + client_session_shutdown_cb(arg, 1); } gpr_mu_unlock(g_mu); } else { @@ -336,7 +331,7 @@ static void client_session_write(grpc_exec_ctx* exec_ctx, void* arg, /*client */ } /* Start a client to send a stream of bytes. */ -static void client_start(grpc_exec_ctx* exec_ctx, client* cl, int port) { +static void client_start(client* cl, int port) { int fd; struct sockaddr_in sin; create_test_socket(port, &fd, &sin); @@ -357,9 +352,9 @@ static void client_start(grpc_exec_ctx* exec_ctx, client* cl, int port) { } cl->em_fd = grpc_fd_create(fd, "client"); - grpc_pollset_add_fd(exec_ctx, g_pollset, cl->em_fd); + grpc_pollset_add_fd(g_pollset, cl->em_fd); - client_session_write(exec_ctx, cl, GRPC_ERROR_NONE); + client_session_write(cl, GRPC_ERROR_NONE); } /* Wait for the signal to shutdown a client. */ @@ -367,12 +362,12 @@ static void client_wait_and_shutdown(client* cl) { gpr_mu_lock(g_mu); while (!cl->done) { grpc_pollset_worker* worker = nullptr; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_core::ExecCtx exec_ctx; GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker, - GRPC_MILLIS_INF_FUTURE))); + "pollset_work", + grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE))); gpr_mu_unlock(g_mu); - grpc_exec_ctx_finish(&exec_ctx); + gpr_mu_lock(g_mu); } gpr_mu_unlock(g_mu); @@ -385,13 +380,13 @@ static void test_grpc_fd(void) { server sv; client cl; int port; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_core::ExecCtx exec_ctx; server_init(&sv); - port = server_start(&exec_ctx, &sv); + port = server_start(&sv); client_init(&cl); - client_start(&exec_ctx, &cl, port); - grpc_exec_ctx_finish(&exec_ctx); + client_start(&cl, port); + client_wait_and_shutdown(&cl); server_wait_and_shutdown(&sv); GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total); @@ -406,27 +401,25 @@ void init_change_data(fd_change_data* fdc) { fdc->cb_that_ran = nullptr; } void destroy_change_data(fd_change_data* fdc) {} -static void first_read_callback(grpc_exec_ctx* exec_ctx, - void* arg /* fd_change_data */, +static void first_read_callback(void* arg /* fd_change_data */, grpc_error* error) { fd_change_data* fdc = static_cast<fd_change_data*>(arg); gpr_mu_lock(g_mu); fdc->cb_that_ran = first_read_callback; - GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); gpr_mu_unlock(g_mu); } -static void second_read_callback(grpc_exec_ctx* exec_ctx, - void* arg /* fd_change_data */, +static void second_read_callback(void* arg /* fd_change_data */, grpc_error* error) { fd_change_data* fdc = static_cast<fd_change_data*>(arg); gpr_mu_lock(g_mu); fdc->cb_that_ran = second_read_callback; - GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); gpr_mu_unlock(g_mu); } @@ -443,7 +436,7 @@ static void test_grpc_fd_change(void) { ssize_t result; grpc_closure first_closure; grpc_closure second_closure; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_core::ExecCtx exec_ctx; GRPC_CLOSURE_INIT(&first_closure, first_read_callback, &a, grpc_schedule_on_exec_ctx); @@ -460,10 +453,10 @@ static void test_grpc_fd_change(void) { GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change"); - grpc_pollset_add_fd(&exec_ctx, g_pollset, em_fd); + grpc_pollset_add_fd(g_pollset, em_fd); /* Register the first callback, then make its FD readable */ - grpc_fd_notify_on_read(&exec_ctx, em_fd, &first_closure); + grpc_fd_notify_on_read(em_fd, &first_closure); data = 0; result = write(sv[1], &data, 1); GPR_ASSERT(result == 1); @@ -473,10 +466,10 @@ static void test_grpc_fd_change(void) { while (a.cb_that_ran == nullptr) { grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker, - GRPC_MILLIS_INF_FUTURE))); + "pollset_work", + grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE))); gpr_mu_unlock(g_mu); - grpc_exec_ctx_finish(&exec_ctx); + gpr_mu_lock(g_mu); } GPR_ASSERT(a.cb_that_ran == first_read_callback); @@ -488,7 +481,7 @@ static void test_grpc_fd_change(void) { /* Now register a second callback with distinct change data, and do the same thing again. */ - grpc_fd_notify_on_read(&exec_ctx, em_fd, &second_closure); + grpc_fd_notify_on_read(em_fd, &second_closure); data = 0; result = write(sv[1], &data, 1); GPR_ASSERT(result == 1); @@ -497,44 +490,43 @@ static void test_grpc_fd_change(void) { while (b.cb_that_ran == nullptr) { grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker, - GRPC_MILLIS_INF_FUTURE))); + "pollset_work", + grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE))); gpr_mu_unlock(g_mu); - grpc_exec_ctx_finish(&exec_ctx); + gpr_mu_lock(g_mu); } /* Except now we verify that second_read_callback ran instead */ GPR_ASSERT(b.cb_that_ran == second_read_callback); gpr_mu_unlock(g_mu); - grpc_fd_orphan(&exec_ctx, em_fd, nullptr, nullptr, false /* already_closed */, - "d"); - grpc_exec_ctx_finish(&exec_ctx); + grpc_fd_orphan(em_fd, nullptr, nullptr, false /* already_closed */, "d"); + destroy_change_data(&a); destroy_change_data(&b); close(sv[1]); } -static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p, - grpc_error* error) { - grpc_pollset_destroy(exec_ctx, static_cast<grpc_pollset*>(p)); +static void destroy_pollset(void* p, grpc_error* error) { + grpc_pollset_destroy(static_cast<grpc_pollset*>(p)); } int main(int argc, char** argv) { grpc_closure destroyed; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size())); - grpc_pollset_init(g_pollset, &g_mu); - test_grpc_fd(); - test_grpc_fd_change(); - GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, - grpc_schedule_on_exec_ctx); - grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); - grpc_exec_ctx_flush(&exec_ctx); - gpr_free(g_pollset); - grpc_exec_ctx_finish(&exec_ctx); + { + grpc_core::ExecCtx exec_ctx; + g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size())); + grpc_pollset_init(g_pollset, &g_mu); + test_grpc_fd(); + test_grpc_fd_change(); + GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, + grpc_schedule_on_exec_ctx); + grpc_pollset_shutdown(g_pollset, &destroyed); + grpc_core::ExecCtx::Get()->Flush(); + gpr_free(g_pollset); + } grpc_shutdown(); return 0; } |