aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/ev_epoll_linux.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/ev_epoll_linux.c')
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c32
1 files changed, 21 insertions, 11 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 1b15e0eb4f..ac94d2e634 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -202,6 +202,8 @@ static void fd_global_shutdown(void);
/* This is also used as grpc_workqueue (by directly casing it) */
typedef struct polling_island {
+ grpc_closure_scheduler workqueue_scheduler;
+
gpr_mu mu;
/* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
the refcount.
@@ -305,6 +307,8 @@ static __thread polling_island *g_current_thread_polling_island;
/* Forward declaration */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
+static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error);
#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
@@ -317,6 +321,9 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
+static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
+ workqueue_enqueue, workqueue_enqueue};
+
static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
@@ -529,6 +536,7 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
*error = GRPC_ERROR_NONE;
pi = gpr_malloc(sizeof(*pi));
+ pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
gpr_mu_init(&pi->mu);
pi->fd_cnt = 0;
pi->fd_capacity = 0;
@@ -800,10 +808,10 @@ static polling_island *polling_island_merge(polling_island *p,
return q;
}
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue, grpc_closure *closure,
+static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error) {
GPR_TIMER_BEGIN("workqueue.enqueue", 0);
+ grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
/* take a ref to the workqueue: otherwise it can happen that whatever events
* this kicks off ends up destroying the workqueue before this function
* completes */
@@ -820,6 +828,11 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END("workqueue.enqueue", 0);
}
+static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
+ polling_island *pi = (polling_island *)workqueue;
+ return &pi->workqueue_scheduler;
+}
+
static grpc_error *polling_island_global_init() {
grpc_error *error = GRPC_ERROR_NONE;
@@ -1030,8 +1043,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
fd->po.pi = NULL;
}
- grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error),
- NULL);
+ grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
gpr_mu_unlock(&fd->po.mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
@@ -1057,16 +1069,14 @@ static grpc_error *fd_shutdown_error(bool shutdown) {
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure **st, grpc_closure *closure) {
if (fd->shutdown) {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"));
} else if (*st == CLOSURE_NOT_READY) {
/* not ready ==> switch to a waiting state by setting the closure */
*st = closure;
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
- grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown));
} else {
/* upcallptr was set to a different closure. This is an error! */
gpr_log(GPR_ERROR,
@@ -1088,7 +1098,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
return 0;
} else {
/* waiting ==> queue closure */
- grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
+ grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown));
*st = CLOSURE_NOT_READY;
return 1;
}
@@ -1359,7 +1369,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
/* Release the ref and set pollset->po.pi to NULL */
pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
- grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
/* pollset->po.mu lock must be held by the caller before calling this */
@@ -1959,7 +1969,7 @@ static const grpc_event_engine_vtable vtable = {
.workqueue_ref = workqueue_ref,
.workqueue_unref = workqueue_unref,
- .workqueue_enqueue = workqueue_enqueue,
+ .workqueue_scheduler = workqueue_scheduler,
.shutdown_engine = shutdown_engine,
};