aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core/iomgr/fd_posix_test.c
diff options
context:
space:
mode:
Diffstat (limited to 'test/core/iomgr/fd_posix_test.c')
-rw-r--r--test/core/iomgr/fd_posix_test.c121
1 files changed, 73 insertions, 48 deletions
diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c
index ae6b56da77..f89d6c7824 100644
--- a/test/core/iomgr/fd_posix_test.c
+++ b/test/core/iomgr/fd_posix_test.c
@@ -52,7 +52,6 @@
#include "test/core/util/test_config.h"
static grpc_pollset g_pollset;
-static grpc_workqueue *g_workqueue;
/* buffer size used to send and receive data.
1024 is the minimal value to set TCP send and receive buffer. */
@@ -119,18 +118,18 @@ typedef struct {
/* Called when an upload session can be safely shutdown.
Close session FD and start to shutdown listen FD. */
static void session_shutdown_cb(void *arg, /*session*/
- int success) {
+ int success, grpc_call_list *call_list) {
session *se = arg;
server *sv = se->sv;
- grpc_fd_orphan(se->em_fd, NULL, "a");
+ grpc_fd_orphan(se->em_fd, NULL, "a", call_list);
gpr_free(se);
/* Start to shutdown listen fd. */
- grpc_fd_shutdown(sv->em_fd);
+ grpc_fd_shutdown(sv->em_fd, call_list);
}
/* Called when data become readable in a session. */
static void session_read_cb(void *arg, /*session*/
- int success) {
+ int success, grpc_call_list *call_list) {
session *se = arg;
int fd = se->em_fd->fd;
@@ -138,7 +137,7 @@ static void session_read_cb(void *arg, /*session*/
ssize_t read_total = 0;
if (!success) {
- session_shutdown_cb(arg, 1);
+ session_shutdown_cb(arg, 1, call_list);
return;
}
@@ -153,7 +152,7 @@ static void session_read_cb(void *arg, /*session*/
It is possible to read nothing due to spurious edge event or data has
been drained, In such a case, read() returns -1 and set errno to EAGAIN. */
if (read_once == 0) {
- session_shutdown_cb(arg, 1);
+ session_shutdown_cb(arg, 1, call_list);
} else if (read_once == -1) {
if (errno == EAGAIN) {
/* An edge triggered event is cached in the kernel until next poll.
@@ -164,7 +163,7 @@ static void session_read_cb(void *arg, /*session*/
TODO(chenw): in multi-threaded version, callback and polling can be
run in different threads. polling may catch a persist read edge event
before notify_on_read is called. */
- grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
+ grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure, call_list);
} else {
gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno));
abort();
@@ -174,10 +173,11 @@ static void session_read_cb(void *arg, /*session*/
/* Called when the listen FD can be safely shutdown.
Close listen FD and signal that server can be shutdown. */
-static void listen_shutdown_cb(void *arg /*server*/, int success) {
+static void listen_shutdown_cb(void *arg /*server*/, int success,
+ grpc_call_list *call_list) {
server *sv = arg;
- grpc_fd_orphan(sv->em_fd, NULL, "b");
+ grpc_fd_orphan(sv->em_fd, NULL, "b", call_list);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
sv->done = 1;
@@ -187,7 +187,7 @@ static void listen_shutdown_cb(void *arg /*server*/, int success) {
/* Called when a new TCP connection request arrives in the listening port. */
static void listen_cb(void *arg, /*=sv_arg*/
- int success) {
+ int success, grpc_call_list *call_list) {
server *sv = arg;
int fd;
int flags;
@@ -197,7 +197,7 @@ static void listen_cb(void *arg, /*=sv_arg*/
grpc_fd *listen_em_fd = sv->em_fd;
if (!success) {
- listen_shutdown_cb(arg, 1);
+ listen_shutdown_cb(arg, 1, call_list);
return;
}
@@ -208,13 +208,13 @@ static void listen_cb(void *arg, /*=sv_arg*/
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
se = gpr_malloc(sizeof(*se));
se->sv = sv;
- se->em_fd = grpc_fd_create(fd, g_workqueue, "listener");
- grpc_pollset_add_fd(&g_pollset, se->em_fd);
+ se->em_fd = grpc_fd_create(fd, "listener");
+ grpc_pollset_add_fd(&g_pollset, se->em_fd, call_list);
se->session_read_closure.cb = session_read_cb;
se->session_read_closure.cb_arg = se;
- grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
+ grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure, call_list);
- grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure);
+ grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure, call_list);
}
/* Max number of connections pending to be accepted by listen(). */
@@ -224,7 +224,7 @@ static void listen_cb(void *arg, /*=sv_arg*/
listen_cb() is registered to be interested in reading from listen_fd.
When connection request arrives, listen_cb() is called to accept the
connection request. */
-static int server_start(server *sv) {
+static int server_start(server *sv, grpc_call_list *call_list) {
int port = 0;
int fd;
struct sockaddr_in sin;
@@ -237,12 +237,12 @@ static int server_start(server *sv) {
port = ntohs(sin.sin_port);
GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
- sv->em_fd = grpc_fd_create(fd, g_workqueue, "server");
- grpc_pollset_add_fd(&g_pollset, sv->em_fd);
+ sv->em_fd = grpc_fd_create(fd, "server");
+ grpc_pollset_add_fd(&g_pollset, sv->em_fd, call_list);
/* Register to be interested in reading from listen_fd. */
sv->listen_closure.cb = listen_cb;
sv->listen_closure.cb_arg = sv;
- grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure);
+ grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure, call_list);
return port;
}
@@ -251,9 +251,13 @@ static int server_start(server *sv) {
static void server_wait_and_shutdown(server *sv) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!sv->done) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_call_list_run(&call_list);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
@@ -286,23 +290,24 @@ static void client_init(client *cl) {
}
/* Called when a client upload session is ready to shutdown. */
-static void client_session_shutdown_cb(void *arg /*client*/, int success) {
+static void client_session_shutdown_cb(void *arg /*client*/, int success,
+ grpc_call_list *call_list) {
client *cl = arg;
- grpc_fd_orphan(cl->em_fd, NULL, "c");
+ grpc_fd_orphan(cl->em_fd, NULL, "c", call_list);
cl->done = 1;
grpc_pollset_kick(&g_pollset, NULL);
}
/* Write as much as possible, then register notify_on_write. */
static void client_session_write(void *arg, /*client*/
- int success) {
+ int success, grpc_call_list *call_list) {
client *cl = arg;
int fd = cl->em_fd->fd;
ssize_t write_once = 0;
if (!success) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
- client_session_shutdown_cb(arg, 1);
+ client_session_shutdown_cb(arg, 1, call_list);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
return;
}
@@ -317,10 +322,10 @@ static void client_session_write(void *arg, /*client*/
if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
cl->write_closure.cb = client_session_write;
cl->write_closure.cb_arg = cl;
- grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure);
+ grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure, call_list);
cl->client_write_cnt++;
} else {
- client_session_shutdown_cb(arg, 1);
+ client_session_shutdown_cb(arg, 1, call_list);
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
} else {
@@ -330,7 +335,7 @@ static void client_session_write(void *arg, /*client*/
}
/* Start a client to send a stream of bytes. */
-static void client_start(client *cl, int port) {
+static void client_start(client *cl, int port, grpc_call_list *call_list) {
int fd;
struct sockaddr_in sin;
create_test_socket(port, &fd, &sin);
@@ -350,10 +355,10 @@ static void client_start(client *cl, int port) {
}
}
- cl->em_fd = grpc_fd_create(fd, g_workqueue, "client");
- grpc_pollset_add_fd(&g_pollset, cl->em_fd);
+ cl->em_fd = grpc_fd_create(fd, "client");
+ grpc_pollset_add_fd(&g_pollset, cl->em_fd, call_list);
- client_session_write(cl, 1);
+ client_session_write(cl, 1, call_list);
}
/* Wait for the signal to shutdown a client. */
@@ -361,8 +366,12 @@ static void client_wait_and_shutdown(client *cl) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!cl->done) {
grpc_pollset_worker worker;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_call_list_run(&call_list);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
@@ -374,11 +383,13 @@ static void test_grpc_fd(void) {
server sv;
client cl;
int port;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
server_init(&sv);
- port = server_start(&sv);
+ port = server_start(&sv, &call_list);
client_init(&cl);
- client_start(&cl, port);
+ client_start(&cl, port, &call_list);
+ grpc_call_list_run(&call_list);
client_wait_and_shutdown(&cl);
server_wait_and_shutdown(&sv);
GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total);
@@ -386,14 +397,15 @@ static void test_grpc_fd(void) {
}
typedef struct fd_change_data {
- void (*cb_that_ran)(void *, int success);
+ void (*cb_that_ran)(void *, int success, grpc_call_list *call_list);
} fd_change_data;
void init_change_data(fd_change_data *fdc) { fdc->cb_that_ran = NULL; }
void destroy_change_data(fd_change_data *fdc) {}
-static void first_read_callback(void *arg /* fd_change_data */, int success) {
+static void first_read_callback(void *arg /* fd_change_data */, int success,
+ grpc_call_list *call_list) {
fd_change_data *fdc = arg;
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -402,7 +414,8 @@ static void first_read_callback(void *arg /* fd_change_data */, int success) {
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
-static void second_read_callback(void *arg /* fd_change_data */, int success) {
+static void second_read_callback(void *arg /* fd_change_data */, int success,
+ grpc_call_list *call_list) {
fd_change_data *fdc = arg;
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -424,6 +437,7 @@ static void test_grpc_fd_change(void) {
ssize_t result;
grpc_closure first_closure;
grpc_closure second_closure;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
first_closure.cb = first_read_callback;
first_closure.cb_arg = &a;
@@ -439,11 +453,11 @@ static void test_grpc_fd_change(void) {
flags = fcntl(sv[1], F_GETFL, 0);
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
- em_fd = grpc_fd_create(sv[0], g_workqueue, "test_grpc_fd_change");
- grpc_pollset_add_fd(&g_pollset, em_fd);
+ em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change");
+ grpc_pollset_add_fd(&g_pollset, em_fd, &call_list);
/* Register the first callback, then make its FD readable */
- grpc_fd_notify_on_read(em_fd, &first_closure);
+ grpc_fd_notify_on_read(em_fd, &first_closure, &call_list);
data = 0;
result = write(sv[1], &data, 1);
GPR_ASSERT(result == 1);
@@ -453,7 +467,10 @@ static void test_grpc_fd_change(void) {
while (a.cb_that_ran == NULL) {
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_call_list_run(&call_list);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
GPR_ASSERT(a.cb_that_ran == first_read_callback);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -464,7 +481,7 @@ static void test_grpc_fd_change(void) {
/* Now register a second callback with distinct change data, and do the same
thing again. */
- grpc_fd_notify_on_read(em_fd, &second_closure);
+ grpc_fd_notify_on_read(em_fd, &second_closure, &call_list);
data = 0;
result = write(sv[1], &data, 1);
GPR_ASSERT(result == 1);
@@ -473,29 +490,37 @@ static void test_grpc_fd_change(void) {
while (b.cb_that_ran == NULL) {
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_call_list_run(&call_list);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
/* Except now we verify that second_read_callback ran instead */
GPR_ASSERT(b.cb_that_ran == second_read_callback);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- grpc_fd_orphan(em_fd, NULL, "d");
+ grpc_fd_orphan(em_fd, NULL, "d", &call_list);
+ grpc_call_list_run(&call_list);
destroy_change_data(&a);
destroy_change_data(&b);
close(sv[1]);
}
-static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+static void destroy_pollset(void *p, int success, grpc_call_list *call_list) {
+ grpc_pollset_destroy(p);
+}
int main(int argc, char **argv) {
+ grpc_closure destroyed;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_test_init(argc, argv);
grpc_iomgr_init();
grpc_pollset_init(&g_pollset);
- g_workqueue = grpc_workqueue_create();
test_grpc_fd();
test_grpc_fd_change();
- grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
- GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy");
+ grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+ grpc_pollset_shutdown(&g_pollset, &destroyed, &call_list);
+ grpc_call_list_run(&call_list);
grpc_iomgr_shutdown();
return 0;
}