diff options
Diffstat (limited to 'test/core/iomgr/fd_posix_test.c')
-rw-r--r-- | test/core/iomgr/fd_posix_test.c | 103 |
1 files changed, 47 insertions, 56 deletions
diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index 2c8a89e4cd..fe08ec495f 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -51,6 +51,8 @@ #include <grpc/support/time.h> #include "test/core/util/test_config.h" +static grpc_pollset g_pollset; + /* buffer size used to send and receive data. 1024 is the minimal value to set TCP send and receive buffer. */ #define BUF_SIZE 1024 @@ -94,16 +96,12 @@ void no_op_cb(void *arg, int success) {} typedef struct { grpc_fd *em_fd; /* listening fd */ ssize_t read_bytes_total; /* total number of received bytes */ - gpr_mu mu; /* protect done and done_cv */ - gpr_cv done_cv; /* signaled when a server finishes serving */ int done; /* set to 1 when a server finishes serving */ grpc_iomgr_closure listen_closure; } server; static void server_init(server *sv) { sv->read_bytes_total = 0; - gpr_mu_init(&sv->mu); - gpr_cv_init(&sv->done_cv); sv->done = 0; } @@ -122,7 +120,7 @@ static void session_shutdown_cb(void *arg, /*session*/ int success) { session *se = arg; server *sv = se->sv; - grpc_fd_orphan(se->em_fd, NULL, NULL); + grpc_fd_orphan(se->em_fd, NULL, "a"); gpr_free(se); /* Start to shutdown listen fd. */ grpc_fd_shutdown(sv->em_fd); @@ -177,12 +175,12 @@ static void session_read_cb(void *arg, /*session*/ static void listen_shutdown_cb(void *arg /*server*/, int success) { server *sv = arg; - grpc_fd_orphan(sv->em_fd, NULL, NULL); + grpc_fd_orphan(sv->em_fd, NULL, "b"); - gpr_mu_lock(&sv->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); sv->done = 1; - gpr_cv_signal(&sv->done_cv); - gpr_mu_unlock(&sv->mu); + grpc_pollset_kick(&g_pollset); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } /* Called when a new TCP connection request arrives in the listening port. */ @@ -209,6 +207,7 @@ static void listen_cb(void *arg, /*=sv_arg*/ se = gpr_malloc(sizeof(*se)); se->sv = sv; se->em_fd = grpc_fd_create(fd, "listener"); + grpc_pollset_add_fd(&g_pollset, se->em_fd); se->session_read_closure.cb = session_read_cb; se->session_read_closure.cb_arg = se; grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure); @@ -237,6 +236,7 @@ static int server_start(server *sv) { GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0); sv->em_fd = grpc_fd_create(fd, "server"); + grpc_pollset_add_fd(&g_pollset, sv->em_fd); /* Register to be interested in reading from listen_fd. */ sv->listen_closure.cb = listen_cb; sv->listen_closure.cb_arg = sv; @@ -247,12 +247,11 @@ static int server_start(server *sv) { /* Wait and shutdown a sever. */ static void server_wait_and_shutdown(server *sv) { - gpr_mu_lock(&sv->mu); - while (!sv->done) gpr_cv_wait(&sv->done_cv, &sv->mu, gpr_inf_future); - gpr_mu_unlock(&sv->mu); - - gpr_mu_destroy(&sv->mu); - gpr_cv_destroy(&sv->done_cv); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + while (!sv->done) { + grpc_pollset_work(&g_pollset, gpr_inf_future); + } + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } /* ===An upload client to test notify_on_write=== */ @@ -271,9 +270,7 @@ typedef struct { notify_on_write to schedule another write. */ int client_write_cnt; - gpr_mu mu; /* protect done and done_cv */ - gpr_cv done_cv; /* signaled when a client finishes sending */ - int done; /* set to 1 when a client finishes sending */ + int done; /* set to 1 when a client finishes sending */ grpc_iomgr_closure write_closure; } client; @@ -281,17 +278,15 @@ static void client_init(client *cl) { memset(cl->write_buf, 0, sizeof(cl->write_buf)); cl->write_bytes_total = 0; cl->client_write_cnt = 0; - gpr_mu_init(&cl->mu); - gpr_cv_init(&cl->done_cv); cl->done = 0; } /* Called when a client upload session is ready to shutdown. */ static void client_session_shutdown_cb(void *arg /*client*/, int success) { client *cl = arg; - grpc_fd_orphan(cl->em_fd, NULL, NULL); + grpc_fd_orphan(cl->em_fd, NULL, "c"); cl->done = 1; - gpr_cv_signal(&cl->done_cv); + grpc_pollset_kick(&g_pollset); } /* Write as much as possible, then register notify_on_write. */ @@ -302,9 +297,9 @@ static void client_session_write(void *arg, /*client*/ ssize_t write_once = 0; if (!success) { - gpr_mu_lock(&cl->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); client_session_shutdown_cb(arg, 1); - gpr_mu_unlock(&cl->mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); return; } @@ -314,7 +309,7 @@ static void client_session_write(void *arg, /*client*/ } while (write_once > 0); if (errno == EAGAIN) { - gpr_mu_lock(&cl->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) { cl->write_closure.cb = client_session_write; cl->write_closure.cb_arg = cl; @@ -323,7 +318,7 @@ static void client_session_write(void *arg, /*client*/ } else { client_session_shutdown_cb(arg, 1); } - gpr_mu_unlock(&cl->mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } else { gpr_log(GPR_ERROR, "unknown errno %s", strerror(errno)); abort(); @@ -352,18 +347,18 @@ static void client_start(client *cl, int port) { } cl->em_fd = grpc_fd_create(fd, "client"); + grpc_pollset_add_fd(&g_pollset, cl->em_fd); client_session_write(cl, 1); } /* Wait for the signal to shutdown a client. */ static void client_wait_and_shutdown(client *cl) { - gpr_mu_lock(&cl->mu); - while (!cl->done) gpr_cv_wait(&cl->done_cv, &cl->mu, gpr_inf_future); - gpr_mu_unlock(&cl->mu); - - gpr_mu_destroy(&cl->mu); - gpr_cv_destroy(&cl->done_cv); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + while (!cl->done) { + grpc_pollset_work(&g_pollset, gpr_inf_future); + } + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } /* Test grpc_fd. Start an upload server and client, upload a stream of @@ -385,38 +380,29 @@ static void test_grpc_fd(void) { } typedef struct fd_change_data { - gpr_mu mu; - gpr_cv cv; void (*cb_that_ran)(void *, int success); } fd_change_data; -void init_change_data(fd_change_data *fdc) { - gpr_mu_init(&fdc->mu); - gpr_cv_init(&fdc->cv); - fdc->cb_that_ran = NULL; -} +void init_change_data(fd_change_data *fdc) { fdc->cb_that_ran = NULL; } -void destroy_change_data(fd_change_data *fdc) { - gpr_mu_destroy(&fdc->mu); - gpr_cv_destroy(&fdc->cv); -} +void destroy_change_data(fd_change_data *fdc) {} static void first_read_callback(void *arg /* fd_change_data */, int success) { fd_change_data *fdc = arg; - gpr_mu_lock(&fdc->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); fdc->cb_that_ran = first_read_callback; - gpr_cv_signal(&fdc->cv); - gpr_mu_unlock(&fdc->mu); + grpc_pollset_kick(&g_pollset); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } static void second_read_callback(void *arg /* fd_change_data */, int success) { fd_change_data *fdc = arg; - gpr_mu_lock(&fdc->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); fdc->cb_that_ran = second_read_callback; - gpr_cv_signal(&fdc->cv); - gpr_mu_unlock(&fdc->mu); + grpc_pollset_kick(&g_pollset); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } /* Test that changing the callback we use for notify_on_read actually works. @@ -448,6 +434,7 @@ static void test_grpc_fd_change(void) { GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change"); + grpc_pollset_add_fd(&g_pollset, em_fd); /* Register the first callback, then make its FD readable */ grpc_fd_notify_on_read(em_fd, &first_closure); @@ -456,12 +443,12 @@ static void test_grpc_fd_change(void) { GPR_ASSERT(result == 1); /* And now wait for it to run. */ - gpr_mu_lock(&a.mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (a.cb_that_ran == NULL) { - gpr_cv_wait(&a.cv, &a.mu, gpr_inf_future); + grpc_pollset_work(&g_pollset, gpr_inf_future); } GPR_ASSERT(a.cb_that_ran == first_read_callback); - gpr_mu_unlock(&a.mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); /* And drain the socket so we can generate a new read edge */ result = read(sv[0], &data, 1); @@ -474,25 +461,29 @@ static void test_grpc_fd_change(void) { result = write(sv[1], &data, 1); GPR_ASSERT(result == 1); - gpr_mu_lock(&b.mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (b.cb_that_ran == NULL) { - gpr_cv_wait(&b.cv, &b.mu, gpr_inf_future); + grpc_pollset_work(&g_pollset, gpr_inf_future); } /* Except now we verify that second_read_callback ran instead */ GPR_ASSERT(b.cb_that_ran == second_read_callback); - gpr_mu_unlock(&b.mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - grpc_fd_orphan(em_fd, NULL, NULL); + grpc_fd_orphan(em_fd, NULL, "d"); destroy_change_data(&a); destroy_change_data(&b); close(sv[1]); } +static void destroy_pollset(void *p) { grpc_pollset_destroy(p); } + int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_iomgr_init(); + grpc_pollset_init(&g_pollset); test_grpc_fd(); test_grpc_fd_change(); + grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); grpc_iomgr_shutdown(); return 0; } |