aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr/pollset_posix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr/pollset_posix.c')
-rw-r--r--src/core/iomgr/pollset_posix.c138
1 files changed, 62 insertions, 76 deletions
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 0fe3f80d44..885cb29234 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -136,17 +136,14 @@ void grpc_pollset_init(grpc_pollset *pollset) {
pollset->in_flight_cbs = 0;
pollset->shutting_down = 0;
pollset->called_shutdown = 0;
- pollset->idle_jobs = NULL;
- pollset->unlock_jobs = NULL;
+ pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
become_basic_pollset(pollset, NULL);
}
-void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
- if (fd->workqueue->wakeup_read_fd != fd) {
- grpc_pollset_add_fd(pollset, fd->workqueue->wakeup_read_fd);
- }
+void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
+ grpc_call_list *call_list) {
gpr_mu_lock(&pollset->mu);
- pollset->vtable->add_fd(pollset, fd, 1);
+ pollset->vtable->add_fd(pollset, fd, 1, call_list);
/* the following (enabled only in debug) will reacquire and then release
our lock - meaning that if the unlocking flag passed to del_fd above is
not respected, the code will deadlock (in a way that we have a chance of
@@ -157,9 +154,10 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
#endif
}
-void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
+void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd,
+ grpc_call_list *call_list) {
gpr_mu_lock(&pollset->mu);
- pollset->vtable->del_fd(pollset, fd, 1);
+ pollset->vtable->del_fd(pollset, fd, 1, call_list);
/* the following (enabled only in debug) will reacquire and then release
our lock - meaning that if the unlocking flag passed to del_fd above is
not respected, the code will deadlock (in a way that we have a chance of
@@ -170,53 +168,27 @@ void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
#endif
}
-static void finish_shutdown(grpc_pollset *pollset) {
+static void finish_shutdown(grpc_pollset *pollset, grpc_call_list *call_list) {
pollset->vtable->finish_shutdown(pollset);
- pollset->shutdown_done_cb(pollset->shutdown_done_arg);
-}
-
-static void run_jobs(grpc_pollset *pollset, grpc_closure **root) {
- grpc_closure *exec = *root;
- *root = NULL;
- gpr_mu_unlock(&pollset->mu);
- while (exec != NULL) {
- grpc_closure *next = exec->next;
- exec->cb(exec->cb_arg, 1);
- exec = next;
- }
- gpr_mu_lock(&pollset->mu);
-}
-
-static void add_job(grpc_closure **root, grpc_closure *closure) {
- closure->next = *root;
- *root = closure;
-}
-
-void grpc_pollset_add_idle_job(grpc_pollset *pollset, grpc_closure *closure) {
- add_job(&pollset->idle_jobs, closure);
-}
-
-void grpc_pollset_add_unlock_job(grpc_pollset *pollset, grpc_closure *closure) {
- add_job(&pollset->unlock_jobs, closure);
+ grpc_call_list_add(call_list, pollset->shutdown_done, 1);
}
void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec now, gpr_timespec deadline) {
/* pollset->mu already held */
int added_worker = 0;
+ int locked = 1;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
/* this must happen before we (potentially) drop pollset->mu */
worker->next = worker->prev = NULL;
/* TODO(ctiller): pool these */
grpc_wakeup_fd_init(&worker->wakeup_fd);
- if (!grpc_pollset_has_workers(pollset) && pollset->idle_jobs != NULL) {
- run_jobs(pollset, &pollset->idle_jobs);
+ if (!grpc_pollset_has_workers(pollset) &&
+ !grpc_call_list_empty(pollset->idle_jobs)) {
+ grpc_call_list_move(&pollset->idle_jobs, &call_list);
goto done;
}
- if (pollset->unlock_jobs != NULL) {
- run_jobs(pollset, &pollset->unlock_jobs);
- goto done;
- }
- if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
+ if (grpc_alarm_check(now, &deadline, &call_list)) {
goto done;
}
if (pollset->shutting_down) {
@@ -225,19 +197,32 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
if (pollset->in_flight_cbs) {
/* Give do_promote priority so we don't starve it out */
gpr_mu_unlock(&pollset->mu);
- gpr_mu_lock(&pollset->mu);
+ locked = 0;
goto done;
}
if (!pollset->kicked_without_pollers) {
push_front_worker(pollset, worker);
added_worker = 1;
gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
- pollset->vtable->maybe_work(pollset, worker, deadline, now, 1);
+ pollset->vtable->maybe_work_and_unlock(pollset, worker, deadline, now,
+ NULL);
+ locked = 0;
gpr_tls_set(&g_current_thread_poller, 0);
} else {
pollset->kicked_without_pollers = 0;
}
done:
+ if (!grpc_call_list_empty(call_list)) {
+ if (locked) {
+ gpr_mu_unlock(&pollset->mu);
+ locked = 0;
+ }
+ grpc_call_list_run(&call_list);
+ }
+ if (!locked) {
+ gpr_mu_lock(&pollset->mu);
+ locked = 1;
+ }
grpc_wakeup_fd_destroy(&worker->wakeup_fd);
if (added_worker) {
remove_worker(pollset, worker);
@@ -248,7 +233,8 @@ done:
} else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
pollset->called_shutdown = 1;
gpr_mu_unlock(&pollset->mu);
- finish_shutdown(pollset);
+ finish_shutdown(pollset, &call_list);
+ grpc_call_list_run(&call_list);
/* Continuing to access pollset here is safe -- it is the caller's
* responsibility to not destroy when it has outstanding calls to
* grpc_pollset_work.
@@ -258,9 +244,8 @@ done:
}
}
-void grpc_pollset_shutdown(grpc_pollset *pollset,
- void (*shutdown_done)(void *arg),
- void *shutdown_done_arg) {
+void grpc_pollset_shutdown(grpc_pollset *pollset, grpc_closure *closure,
+ grpc_call_list *call_list) {
int call_shutdown = 0;
gpr_mu_lock(&pollset->mu);
GPR_ASSERT(!pollset->shutting_down);
@@ -270,13 +255,12 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
pollset->called_shutdown = 1;
call_shutdown = 1;
}
- pollset->shutdown_done_cb = shutdown_done;
- pollset->shutdown_done_arg = shutdown_done_arg;
+ pollset->shutdown_done = closure;
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
gpr_mu_unlock(&pollset->mu);
if (call_shutdown) {
- finish_shutdown(pollset);
+ finish_shutdown(pollset, call_list);
}
}
@@ -317,12 +301,12 @@ typedef struct grpc_unary_promote_args {
grpc_closure promotion_closure;
} grpc_unary_promote_args;
-static void basic_do_promote(void *args, int success) {
+static void basic_do_promote(void *args, int success,
+ grpc_call_list *call_list) {
grpc_unary_promote_args *up_args = args;
const grpc_pollset_vtable *original_vtable = up_args->original_vtable;
grpc_pollset *pollset = up_args->pollset;
grpc_fd *fd = up_args->fd;
- int do_shutdown_cb = 0;
/*
* This is quite tricky. There are a number of cases to keep in mind here:
@@ -349,19 +333,20 @@ static void basic_do_promote(void *args, int success) {
if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) {
GPR_ASSERT(!grpc_pollset_has_workers(pollset));
pollset->called_shutdown = 1;
- do_shutdown_cb = 1;
+ grpc_call_list_add(call_list, pollset->shutdown_done, 1);
}
} else if (grpc_fd_is_orphaned(fd)) {
/* Don't try to add it to anything, we'll drop our ref on it below */
} else if (pollset->vtable != original_vtable) {
- pollset->vtable->add_fd(pollset, fd, 0);
+ pollset->vtable->add_fd(pollset, fd, 0, call_list);
} else if (fd != pollset->data.ptr) {
grpc_fd *fds[2];
fds[0] = pollset->data.ptr;
fds[1] = fd;
if (fds[0] && !grpc_fd_is_orphaned(fds[0])) {
- grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds));
+ grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds),
+ call_list);
GRPC_FD_UNREF(fds[0], "basicpoll");
} else {
/* old fd is orphaned and we haven't cleaned it up until now, so remain a
@@ -376,16 +361,15 @@ static void basic_do_promote(void *args, int success) {
gpr_mu_unlock(&pollset->mu);
- if (do_shutdown_cb) {
- pollset->shutdown_done_cb(pollset->shutdown_done_arg);
- }
-
/* Matching ref in basic_pollset_add_fd */
GRPC_FD_UNREF(fd, "basicpoll_add");
+
+ grpc_call_list_run(call_list);
}
static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
- int and_unlock_pollset) {
+ int and_unlock_pollset,
+ grpc_call_list *call_list) {
grpc_unary_promote_args *up_args;
GPR_ASSERT(fd);
if (fd == pollset->data.ptr) goto exit;
@@ -402,7 +386,8 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
pollset->data.ptr = fd;
GRPC_FD_REF(fd, "basicpoll");
} else if (!grpc_fd_is_orphaned(fds[0])) {
- grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds));
+ grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds),
+ call_list);
GRPC_FD_UNREF(fds[0], "basicpoll");
} else {
/* old fd is orphaned and we haven't cleaned it up until now, so remain a
@@ -424,7 +409,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
up_args->promotion_closure.cb = basic_do_promote;
up_args->promotion_closure.cb_arg = up_args;
- grpc_pollset_add_idle_job(pollset, &up_args->promotion_closure);
+ grpc_call_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1);
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
exit:
@@ -434,7 +419,8 @@ exit:
}
static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd,
- int and_unlock_pollset) {
+ int and_unlock_pollset,
+ grpc_call_list *call_list) {
GPR_ASSERT(fd);
if (fd == pollset->data.ptr) {
GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
@@ -446,10 +432,11 @@ static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd,
}
}
-static void basic_pollset_maybe_work(grpc_pollset *pollset,
- grpc_pollset_worker *worker,
- gpr_timespec deadline, gpr_timespec now,
- int allow_synchronous_callback) {
+static void basic_pollset_maybe_work_and_unlock(grpc_pollset *pollset,
+ grpc_pollset_worker *worker,
+ gpr_timespec deadline,
+ gpr_timespec now,
+ grpc_call_list *call_list) {
struct pollfd pfd[2];
grpc_fd *fd;
grpc_fd_watcher fd_watcher;
@@ -487,7 +474,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
if (fd) {
grpc_fd_end_poll(&fd_watcher, pfd[1].revents & POLLIN,
- pfd[1].revents & POLLOUT);
+ pfd[1].revents & POLLOUT, call_list);
}
if (r < 0) {
@@ -502,15 +489,13 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
}
if (nfds > 1) {
if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) {
- grpc_fd_become_readable(fd, allow_synchronous_callback);
+ grpc_fd_become_readable(fd, call_list);
}
if (pfd[1].revents & (POLLOUT | POLLHUP | POLLERR)) {
- grpc_fd_become_writable(fd, allow_synchronous_callback);
+ grpc_fd_become_writable(fd, call_list);
}
}
}
-
- gpr_mu_lock(&pollset->mu);
}
static void basic_pollset_destroy(grpc_pollset *pollset) {
@@ -521,8 +506,9 @@ static void basic_pollset_destroy(grpc_pollset *pollset) {
}
static const grpc_pollset_vtable basic_pollset = {
- basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work,
- basic_pollset_destroy, basic_pollset_destroy};
+ basic_pollset_add_fd, basic_pollset_del_fd,
+ basic_pollset_maybe_work_and_unlock, basic_pollset_destroy,
+ basic_pollset_destroy};
static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
pollset->vtable = &basic_pollset;