diff options
Diffstat (limited to 'test/core/iomgr/fd_posix_test.c')
-rw-r--r-- | test/core/iomgr/fd_posix_test.c | 121 |
1 files changed, 73 insertions, 48 deletions
diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index ae6b56da77..f89d6c7824 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -52,7 +52,6 @@ #include "test/core/util/test_config.h" static grpc_pollset g_pollset; -static grpc_workqueue *g_workqueue; /* buffer size used to send and receive data. 1024 is the minimal value to set TCP send and receive buffer. */ @@ -119,18 +118,18 @@ typedef struct { /* Called when an upload session can be safely shutdown. Close session FD and start to shutdown listen FD. */ static void session_shutdown_cb(void *arg, /*session*/ - int success) { + int success, grpc_call_list *call_list) { session *se = arg; server *sv = se->sv; - grpc_fd_orphan(se->em_fd, NULL, "a"); + grpc_fd_orphan(se->em_fd, NULL, "a", call_list); gpr_free(se); /* Start to shutdown listen fd. */ - grpc_fd_shutdown(sv->em_fd); + grpc_fd_shutdown(sv->em_fd, call_list); } /* Called when data become readable in a session. */ static void session_read_cb(void *arg, /*session*/ - int success) { + int success, grpc_call_list *call_list) { session *se = arg; int fd = se->em_fd->fd; @@ -138,7 +137,7 @@ static void session_read_cb(void *arg, /*session*/ ssize_t read_total = 0; if (!success) { - session_shutdown_cb(arg, 1); + session_shutdown_cb(arg, 1, call_list); return; } @@ -153,7 +152,7 @@ static void session_read_cb(void *arg, /*session*/ It is possible to read nothing due to spurious edge event or data has been drained, In such a case, read() returns -1 and set errno to EAGAIN. */ if (read_once == 0) { - session_shutdown_cb(arg, 1); + session_shutdown_cb(arg, 1, call_list); } else if (read_once == -1) { if (errno == EAGAIN) { /* An edge triggered event is cached in the kernel until next poll. @@ -164,7 +163,7 @@ static void session_read_cb(void *arg, /*session*/ TODO(chenw): in multi-threaded version, callback and polling can be run in different threads. polling may catch a persist read edge event before notify_on_read is called. */ - grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure); + grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure, call_list); } else { gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno)); abort(); @@ -174,10 +173,11 @@ static void session_read_cb(void *arg, /*session*/ /* Called when the listen FD can be safely shutdown. Close listen FD and signal that server can be shutdown. */ -static void listen_shutdown_cb(void *arg /*server*/, int success) { +static void listen_shutdown_cb(void *arg /*server*/, int success, + grpc_call_list *call_list) { server *sv = arg; - grpc_fd_orphan(sv->em_fd, NULL, "b"); + grpc_fd_orphan(sv->em_fd, NULL, "b", call_list); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); sv->done = 1; @@ -187,7 +187,7 @@ static void listen_shutdown_cb(void *arg /*server*/, int success) { /* Called when a new TCP connection request arrives in the listening port. */ static void listen_cb(void *arg, /*=sv_arg*/ - int success) { + int success, grpc_call_list *call_list) { server *sv = arg; int fd; int flags; @@ -197,7 +197,7 @@ static void listen_cb(void *arg, /*=sv_arg*/ grpc_fd *listen_em_fd = sv->em_fd; if (!success) { - listen_shutdown_cb(arg, 1); + listen_shutdown_cb(arg, 1, call_list); return; } @@ -208,13 +208,13 @@ static void listen_cb(void *arg, /*=sv_arg*/ fcntl(fd, F_SETFL, flags | O_NONBLOCK); se = gpr_malloc(sizeof(*se)); se->sv = sv; - se->em_fd = grpc_fd_create(fd, g_workqueue, "listener"); - grpc_pollset_add_fd(&g_pollset, se->em_fd); + se->em_fd = grpc_fd_create(fd, "listener"); + grpc_pollset_add_fd(&g_pollset, se->em_fd, call_list); se->session_read_closure.cb = session_read_cb; se->session_read_closure.cb_arg = se; - grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure); + grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure, call_list); - grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure); + grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure, call_list); } /* Max number of connections pending to be accepted by listen(). */ @@ -224,7 +224,7 @@ static void listen_cb(void *arg, /*=sv_arg*/ listen_cb() is registered to be interested in reading from listen_fd. When connection request arrives, listen_cb() is called to accept the connection request. */ -static int server_start(server *sv) { +static int server_start(server *sv, grpc_call_list *call_list) { int port = 0; int fd; struct sockaddr_in sin; @@ -237,12 +237,12 @@ static int server_start(server *sv) { port = ntohs(sin.sin_port); GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0); - sv->em_fd = grpc_fd_create(fd, g_workqueue, "server"); - grpc_pollset_add_fd(&g_pollset, sv->em_fd); + sv->em_fd = grpc_fd_create(fd, "server"); + grpc_pollset_add_fd(&g_pollset, sv->em_fd, call_list); /* Register to be interested in reading from listen_fd. */ sv->listen_closure.cb = listen_cb; sv->listen_closure.cb_arg = sv; - grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure); + grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure, call_list); return port; } @@ -251,9 +251,13 @@ static int server_start(server *sv) { static void server_wait_and_shutdown(server *sv) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!sv->done) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_pollset_worker worker; grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } @@ -286,23 +290,24 @@ static void client_init(client *cl) { } /* Called when a client upload session is ready to shutdown. */ -static void client_session_shutdown_cb(void *arg /*client*/, int success) { +static void client_session_shutdown_cb(void *arg /*client*/, int success, + grpc_call_list *call_list) { client *cl = arg; - grpc_fd_orphan(cl->em_fd, NULL, "c"); + grpc_fd_orphan(cl->em_fd, NULL, "c", call_list); cl->done = 1; grpc_pollset_kick(&g_pollset, NULL); } /* Write as much as possible, then register notify_on_write. */ static void client_session_write(void *arg, /*client*/ - int success) { + int success, grpc_call_list *call_list) { client *cl = arg; int fd = cl->em_fd->fd; ssize_t write_once = 0; if (!success) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); - client_session_shutdown_cb(arg, 1); + client_session_shutdown_cb(arg, 1, call_list); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); return; } @@ -317,10 +322,10 @@ static void client_session_write(void *arg, /*client*/ if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) { cl->write_closure.cb = client_session_write; cl->write_closure.cb_arg = cl; - grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure); + grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure, call_list); cl->client_write_cnt++; } else { - client_session_shutdown_cb(arg, 1); + client_session_shutdown_cb(arg, 1, call_list); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } else { @@ -330,7 +335,7 @@ static void client_session_write(void *arg, /*client*/ } /* Start a client to send a stream of bytes. */ -static void client_start(client *cl, int port) { +static void client_start(client *cl, int port, grpc_call_list *call_list) { int fd; struct sockaddr_in sin; create_test_socket(port, &fd, &sin); @@ -350,10 +355,10 @@ static void client_start(client *cl, int port) { } } - cl->em_fd = grpc_fd_create(fd, g_workqueue, "client"); - grpc_pollset_add_fd(&g_pollset, cl->em_fd); + cl->em_fd = grpc_fd_create(fd, "client"); + grpc_pollset_add_fd(&g_pollset, cl->em_fd, call_list); - client_session_write(cl, 1); + client_session_write(cl, 1, call_list); } /* Wait for the signal to shutdown a client. */ @@ -361,8 +366,12 @@ static void client_wait_and_shutdown(client *cl) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!cl->done) { grpc_pollset_worker worker; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } @@ -374,11 +383,13 @@ static void test_grpc_fd(void) { server sv; client cl; int port; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; server_init(&sv); - port = server_start(&sv); + port = server_start(&sv, &call_list); client_init(&cl); - client_start(&cl, port); + client_start(&cl, port, &call_list); + grpc_call_list_run(&call_list); client_wait_and_shutdown(&cl); server_wait_and_shutdown(&sv); GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total); @@ -386,14 +397,15 @@ static void test_grpc_fd(void) { } typedef struct fd_change_data { - void (*cb_that_ran)(void *, int success); + void (*cb_that_ran)(void *, int success, grpc_call_list *call_list); } fd_change_data; void init_change_data(fd_change_data *fdc) { fdc->cb_that_ran = NULL; } void destroy_change_data(fd_change_data *fdc) {} -static void first_read_callback(void *arg /* fd_change_data */, int success) { +static void first_read_callback(void *arg /* fd_change_data */, int success, + grpc_call_list *call_list) { fd_change_data *fdc = arg; gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); @@ -402,7 +414,8 @@ static void first_read_callback(void *arg /* fd_change_data */, int success) { gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } -static void second_read_callback(void *arg /* fd_change_data */, int success) { +static void second_read_callback(void *arg /* fd_change_data */, int success, + grpc_call_list *call_list) { fd_change_data *fdc = arg; gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); @@ -424,6 +437,7 @@ static void test_grpc_fd_change(void) { ssize_t result; grpc_closure first_closure; grpc_closure second_closure; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; first_closure.cb = first_read_callback; first_closure.cb_arg = &a; @@ -439,11 +453,11 @@ static void test_grpc_fd_change(void) { flags = fcntl(sv[1], F_GETFL, 0); GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); - em_fd = grpc_fd_create(sv[0], g_workqueue, "test_grpc_fd_change"); - grpc_pollset_add_fd(&g_pollset, em_fd); + em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change"); + grpc_pollset_add_fd(&g_pollset, em_fd, &call_list); /* Register the first callback, then make its FD readable */ - grpc_fd_notify_on_read(em_fd, &first_closure); + grpc_fd_notify_on_read(em_fd, &first_closure, &call_list); data = 0; result = write(sv[1], &data, 1); GPR_ASSERT(result == 1); @@ -453,7 +467,10 @@ static void test_grpc_fd_change(void) { while (a.cb_that_ran == NULL) { grpc_pollset_worker worker; grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } GPR_ASSERT(a.cb_that_ran == first_read_callback); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); @@ -464,7 +481,7 @@ static void test_grpc_fd_change(void) { /* Now register a second callback with distinct change data, and do the same thing again. */ - grpc_fd_notify_on_read(em_fd, &second_closure); + grpc_fd_notify_on_read(em_fd, &second_closure, &call_list); data = 0; result = write(sv[1], &data, 1); GPR_ASSERT(result == 1); @@ -473,29 +490,37 @@ static void test_grpc_fd_change(void) { while (b.cb_that_ran == NULL) { grpc_pollset_worker worker; grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } /* Except now we verify that second_read_callback ran instead */ GPR_ASSERT(b.cb_that_ran == second_read_callback); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - grpc_fd_orphan(em_fd, NULL, "d"); + grpc_fd_orphan(em_fd, NULL, "d", &call_list); + grpc_call_list_run(&call_list); destroy_change_data(&a); destroy_change_data(&b); close(sv[1]); } -static void destroy_pollset(void *p) { grpc_pollset_destroy(p); } +static void destroy_pollset(void *p, int success, grpc_call_list *call_list) { + grpc_pollset_destroy(p); +} int main(int argc, char **argv) { + grpc_closure destroyed; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_test_init(argc, argv); grpc_iomgr_init(); grpc_pollset_init(&g_pollset); - g_workqueue = grpc_workqueue_create(); test_grpc_fd(); test_grpc_fd_change(); - grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); - GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy"); + grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); + grpc_pollset_shutdown(&g_pollset, &destroyed, &call_list); + grpc_call_list_run(&call_list); grpc_iomgr_shutdown(); return 0; } |