aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core/iomgr/fd_posix_test.cc
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2017-12-06 09:47:54 -0800
committerGravatar GitHub <noreply@github.com>2017-12-06 09:47:54 -0800
commit8cf1470a51ea276ca84825e7495d4ee24743540d (patch)
tree72385cc865094115bc08cb813201d48cb09840bb /test/core/iomgr/fd_posix_test.cc
parent1d4e99508409be052bd129ba507bae1fbe7eb7fa (diff)
Revert "Revert "All instances of exec_ctx being passed around in src/core removed""
Diffstat (limited to 'test/core/iomgr/fd_posix_test.cc')
-rw-r--r--test/core/iomgr/fd_posix_test.cc156
1 files changed, 74 insertions, 82 deletions
diff --git a/test/core/iomgr/fd_posix_test.cc b/test/core/iomgr/fd_posix_test.cc
index a03d841ecd..cf75517538 100644
--- a/test/core/iomgr/fd_posix_test.cc
+++ b/test/core/iomgr/fd_posix_test.cc
@@ -111,20 +111,19 @@ typedef struct {
/* Called when an upload session can be safely shutdown.
Close session FD and start to shutdown listen FD. */
-static void session_shutdown_cb(grpc_exec_ctx* exec_ctx, void* arg, /*session */
+static void session_shutdown_cb(void* arg, /*session */
bool success) {
session* se = static_cast<session*>(arg);
server* sv = se->sv;
- grpc_fd_orphan(exec_ctx, se->em_fd, nullptr, nullptr,
- false /* already_closed */, "a");
+ grpc_fd_orphan(se->em_fd, nullptr, nullptr, false /* already_closed */, "a");
gpr_free(se);
/* Start to shutdown listen fd. */
- grpc_fd_shutdown(exec_ctx, sv->em_fd,
+ grpc_fd_shutdown(sv->em_fd,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("session_shutdown_cb"));
}
/* Called when data become readable in a session. */
-static void session_read_cb(grpc_exec_ctx* exec_ctx, void* arg, /*session */
+static void session_read_cb(void* arg, /*session */
grpc_error* error) {
session* se = static_cast<session*>(arg);
int fd = grpc_fd_wrapped_fd(se->em_fd);
@@ -133,7 +132,7 @@ static void session_read_cb(grpc_exec_ctx* exec_ctx, void* arg, /*session */
ssize_t read_total = 0;
if (error != GRPC_ERROR_NONE) {
- session_shutdown_cb(exec_ctx, arg, 1);
+ session_shutdown_cb(arg, 1);
return;
}
@@ -148,7 +147,7 @@ static void session_read_cb(grpc_exec_ctx* exec_ctx, void* arg, /*session */
It is possible to read nothing due to spurious edge event or data has
been drained, In such a case, read() returns -1 and set errno to EAGAIN. */
if (read_once == 0) {
- session_shutdown_cb(exec_ctx, arg, 1);
+ session_shutdown_cb(arg, 1);
} else if (read_once == -1) {
if (errno == EAGAIN) {
/* An edge triggered event is cached in the kernel until next poll.
@@ -159,7 +158,7 @@ static void session_read_cb(grpc_exec_ctx* exec_ctx, void* arg, /*session */
TODO(chenw): in multi-threaded version, callback and polling can be
run in different threads. polling may catch a persist read edge event
before notify_on_read is called. */
- grpc_fd_notify_on_read(exec_ctx, se->em_fd, &se->session_read_closure);
+ grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
} else {
gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno));
abort();
@@ -169,22 +168,20 @@ static void session_read_cb(grpc_exec_ctx* exec_ctx, void* arg, /*session */
/* Called when the listen FD can be safely shutdown.
Close listen FD and signal that server can be shutdown. */
-static void listen_shutdown_cb(grpc_exec_ctx* exec_ctx, void* arg /*server */,
- int success) {
+static void listen_shutdown_cb(void* arg /*server */, int success) {
server* sv = static_cast<server*>(arg);
- grpc_fd_orphan(exec_ctx, sv->em_fd, nullptr, nullptr,
- false /* already_closed */, "b");
+ grpc_fd_orphan(sv->em_fd, nullptr, nullptr, false /* already_closed */, "b");
gpr_mu_lock(g_mu);
sv->done = 1;
- GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
}
/* Called when a new TCP connection request arrives in the listening port. */
-static void listen_cb(grpc_exec_ctx* exec_ctx, void* arg, /*=sv_arg*/
+static void listen_cb(void* arg, /*=sv_arg*/
grpc_error* error) {
server* sv = static_cast<server*>(arg);
int fd;
@@ -195,7 +192,7 @@ static void listen_cb(grpc_exec_ctx* exec_ctx, void* arg, /*=sv_arg*/
grpc_fd* listen_em_fd = sv->em_fd;
if (error != GRPC_ERROR_NONE) {
- listen_shutdown_cb(exec_ctx, arg, 1);
+ listen_shutdown_cb(arg, 1);
return;
}
@@ -207,12 +204,12 @@ static void listen_cb(grpc_exec_ctx* exec_ctx, void* arg, /*=sv_arg*/
se = static_cast<session*>(gpr_malloc(sizeof(*se)));
se->sv = sv;
se->em_fd = grpc_fd_create(fd, "listener");
- grpc_pollset_add_fd(exec_ctx, g_pollset, se->em_fd);
+ grpc_pollset_add_fd(g_pollset, se->em_fd);
GRPC_CLOSURE_INIT(&se->session_read_closure, session_read_cb, se,
grpc_schedule_on_exec_ctx);
- grpc_fd_notify_on_read(exec_ctx, se->em_fd, &se->session_read_closure);
+ grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
- grpc_fd_notify_on_read(exec_ctx, listen_em_fd, &sv->listen_closure);
+ grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure);
}
/* Max number of connections pending to be accepted by listen(). */
@@ -222,7 +219,7 @@ static void listen_cb(grpc_exec_ctx* exec_ctx, void* arg, /*=sv_arg*/
listen_cb() is registered to be interested in reading from listen_fd.
When connection request arrives, listen_cb() is called to accept the
connection request. */
-static int server_start(grpc_exec_ctx* exec_ctx, server* sv) {
+static int server_start(server* sv) {
int port = 0;
int fd;
struct sockaddr_in sin;
@@ -236,11 +233,11 @@ static int server_start(grpc_exec_ctx* exec_ctx, server* sv) {
GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
sv->em_fd = grpc_fd_create(fd, "server");
- grpc_pollset_add_fd(exec_ctx, g_pollset, sv->em_fd);
+ grpc_pollset_add_fd(g_pollset, sv->em_fd);
/* Register to be interested in reading from listen_fd. */
GRPC_CLOSURE_INIT(&sv->listen_closure, listen_cb, sv,
grpc_schedule_on_exec_ctx);
- grpc_fd_notify_on_read(exec_ctx, sv->em_fd, &sv->listen_closure);
+ grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure);
return port;
}
@@ -249,13 +246,13 @@ static int server_start(grpc_exec_ctx* exec_ctx, server* sv) {
static void server_wait_and_shutdown(server* sv) {
gpr_mu_lock(g_mu);
while (!sv->done) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker,
- GRPC_MILLIS_INF_FUTURE)));
+ "pollset_work",
+ grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
gpr_mu_unlock(g_mu);
- grpc_exec_ctx_finish(&exec_ctx);
+
gpr_mu_lock(g_mu);
}
gpr_mu_unlock(g_mu);
@@ -289,18 +286,16 @@ static void client_init(client* cl) {
}
/* Called when a client upload session is ready to shutdown. */
-static void client_session_shutdown_cb(grpc_exec_ctx* exec_ctx,
- void* arg /*client */, int success) {
+static void client_session_shutdown_cb(void* arg /*client */, int success) {
client* cl = static_cast<client*>(arg);
- grpc_fd_orphan(exec_ctx, cl->em_fd, nullptr, nullptr,
- false /* already_closed */, "c");
+ grpc_fd_orphan(cl->em_fd, nullptr, nullptr, false /* already_closed */, "c");
cl->done = 1;
- GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
}
/* Write as much as possible, then register notify_on_write. */
-static void client_session_write(grpc_exec_ctx* exec_ctx, void* arg, /*client */
+static void client_session_write(void* arg, /*client */
grpc_error* error) {
client* cl = static_cast<client*>(arg);
int fd = grpc_fd_wrapped_fd(cl->em_fd);
@@ -308,7 +303,7 @@ static void client_session_write(grpc_exec_ctx* exec_ctx, void* arg, /*client */
if (error != GRPC_ERROR_NONE) {
gpr_mu_lock(g_mu);
- client_session_shutdown_cb(exec_ctx, arg, 1);
+ client_session_shutdown_cb(arg, 1);
gpr_mu_unlock(g_mu);
return;
}
@@ -323,10 +318,10 @@ static void client_session_write(grpc_exec_ctx* exec_ctx, void* arg, /*client */
if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
GRPC_CLOSURE_INIT(&cl->write_closure, client_session_write, cl,
grpc_schedule_on_exec_ctx);
- grpc_fd_notify_on_write(exec_ctx, cl->em_fd, &cl->write_closure);
+ grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure);
cl->client_write_cnt++;
} else {
- client_session_shutdown_cb(exec_ctx, arg, 1);
+ client_session_shutdown_cb(arg, 1);
}
gpr_mu_unlock(g_mu);
} else {
@@ -336,7 +331,7 @@ static void client_session_write(grpc_exec_ctx* exec_ctx, void* arg, /*client */
}
/* Start a client to send a stream of bytes. */
-static void client_start(grpc_exec_ctx* exec_ctx, client* cl, int port) {
+static void client_start(client* cl, int port) {
int fd;
struct sockaddr_in sin;
create_test_socket(port, &fd, &sin);
@@ -357,9 +352,9 @@ static void client_start(grpc_exec_ctx* exec_ctx, client* cl, int port) {
}
cl->em_fd = grpc_fd_create(fd, "client");
- grpc_pollset_add_fd(exec_ctx, g_pollset, cl->em_fd);
+ grpc_pollset_add_fd(g_pollset, cl->em_fd);
- client_session_write(exec_ctx, cl, GRPC_ERROR_NONE);
+ client_session_write(cl, GRPC_ERROR_NONE);
}
/* Wait for the signal to shutdown a client. */
@@ -367,12 +362,12 @@ static void client_wait_and_shutdown(client* cl) {
gpr_mu_lock(g_mu);
while (!cl->done) {
grpc_pollset_worker* worker = nullptr;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker,
- GRPC_MILLIS_INF_FUTURE)));
+ "pollset_work",
+ grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
gpr_mu_unlock(g_mu);
- grpc_exec_ctx_finish(&exec_ctx);
+
gpr_mu_lock(g_mu);
}
gpr_mu_unlock(g_mu);
@@ -385,13 +380,13 @@ static void test_grpc_fd(void) {
server sv;
client cl;
int port;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
server_init(&sv);
- port = server_start(&exec_ctx, &sv);
+ port = server_start(&sv);
client_init(&cl);
- client_start(&exec_ctx, &cl, port);
- grpc_exec_ctx_finish(&exec_ctx);
+ client_start(&cl, port);
+
client_wait_and_shutdown(&cl);
server_wait_and_shutdown(&sv);
GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total);
@@ -406,27 +401,25 @@ void init_change_data(fd_change_data* fdc) { fdc->cb_that_ran = nullptr; }
void destroy_change_data(fd_change_data* fdc) {}
-static void first_read_callback(grpc_exec_ctx* exec_ctx,
- void* arg /* fd_change_data */,
+static void first_read_callback(void* arg /* fd_change_data */,
grpc_error* error) {
fd_change_data* fdc = static_cast<fd_change_data*>(arg);
gpr_mu_lock(g_mu);
fdc->cb_that_ran = first_read_callback;
- GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
}
-static void second_read_callback(grpc_exec_ctx* exec_ctx,
- void* arg /* fd_change_data */,
+static void second_read_callback(void* arg /* fd_change_data */,
grpc_error* error) {
fd_change_data* fdc = static_cast<fd_change_data*>(arg);
gpr_mu_lock(g_mu);
fdc->cb_that_ran = second_read_callback;
- GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
}
@@ -443,7 +436,7 @@ static void test_grpc_fd_change(void) {
ssize_t result;
grpc_closure first_closure;
grpc_closure second_closure;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
GRPC_CLOSURE_INIT(&first_closure, first_read_callback, &a,
grpc_schedule_on_exec_ctx);
@@ -460,10 +453,10 @@ static void test_grpc_fd_change(void) {
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change");
- grpc_pollset_add_fd(&exec_ctx, g_pollset, em_fd);
+ grpc_pollset_add_fd(g_pollset, em_fd);
/* Register the first callback, then make its FD readable */
- grpc_fd_notify_on_read(&exec_ctx, em_fd, &first_closure);
+ grpc_fd_notify_on_read(em_fd, &first_closure);
data = 0;
result = write(sv[1], &data, 1);
GPR_ASSERT(result == 1);
@@ -473,10 +466,10 @@ static void test_grpc_fd_change(void) {
while (a.cb_that_ran == nullptr) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker,
- GRPC_MILLIS_INF_FUTURE)));
+ "pollset_work",
+ grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
gpr_mu_unlock(g_mu);
- grpc_exec_ctx_finish(&exec_ctx);
+
gpr_mu_lock(g_mu);
}
GPR_ASSERT(a.cb_that_ran == first_read_callback);
@@ -488,7 +481,7 @@ static void test_grpc_fd_change(void) {
/* Now register a second callback with distinct change data, and do the same
thing again. */
- grpc_fd_notify_on_read(&exec_ctx, em_fd, &second_closure);
+ grpc_fd_notify_on_read(em_fd, &second_closure);
data = 0;
result = write(sv[1], &data, 1);
GPR_ASSERT(result == 1);
@@ -497,44 +490,43 @@ static void test_grpc_fd_change(void) {
while (b.cb_that_ran == nullptr) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker,
- GRPC_MILLIS_INF_FUTURE)));
+ "pollset_work",
+ grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
gpr_mu_unlock(g_mu);
- grpc_exec_ctx_finish(&exec_ctx);
+
gpr_mu_lock(g_mu);
}
/* Except now we verify that second_read_callback ran instead */
GPR_ASSERT(b.cb_that_ran == second_read_callback);
gpr_mu_unlock(g_mu);
- grpc_fd_orphan(&exec_ctx, em_fd, nullptr, nullptr, false /* already_closed */,
- "d");
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_fd_orphan(em_fd, nullptr, nullptr, false /* already_closed */, "d");
+
destroy_change_data(&a);
destroy_change_data(&b);
close(sv[1]);
}
-static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p,
- grpc_error* error) {
- grpc_pollset_destroy(exec_ctx, static_cast<grpc_pollset*>(p));
+static void destroy_pollset(void* p, grpc_error* error) {
+ grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
}
int main(int argc, char** argv) {
grpc_closure destroyed;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_test_init(argc, argv);
grpc_init();
- g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
- grpc_pollset_init(g_pollset, &g_mu);
- test_grpc_fd();
- test_grpc_fd_change();
- GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
- grpc_schedule_on_exec_ctx);
- grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
- grpc_exec_ctx_flush(&exec_ctx);
- gpr_free(g_pollset);
- grpc_exec_ctx_finish(&exec_ctx);
+ {
+ grpc_core::ExecCtx exec_ctx;
+ g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
+ grpc_pollset_init(g_pollset, &g_mu);
+ test_grpc_fd();
+ test_grpc_fd_change();
+ GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
+ grpc_schedule_on_exec_ctx);
+ grpc_pollset_shutdown(g_pollset, &destroyed);
+ grpc_core::ExecCtx::Get()->Flush();
+ gpr_free(g_pollset);
+ }
grpc_shutdown();
return 0;
}