diff options
Diffstat (limited to 'test/core/iomgr')
-rw-r--r-- | test/core/iomgr/endpoint_tests.c | 80 | ||||
-rw-r--r-- | test/core/iomgr/endpoint_tests.h | 2 | ||||
-rw-r--r-- | test/core/iomgr/fd_posix_test.c | 88 | ||||
-rw-r--r-- | test/core/iomgr/tcp_client_posix_test.c | 88 | ||||
-rw-r--r-- | test/core/iomgr/tcp_posix_test.c | 88 | ||||
-rw-r--r-- | test/core/iomgr/tcp_server_posix_test.c | 50 |
6 files changed, 219 insertions, 177 deletions
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index f9c5282f19..95dca3294a 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -57,6 +57,8 @@ */ +static grpc_pollset *g_pollset; + size_t count_and_unref_slices(gpr_slice *slices, size_t nslices, int *current_data) { size_t num_bytes = 0; @@ -111,8 +113,6 @@ static gpr_slice *allocate_blocks(size_t num_bytes, size_t slice_size, struct read_and_write_test_state { grpc_endpoint *read_ep; grpc_endpoint *write_ep; - gpr_mu mu; - gpr_cv cv; size_t target_bytes; size_t bytes_read; size_t current_write_size; @@ -130,10 +130,10 @@ static void read_and_write_test_read_handler(void *data, gpr_slice *slices, GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR); if (error == GRPC_ENDPOINT_CB_SHUTDOWN) { gpr_log(GPR_INFO, "Read handler shutdown"); - gpr_mu_lock(&state->mu); + gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); state->read_done = 1; - gpr_cv_signal(&state->cv); - gpr_mu_unlock(&state->mu); + grpc_pollset_kick(g_pollset); + gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); return; } @@ -141,10 +141,10 @@ static void read_and_write_test_read_handler(void *data, gpr_slice *slices, count_and_unref_slices(slices, nslices, &state->current_read_data); if (state->bytes_read == state->target_bytes) { gpr_log(GPR_INFO, "Read handler done"); - gpr_mu_lock(&state->mu); + gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); state->read_done = 1; - gpr_cv_signal(&state->cv); - gpr_mu_unlock(&state->mu); + grpc_pollset_kick(g_pollset); + gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); } else { grpc_endpoint_notify_on_read(state->read_ep, read_and_write_test_read_handler, data); @@ -164,10 +164,10 @@ static void read_and_write_test_write_handler(void *data, if (error == GRPC_ENDPOINT_CB_SHUTDOWN) { gpr_log(GPR_INFO, "Write handler shutdown"); - gpr_mu_lock(&state->mu); + gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); state->write_done = 1; - gpr_cv_signal(&state->cv); - gpr_mu_unlock(&state->mu); + grpc_pollset_kick(g_pollset); + gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); return; } @@ -198,10 +198,10 @@ static void read_and_write_test_write_handler(void *data, GPR_ASSERT(state->bytes_written == state->target_bytes); gpr_log(GPR_INFO, "Write handler done"); - gpr_mu_lock(&state->mu); + gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); state->write_done = 1; - gpr_cv_signal(&state->cv); - gpr_mu_unlock(&state->mu); + grpc_pollset_kick(g_pollset); + gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); } /* Do both reading and writing using the grpc_endpoint API. @@ -222,9 +222,6 @@ static void read_and_write_test(grpc_endpoint_test_config config, num_bytes, slice_size); } - gpr_mu_init(&state.mu); - gpr_cv_init(&state.cv); - state.read_ep = f.client_ep; state.write_ep = f.server_ep; state.target_bytes = num_bytes; @@ -253,29 +250,24 @@ static void read_and_write_test(grpc_endpoint_test_config config, grpc_endpoint_shutdown(state.write_ep); } - gpr_mu_lock(&state.mu); + gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); while (!state.read_done || !state.write_done) { - if (gpr_cv_wait(&state.cv, &state.mu, deadline)) { - gpr_log(GPR_ERROR, "timeout: read_done=%d, write_done=%d", - state.read_done, state.write_done); - abort(); - } + GPR_ASSERT(gpr_time_cmp(gpr_now(), deadline) < 0); + grpc_pollset_work(g_pollset, deadline); } - gpr_mu_unlock(&state.mu); + gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); grpc_endpoint_destroy(state.read_ep); grpc_endpoint_destroy(state.write_ep); - gpr_mu_destroy(&state.mu); - gpr_cv_destroy(&state.cv); end_test(config); } struct timeout_test_state { - gpr_event io_done; + int io_done; }; typedef struct { - gpr_event ev; + int done; grpc_endpoint *ep; } shutdown_during_write_test_state; @@ -291,7 +283,10 @@ static void shutdown_during_write_test_read_handler( if (error != GRPC_ENDPOINT_CB_OK) { grpc_endpoint_destroy(st->ep); - gpr_event_set(&st->ev, (void *)(gpr_intptr) error); + gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); + st->done = error; + grpc_pollset_kick(g_pollset); + gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); } else { grpc_endpoint_notify_on_read( st->ep, shutdown_during_write_test_read_handler, user_data); @@ -310,7 +305,10 @@ static void shutdown_during_write_test_write_handler( gpr_log(GPR_ERROR, "shutdown_during_write_test_write_handler completed unexpectedly"); } - gpr_event_set(&st->ev, (void *)(gpr_intptr) 1); + gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); + st->done = 1; + grpc_pollset_kick(g_pollset); + gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); } static void shutdown_during_write_test(grpc_endpoint_test_config config, @@ -329,8 +327,8 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config, read_st.ep = f.client_ep; write_st.ep = f.server_ep; - gpr_event_init(&read_st.ev); - gpr_event_init(&write_st.ev); + read_st.done = 0; + write_st.done = 0; grpc_endpoint_notify_on_read( read_st.ep, shutdown_during_write_test_read_handler, &read_st); @@ -347,9 +345,19 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config, case GRPC_ENDPOINT_WRITE_PENDING: grpc_endpoint_shutdown(write_st.ep); deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10); - GPR_ASSERT(gpr_event_wait(&write_st.ev, deadline)); + gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); + while (!write_st.done) { + GPR_ASSERT(gpr_time_cmp(gpr_now(), deadline) < 0); + grpc_pollset_work(g_pollset, deadline); + } + gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); grpc_endpoint_destroy(write_st.ep); - GPR_ASSERT(gpr_event_wait(&read_st.ev, deadline)); + gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); + while (!read_st.done) { + GPR_ASSERT(gpr_time_cmp(gpr_now(), deadline) < 0); + grpc_pollset_work(g_pollset, deadline); + } + gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); gpr_free(slices); end_test(config); return; @@ -361,9 +369,11 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config, abort(); } -void grpc_endpoint_tests(grpc_endpoint_test_config config) { +void grpc_endpoint_tests(grpc_endpoint_test_config config, grpc_pollset *pollset) { + g_pollset = pollset; read_and_write_test(config, 10000000, 100000, 8192, 0); read_and_write_test(config, 1000000, 100000, 1, 0); read_and_write_test(config, 100000000, 100000, 1, 1); shutdown_during_write_test(config, 1000); + g_pollset = NULL; } diff --git a/test/core/iomgr/endpoint_tests.h b/test/core/iomgr/endpoint_tests.h index 1679d7bd4f..852e71d479 100644 --- a/test/core/iomgr/endpoint_tests.h +++ b/test/core/iomgr/endpoint_tests.h @@ -52,6 +52,6 @@ struct grpc_endpoint_test_config { void (*clean_up)(); }; -void grpc_endpoint_tests(grpc_endpoint_test_config config); +void grpc_endpoint_tests(grpc_endpoint_test_config config, grpc_pollset *pollset); #endif /* GRPC_TEST_CORE_IOMGR_ENDPOINT_TESTS_H */ diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index 57e2c6fc17..207862bd8d 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -51,6 +51,8 @@ #include <grpc/support/time.h> #include "test/core/util/test_config.h" +static grpc_pollset g_pollset; + /* buffer size used to send and receive data. 1024 is the minimal value to set TCP send and receive buffer. */ #define BUF_SIZE 1024 @@ -94,16 +96,12 @@ void no_op_cb(void *arg, int success) {} typedef struct { grpc_fd *em_fd; /* listening fd */ ssize_t read_bytes_total; /* total number of received bytes */ - gpr_mu mu; /* protect done and done_cv */ - gpr_cv done_cv; /* signaled when a server finishes serving */ int done; /* set to 1 when a server finishes serving */ grpc_iomgr_closure listen_closure; } server; static void server_init(server *sv) { sv->read_bytes_total = 0; - gpr_mu_init(&sv->mu); - gpr_cv_init(&sv->done_cv); sv->done = 0; } @@ -179,10 +177,10 @@ static void listen_shutdown_cb(void *arg /*server*/, int success) { grpc_fd_orphan(sv->em_fd, NULL, NULL); - gpr_mu_lock(&sv->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); sv->done = 1; - gpr_cv_signal(&sv->done_cv); - gpr_mu_unlock(&sv->mu); + grpc_pollset_kick(&g_pollset); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } /* Called when a new TCP connection request arrives in the listening port. */ @@ -209,6 +207,7 @@ static void listen_cb(void *arg, /*=sv_arg*/ se = gpr_malloc(sizeof(*se)); se->sv = sv; se->em_fd = grpc_fd_create(fd); + grpc_pollset_add_fd(&g_pollset, se->em_fd); se->session_read_closure.cb = session_read_cb; se->session_read_closure.cb_arg = se; grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure); @@ -237,6 +236,7 @@ static int server_start(server *sv) { GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0); sv->em_fd = grpc_fd_create(fd); + grpc_pollset_add_fd(&g_pollset, sv->em_fd); /* Register to be interested in reading from listen_fd. */ sv->listen_closure.cb = listen_cb; sv->listen_closure.cb_arg = sv; @@ -247,12 +247,11 @@ static int server_start(server *sv) { /* Wait and shutdown a sever. */ static void server_wait_and_shutdown(server *sv) { - gpr_mu_lock(&sv->mu); - while (!sv->done) gpr_cv_wait(&sv->done_cv, &sv->mu, gpr_inf_future); - gpr_mu_unlock(&sv->mu); - - gpr_mu_destroy(&sv->mu); - gpr_cv_destroy(&sv->done_cv); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + while (!sv->done) { + grpc_pollset_work(&g_pollset, gpr_inf_future); + } + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } /* ===An upload client to test notify_on_write=== */ @@ -271,8 +270,6 @@ typedef struct { notify_on_write to schedule another write. */ int client_write_cnt; - gpr_mu mu; /* protect done and done_cv */ - gpr_cv done_cv; /* signaled when a client finishes sending */ int done; /* set to 1 when a client finishes sending */ grpc_iomgr_closure write_closure; } client; @@ -281,8 +278,6 @@ static void client_init(client *cl) { memset(cl->write_buf, 0, sizeof(cl->write_buf)); cl->write_bytes_total = 0; cl->client_write_cnt = 0; - gpr_mu_init(&cl->mu); - gpr_cv_init(&cl->done_cv); cl->done = 0; } @@ -291,7 +286,7 @@ static void client_session_shutdown_cb(void *arg /*client*/, int success) { client *cl = arg; grpc_fd_orphan(cl->em_fd, NULL, NULL); cl->done = 1; - gpr_cv_signal(&cl->done_cv); + grpc_pollset_kick(&g_pollset); } /* Write as much as possible, then register notify_on_write. */ @@ -302,9 +297,9 @@ static void client_session_write(void *arg, /*client*/ ssize_t write_once = 0; if (!success) { - gpr_mu_lock(&cl->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); client_session_shutdown_cb(arg, 1); - gpr_mu_unlock(&cl->mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); return; } @@ -314,7 +309,7 @@ static void client_session_write(void *arg, /*client*/ } while (write_once > 0); if (errno == EAGAIN) { - gpr_mu_lock(&cl->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) { cl->write_closure.cb = client_session_write; cl->write_closure.cb_arg = cl; @@ -323,7 +318,7 @@ static void client_session_write(void *arg, /*client*/ } else { client_session_shutdown_cb(arg, 1); } - gpr_mu_unlock(&cl->mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } else { gpr_log(GPR_ERROR, "unknown errno %s", strerror(errno)); abort(); @@ -352,18 +347,18 @@ static void client_start(client *cl, int port) { } cl->em_fd = grpc_fd_create(fd); + grpc_pollset_add_fd(&g_pollset, cl->em_fd); client_session_write(cl, 1); } /* Wait for the signal to shutdown a client. */ static void client_wait_and_shutdown(client *cl) { - gpr_mu_lock(&cl->mu); - while (!cl->done) gpr_cv_wait(&cl->done_cv, &cl->mu, gpr_inf_future); - gpr_mu_unlock(&cl->mu); - - gpr_mu_destroy(&cl->mu); - gpr_cv_destroy(&cl->done_cv); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + while (!cl->done) { + grpc_pollset_work(&g_pollset, gpr_inf_future); + } + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } /* Test grpc_fd. Start an upload server and client, upload a stream of @@ -385,38 +380,32 @@ static void test_grpc_fd(void) { } typedef struct fd_change_data { - gpr_mu mu; - gpr_cv cv; void (*cb_that_ran)(void *, int success); } fd_change_data; void init_change_data(fd_change_data *fdc) { - gpr_mu_init(&fdc->mu); - gpr_cv_init(&fdc->cv); fdc->cb_that_ran = NULL; } void destroy_change_data(fd_change_data *fdc) { - gpr_mu_destroy(&fdc->mu); - gpr_cv_destroy(&fdc->cv); } static void first_read_callback(void *arg /* fd_change_data */, int success) { fd_change_data *fdc = arg; - gpr_mu_lock(&fdc->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); fdc->cb_that_ran = first_read_callback; - gpr_cv_signal(&fdc->cv); - gpr_mu_unlock(&fdc->mu); + grpc_pollset_kick(&g_pollset); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } static void second_read_callback(void *arg /* fd_change_data */, int success) { fd_change_data *fdc = arg; - gpr_mu_lock(&fdc->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); fdc->cb_that_ran = second_read_callback; - gpr_cv_signal(&fdc->cv); - gpr_mu_unlock(&fdc->mu); + grpc_pollset_kick(&g_pollset); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } /* Test that changing the callback we use for notify_on_read actually works. @@ -448,6 +437,7 @@ static void test_grpc_fd_change(void) { GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); em_fd = grpc_fd_create(sv[0]); + grpc_pollset_add_fd(&g_pollset, em_fd); /* Register the first callback, then make its FD readable */ grpc_fd_notify_on_read(em_fd, &first_closure); @@ -456,12 +446,12 @@ static void test_grpc_fd_change(void) { GPR_ASSERT(result == 1); /* And now wait for it to run. */ - gpr_mu_lock(&a.mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (a.cb_that_ran == NULL) { - gpr_cv_wait(&a.cv, &a.mu, gpr_inf_future); + grpc_pollset_work(&g_pollset, gpr_inf_future); } GPR_ASSERT(a.cb_that_ran == first_read_callback); - gpr_mu_unlock(&a.mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); /* And drain the socket so we can generate a new read edge */ result = read(sv[0], &data, 1); @@ -474,13 +464,13 @@ static void test_grpc_fd_change(void) { result = write(sv[1], &data, 1); GPR_ASSERT(result == 1); - gpr_mu_lock(&b.mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (b.cb_that_ran == NULL) { - gpr_cv_wait(&b.cv, &b.mu, gpr_inf_future); + grpc_pollset_work(&g_pollset, gpr_inf_future); } /* Except now we verify that second_read_callback ran instead */ GPR_ASSERT(b.cb_that_ran == second_read_callback); - gpr_mu_unlock(&b.mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_fd_orphan(em_fd, NULL, NULL); destroy_change_data(&a); @@ -488,11 +478,17 @@ static void test_grpc_fd_change(void) { close(sv[1]); } +static void destroy_pollset(void *p) { + grpc_pollset_destroy(p); +} + int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_iomgr_init(); + grpc_pollset_init(&g_pollset); test_grpc_fd(); test_grpc_fd_change(); + grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); grpc_iomgr_shutdown(); return 0; } diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index 3c4d8fed4f..945f7c1bdc 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -45,20 +45,31 @@ #include <grpc/support/time.h> #include "test/core/util/test_config.h" +static grpc_pollset_set g_pollset_set; +static grpc_pollset g_pollset; +static int g_connections_complete = 0; + static gpr_timespec test_deadline(void) { return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10); } +static void finish_connection() { + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + g_connections_complete++; + grpc_pollset_kick(&g_pollset); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); +} + static void must_succeed(void *arg, grpc_endpoint *tcp) { GPR_ASSERT(tcp); grpc_endpoint_shutdown(tcp); grpc_endpoint_destroy(tcp); - gpr_event_set(arg, (void *)1); + finish_connection(); } static void must_fail(void *arg, grpc_endpoint *tcp) { GPR_ASSERT(!tcp); - gpr_event_set(arg, (void *)1); + finish_connection(); } void test_succeeds(void) { @@ -66,9 +77,7 @@ void test_succeeds(void) { socklen_t addr_len = sizeof(addr); int svr_fd; int r; - gpr_event ev; - - gpr_event_init(&ev); + int connections_complete_before; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; @@ -79,9 +88,13 @@ void test_succeeds(void) { GPR_ASSERT(0 == bind(svr_fd, (struct sockaddr *)&addr, addr_len)); GPR_ASSERT(0 == listen(svr_fd, 1)); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + connections_complete_before = g_connections_complete; + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + /* connect to it */ GPR_ASSERT(getsockname(svr_fd, (struct sockaddr *)&addr, &addr_len) == 0); - grpc_tcp_client_connect(must_succeed, &ev, (struct sockaddr *)&addr, addr_len, + grpc_tcp_client_connect(must_succeed, NULL, &g_pollset_set, (struct sockaddr *)&addr, addr_len, gpr_inf_future); /* await the connection */ @@ -92,26 +105,39 @@ void test_succeeds(void) { GPR_ASSERT(r >= 0); close(r); - /* wait for the connection callback to finish */ - GPR_ASSERT(gpr_event_wait(&ev, test_deadline())); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + + while (g_connections_complete == connections_complete_before) { + grpc_pollset_work(&g_pollset, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)); + } + + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } void test_fails(void) { struct sockaddr_in addr; socklen_t addr_len = sizeof(addr); - gpr_event ev; - - gpr_event_init(&ev); + int connections_complete_before; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + connections_complete_before = g_connections_complete; + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + /* connect to a broken address */ - grpc_tcp_client_connect(must_fail, &ev, (struct sockaddr *)&addr, addr_len, + grpc_tcp_client_connect(must_fail, NULL, &g_pollset_set, (struct sockaddr *)&addr, addr_len, gpr_inf_future); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + /* wait for the connection callback to finish */ - GPR_ASSERT(gpr_event_wait(&ev, test_deadline())); + while (g_connections_complete == connections_complete_before) { + grpc_pollset_work(&g_pollset, test_deadline()); + } + + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } void test_times_out(void) { @@ -122,11 +148,9 @@ void test_times_out(void) { int client_fd[NUM_CLIENT_CONNECTS]; int i; int r; - gpr_event ev; + int connections_complete_before; gpr_timespec connect_deadline; - gpr_event_init(&ev); - memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; @@ -153,28 +177,48 @@ void test_times_out(void) { connect_deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1); - grpc_tcp_client_connect(must_fail, &ev, (struct sockaddr *)&addr, addr_len, + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + connections_complete_before = g_connections_complete; + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + + grpc_tcp_client_connect(must_fail, NULL, &g_pollset_set, (struct sockaddr *)&addr, addr_len, connect_deadline); + /* Make sure the event doesn't trigger early */ - GPR_ASSERT(!gpr_event_wait(&ev, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(500))); - /* Now wait until it should have triggered */ - sleep(1); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + while (gpr_time_cmp(gpr_time_add(connect_deadline, gpr_time_from_seconds(2)), gpr_now()) > 0) { + int is_after_deadline = gpr_time_cmp(connect_deadline, gpr_now()) <= 0; + if (is_after_deadline && gpr_time_cmp(gpr_time_add(connect_deadline, gpr_time_from_seconds(1)), gpr_now()) > 0) { + /* allow some slack before insisting that things be done */ + } else { + GPR_ASSERT(g_connections_complete == connections_complete_before + is_after_deadline); + } + grpc_pollset_work(&g_pollset, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10)); + } + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - /* wait for the connection callback to finish */ - GPR_ASSERT(gpr_event_wait(&ev, test_deadline())); close(svr_fd); for (i = 0; i < NUM_CLIENT_CONNECTS; ++i) { close(client_fd[i]); } } +static void destroy_pollset(void *p) { + grpc_pollset_destroy(p); +} + int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_iomgr_init(); + grpc_pollset_set_init(&g_pollset_set); + grpc_pollset_init(&g_pollset); + grpc_pollset_set_add_pollset(&g_pollset_set, &g_pollset); test_succeeds(); gpr_log(GPR_ERROR, "End of first test"); test_fails(); test_times_out(); + grpc_pollset_set_destroy(&g_pollset_set); + grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); grpc_iomgr_shutdown(); return 0; } diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 40abed5f6e..04aed132f4 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -48,6 +48,8 @@ #include "test/core/util/test_config.h" #include "test/core/iomgr/endpoint_tests.h" +static grpc_pollset g_pollset; + /* General test notes: @@ -114,8 +116,6 @@ static size_t fill_socket_partial(int fd, size_t bytes) { struct read_socket_state { grpc_endpoint *ep; - gpr_mu mu; - gpr_cv cv; ssize_t read_bytes; ssize_t target_read_bytes; }; @@ -145,18 +145,18 @@ static void read_cb(void *user_data, gpr_slice *slices, size_t nslices, GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK); - gpr_mu_lock(&state->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); current_data = state->read_bytes % 256; read_bytes = count_and_unref_slices(slices, nslices, ¤t_data); state->read_bytes += read_bytes; gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes, state->target_read_bytes); if (state->read_bytes >= state->target_read_bytes) { - gpr_cv_signal(&state->cv); + /* empty */ } else { grpc_endpoint_notify_on_read(state->ep, read_cb, state); } - gpr_mu_unlock(&state->mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } /* Write to a socket, then read from it using the grpc_tcp API. */ @@ -173,31 +173,25 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) { create_sockets(sv); ep = grpc_tcp_create(grpc_fd_create(sv[1]), slice_size); + grpc_endpoint_add_to_pollset(ep, &g_pollset); + written_bytes = fill_socket_partial(sv[0], num_bytes); gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); - gpr_mu_init(&state.mu); - gpr_cv_init(&state.cv); state.ep = ep; state.read_bytes = 0; state.target_read_bytes = written_bytes; grpc_endpoint_notify_on_read(ep, read_cb, &state); - gpr_mu_lock(&state.mu); - for (;;) { - GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0); - if (state.read_bytes >= state.target_read_bytes) { - break; - } + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + while (state.read_bytes < state.target_read_bytes) { + grpc_pollset_work(&g_pollset, deadline); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); - gpr_mu_unlock(&state.mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_endpoint_destroy(ep); - - gpr_mu_destroy(&state.mu); - gpr_cv_destroy(&state.cv); } /* Write to a socket until it fills up, then read from it using the grpc_tcp @@ -214,37 +208,28 @@ static void large_read_test(ssize_t slice_size) { create_sockets(sv); ep = grpc_tcp_create(grpc_fd_create(sv[1]), slice_size); + grpc_endpoint_add_to_pollset(ep, &g_pollset); written_bytes = fill_socket(sv[0]); gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); - gpr_mu_init(&state.mu); - gpr_cv_init(&state.cv); state.ep = ep; state.read_bytes = 0; state.target_read_bytes = written_bytes; grpc_endpoint_notify_on_read(ep, read_cb, &state); - gpr_mu_lock(&state.mu); - for (;;) { - GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0); - if (state.read_bytes >= state.target_read_bytes) { - break; - } + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + while (state.read_bytes < state.target_read_bytes) { + grpc_pollset_work(&g_pollset, deadline); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); - gpr_mu_unlock(&state.mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_endpoint_destroy(ep); - - gpr_mu_destroy(&state.mu); - gpr_cv_destroy(&state.cv); } struct write_socket_state { grpc_endpoint *ep; - gpr_mu mu; - gpr_cv cv; int write_done; }; @@ -275,11 +260,11 @@ static void write_done(void *user_data /* write_socket_state */, grpc_endpoint_cb_status error) { struct write_socket_state *state = (struct write_socket_state *)user_data; gpr_log(GPR_INFO, "Write done callback called"); - gpr_mu_lock(&state->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); gpr_log(GPR_INFO, "Signalling write done"); state->write_done = 1; - gpr_cv_signal(&state->cv); - gpr_mu_unlock(&state->mu); + grpc_pollset_kick(&g_pollset); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { @@ -294,6 +279,9 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0); for (;;) { + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + grpc_pollset_work(&g_pollset, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10)); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); do { bytes_read = read(fd, buf, bytes_left > read_size ? read_size : bytes_left); @@ -351,9 +339,8 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) { create_sockets(sv); ep = grpc_tcp_create(grpc_fd_create(sv[1]), GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + grpc_endpoint_add_to_pollset(ep, &g_pollset); - gpr_mu_init(&state.mu); - gpr_cv_init(&state.cv); state.ep = ep; state.write_done = 0; @@ -366,19 +353,17 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) { GPR_ASSERT(read_bytes == num_bytes); } else { drain_socket_blocking(sv[0], num_bytes, num_bytes); - gpr_mu_lock(&state.mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); for (;;) { if (state.write_done) { break; } - GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0); + grpc_pollset_work(&g_pollset, deadline); } - gpr_mu_unlock(&state.mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } grpc_endpoint_destroy(ep); - gpr_mu_destroy(&state.mu); - gpr_cv_destroy(&state.cv); gpr_free(slices); } @@ -407,10 +392,9 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) { create_sockets(sv); ep = grpc_tcp_create(grpc_fd_create(sv[1]), GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + grpc_endpoint_add_to_pollset(ep, &g_pollset); close(sv[0]); - gpr_mu_init(&state.mu); - gpr_cv_init(&state.cv); state.ep = ep; state.write_done = 0; @@ -423,20 +407,18 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) { break; case GRPC_ENDPOINT_WRITE_PENDING: grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL); - gpr_mu_lock(&state.mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); for (;;) { if (state.write_done) { break; } - GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0); + grpc_pollset_work(&g_pollset, deadline); } - gpr_mu_unlock(&state.mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); break; } grpc_endpoint_destroy(ep); - gpr_mu_destroy(&state.mu); - gpr_cv_destroy(&state.cv); free(slices); } @@ -475,6 +457,8 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( create_sockets(sv); f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0]), slice_size); f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1]), slice_size); + grpc_endpoint_add_to_pollset(f.client_ep, &g_pollset); + grpc_endpoint_add_to_pollset(f.server_ep, &g_pollset); return f; } @@ -483,11 +467,17 @@ static grpc_endpoint_test_config configs[] = { {"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up}, }; +static void destroy_pollset(void *p) { + grpc_pollset_destroy(p); +} + int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); + grpc_pollset_init(&g_pollset); run_tests(); - grpc_endpoint_tests(configs[0]); + grpc_endpoint_tests(configs[0], &g_pollset); + grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); grpc_shutdown(); return 0; diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index 328b19f68a..1e7af5a339 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -45,18 +45,17 @@ #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", #x) -static gpr_mu mu; -static gpr_cv cv; -static int nconnects = 0; +static grpc_pollset g_pollset; +static int g_nconnects = 0; static void on_connect(void *arg, grpc_endpoint *tcp) { grpc_endpoint_shutdown(tcp); grpc_endpoint_destroy(tcp); - gpr_mu_lock(&mu); - nconnects++; - gpr_cv_broadcast(&cv); - gpr_mu_unlock(&mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + g_nconnects++; + grpc_pollset_kick(&g_pollset); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } static void test_no_op(void) { @@ -106,12 +105,11 @@ static void test_connect(int n) { grpc_tcp_server *s = grpc_tcp_server_create(); int nconnects_before; gpr_timespec deadline; + grpc_pollset *pollsets[1]; int i; LOG_TEST("test_connect"); gpr_log(GPR_INFO, "clients=%d", n); - gpr_mu_lock(&mu); - memset(&addr, 0, sizeof(addr)); addr.ss_family = AF_INET; GPR_ASSERT(grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len)); @@ -121,38 +119,43 @@ static void test_connect(int n) { GPR_ASSERT(getsockname(svrfd, (struct sockaddr *)&addr, &addr_len) == 0); GPR_ASSERT(addr_len <= sizeof(addr)); - grpc_tcp_server_start(s, NULL, 0, on_connect, NULL); + pollsets[0] = &g_pollset; + grpc_tcp_server_start(s, pollsets, 1, on_connect, NULL); + + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); for (i = 0; i < n; i++) { - deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1); + deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(4000); - nconnects_before = nconnects; + nconnects_before = g_nconnects; clifd = socket(addr.ss_family, SOCK_STREAM, 0); GPR_ASSERT(clifd >= 0); + gpr_log(GPR_DEBUG, "start connect"); GPR_ASSERT(connect(clifd, (struct sockaddr *)&addr, addr_len) == 0); - while (nconnects == nconnects_before) { - GPR_ASSERT(gpr_cv_wait(&cv, &mu, deadline) == 0); + gpr_log(GPR_DEBUG, "wait"); + while (g_nconnects == nconnects_before && gpr_time_cmp(deadline, gpr_now()) > 0) { + grpc_pollset_work(&g_pollset, deadline); } + gpr_log(GPR_DEBUG, "wait done"); - GPR_ASSERT(nconnects == nconnects_before + 1); + GPR_ASSERT(g_nconnects == nconnects_before + 1); close(clifd); - - if (i != n - 1) { - sleep(1); - } } - gpr_mu_unlock(&mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_tcp_server_destroy(s, NULL, NULL); } +static void destroy_pollset(void *p) { + grpc_pollset_destroy(p); +} + int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_iomgr_init(); - gpr_mu_init(&mu); - gpr_cv_init(&cv); + grpc_pollset_init(&g_pollset); test_no_op(); test_no_op_with_start(); @@ -161,8 +164,7 @@ int main(int argc, char **argv) { test_connect(1); test_connect(10); + grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); grpc_iomgr_shutdown(); - gpr_mu_destroy(&mu); - gpr_cv_destroy(&cv); return 0; } |