aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-04-07 13:01:48 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-04-07 13:01:48 -0700
commit8fc1ca1e3f3fe3cfb6040405f764a150a95110ba (patch)
treefc330ee45d61cb61f8f10be69417dff502a7afb7 /src
parentef814ca29ae27a1c9b8193b185c82bd958b68778 (diff)
Initial pollset_set implementation
Diffstat (limited to 'src')
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c298
1 files changed, 267 insertions, 31 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index e846263a4b..6d79b82982 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -73,13 +73,30 @@
static grpc_wakeup_fd global_wakeup_fd;
/*******************************************************************************
- * Fd Declarations
+ * Pollset-set sibling link
*/
-#define FD_FROM_PO(po) ((grpc_fd *)(po))
+typedef enum {
+ PSS_FD,
+ PSS_POLLSET,
+ PSS_POLLSET_SET,
+ PSS_OBJ_TYPE_COUNT
+} pss_obj_type;
-struct grpc_fd {
+typedef struct pss_obj {
gpr_mu mu;
+ struct pss_obj *pss_next;
+ struct pss_obj *pss_prev;
+ int pss_refs;
+ grpc_pollset_set *pss_master;
+} pss_obj;
+
+/*******************************************************************************
+ * Fd Declarations
+ */
+
+struct grpc_fd {
+ pss_obj po;
int fd;
/* refst format:
bit 0 : 1=Active / 0=Orphaned
@@ -137,19 +154,31 @@ struct grpc_pollset_worker {
};
struct grpc_pollset {
- gpr_mu mu;
+ pss_obj po;
int epfd;
int num_pollers;
gpr_atm shutdown_atm;
grpc_closure *shutdown_closure;
grpc_wakeup_fd pollset_wakeup;
grpc_pollset_worker *root_worker;
+
+ grpc_pollset *pss_next;
+ grpc_pollset *pss_prev;
+ int pss_refs;
+ grpc_pollset_set *pss_master;
};
/*******************************************************************************
* Pollset-set Declarations
*/
-struct grpc_pollset_set {};
+struct grpc_pollset_set {
+ pss_obj po;
+ gpr_refcount refs;
+ grpc_pollset_set *master;
+
+ /* roots are only used if master == self */
+ pss_obj *roots[PSS_OBJ_TYPE_COUNT];
+};
/*******************************************************************************
* Common helpers
@@ -242,7 +271,7 @@ static void fd_global_shutdown(void) {
while (fd_freelist != NULL) {
grpc_fd *fd = fd_freelist;
fd_freelist = fd_freelist->freelist_next;
- gpr_mu_destroy(&fd->mu);
+ gpr_mu_destroy(&fd->po.mu);
gpr_free(fd);
}
gpr_mu_destroy(&fd_freelist_mu);
@@ -260,13 +289,13 @@ static grpc_fd *fd_create(int fd, const char *name) {
if (new_fd == NULL) {
new_fd = gpr_malloc(sizeof(grpc_fd));
- gpr_mu_init(&new_fd->mu);
+ gpr_mu_init(&new_fd->po.mu);
}
- /* Note: It is not really needed to get the new_fd->mu lock here. If this
+ /* Note: It is not really needed to get the new_fd->po.mu lock here. If this
* is a newly created fd (or an fd we got from the freelist), no one else
* would be holding a lock to it anyway. */
- gpr_mu_lock(&new_fd->mu);
+ gpr_mu_lock(&new_fd->po.mu);
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
@@ -285,7 +314,7 @@ static grpc_fd *fd_create(int fd, const char *name) {
new_fd->freelist_next = NULL;
new_fd->on_done_closure = NULL;
- gpr_mu_unlock(&new_fd->mu);
+ gpr_mu_unlock(&new_fd->po.mu);
char *fd_name;
gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
@@ -299,11 +328,11 @@ static grpc_fd *fd_create(int fd, const char *name) {
static int fd_wrapped_fd(grpc_fd *fd) {
int ret_fd = -1;
- gpr_mu_lock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
if (!fd->orphaned) {
ret_fd = fd->fd;
}
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
return ret_fd;
}
@@ -314,7 +343,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
bool is_fd_closed = false;
grpc_error *error = GRPC_ERROR_NONE;
- gpr_mu_lock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
fd->on_done_closure = on_done;
/* If release_fd is not NULL, we should be relinquishing control of the file
@@ -338,7 +367,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
@@ -472,7 +501,7 @@ static void pollset_global_shutdown(void) {
gpr_tls_destroy(&g_current_thread_worker);
}
-/* p->mu must be held before calling this function */
+/* p->po.mu must be held before calling this function */
static grpc_error *pollset_kick(grpc_pollset *p,
grpc_pollset_worker *specific_worker) {
if (specific_worker == NULL) {
@@ -497,7 +526,7 @@ static grpc_error *kick_poller(void) {
}
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
- gpr_mu_init(&pollset->mu);
+ gpr_mu_init(&pollset->po.mu);
pollset->epfd = epoll_create1(EPOLL_CLOEXEC);
if (pollset->epfd < 0) {
GRPC_LOG_IF_ERROR("pollset_init", GRPC_OS_ERROR(errno, "epoll_create1"));
@@ -523,7 +552,7 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
}
}
pollset->root_worker = NULL;
- *mu = &pollset->mu;
+ *mu = &pollset->po.mu;
}
/* Convert a timespec to milliseconds:
@@ -588,7 +617,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
/* pollset_shutdown is guaranteed to be called before pollset_destroy. */
static void pollset_destroy(grpc_pollset *pollset) {
- gpr_mu_destroy(&pollset->mu);
+ gpr_mu_destroy(&pollset->po.mu);
if (pollset->epfd >= 0) close(pollset->epfd);
grpc_wakeup_fd_destroy(&pollset->pollset_wakeup);
}
@@ -669,7 +698,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
worker->initialized_cv = true;
gpr_cv_init(&worker->cv);
while (pollset->root_worker != worker) {
- if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline)) return false;
+ if (gpr_cv_wait(&worker->cv, &pollset->po.mu, deadline)) return false;
if (worker->kicked) return false;
}
}
@@ -698,8 +727,8 @@ static void end_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
}
}
-/* pollset->mu lock must be held by the caller before calling this.
- The function pollset_work() may temporarily release the lock (pollset->mu)
+/* pollset->po.mu lock must be held by the caller before calling this.
+ The function pollset_work() may temporarily release the lock (pollset->po.mu)
during the course of its execution but it will always re-acquire the lock and
ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -710,10 +739,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (begin_worker(pollset, &worker, worker_hdl, deadline)) {
GPR_ASSERT(!pollset->shutdown_closure);
pollset->num_pollers++;
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(&pollset->po.mu);
error = pollset_poll(exec_ctx, pollset, now, deadline);
grpc_exec_ctx_flush(exec_ctx);
- gpr_mu_lock(&pollset->mu);
+ gpr_mu_lock(&pollset->po.mu);
pollset->num_pollers--;
if (pollset->num_pollers == 0 && pollset->shutdown_closure != NULL) {
grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
@@ -758,45 +787,252 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
*/
static grpc_pollset_set *pollset_set_create(void) {
- grpc_pollset_set *pss = gpr_malloc(sizeof(*pss));
+ grpc_pollset_set *pss = gpr_zalloc(sizeof(*pss));
+ gpr_mu_init(&pss->po.mu);
+ pss->roots[PSS_POLLSET_SET] = &pss->po;
+ pss->po.pss_next = pss->po.pss_prev = &pss->po;
+ return pss;
+}
+
+static void pss_destroy(grpc_pollset_set *pss) {
+ gpr_mu_destroy(&pss->po.mu);
+ gpr_free(pss);
+}
+
+static grpc_pollset_set *pss_ref(grpc_pollset_set *pss) {
+ gpr_ref(&pss->refs);
return pss;
}
+static void pss_unref(grpc_pollset_set *pss) {
+ if (gpr_unref(&pss->refs)) pss_destroy(pss);
+}
+
static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pss) {
- gpr_free(pss);
+ pss_unref(pss);
+}
+
+static grpc_pollset_set *pss_ref_and_lock_master(
+ grpc_pollset_set *master_or_slave) {
+ pss_ref(master_or_slave);
+ gpr_mu_lock(&master_or_slave->po.mu);
+ while (master_or_slave != master_or_slave->master) {
+ grpc_pollset_set *master = pss_ref(master_or_slave->master);
+ gpr_mu_unlock(&master_or_slave->po.mu);
+ pss_unref(master_or_slave);
+ master_or_slave = master;
+ gpr_mu_lock(&master_or_slave->po.mu);
+ }
+ return master_or_slave;
+}
+
+static void pss_broadcast_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *dst,
+ pss_obj *obj) {
+ grpc_fd *fd = (grpc_fd *)obj;
+ if (dst->roots[PSS_POLLSET] == NULL) return;
+ pss_obj *tgt = dst->roots[PSS_POLLSET];
+ do {
+ pollset_add_fd(exec_ctx, (grpc_pollset *)tgt, fd);
+ tgt = tgt->pss_next;
+ } while (tgt != dst->roots[PSS_POLLSET]);
+}
+
+static void pss_broadcast_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *dst, pss_obj *obj) {
+ grpc_pollset *pollset = (grpc_pollset *)obj;
+ if (dst->roots[PSS_FD] == NULL) return;
+ pss_obj *tgt = dst->roots[PSS_FD];
+ do {
+ pollset_add_fd(exec_ctx, pollset, (grpc_fd *)tgt);
+ tgt = tgt->pss_next;
+ } while (tgt != dst->roots[PSS_FD]);
+}
+
+static pss_obj *pss_splice(pss_obj *p, pss_obj *q) {
+ if (p == NULL) return q;
+ if (q == NULL) return p;
+ p->pss_next->pss_prev = q->pss_prev;
+ q->pss_prev->pss_next = p->pss_next;
+ p->pss_next = q;
+ q->pss_prev = p;
+ return p;
+}
+
+static void (*const broadcast[PSS_OBJ_TYPE_COUNT])(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *dst,
+ pss_obj *obj) = {
+ pss_broadcast_fd, pss_broadcast_pollset, NULL};
+
+static void pss_merge_broadcast_and_patch(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *a,
+ grpc_pollset_set *b,
+ pss_obj_type type) {
+ pss_obj *obj;
+ if (a->roots[type] != NULL) {
+ obj = a->roots[PSS_FD];
+ do {
+ broadcast[type](exec_ctx, b, obj);
+ obj = obj->pss_next;
+ } while (obj != a->roots[PSS_FD]);
+ }
+ if (b->roots[type] != NULL) {
+ obj = b->roots[PSS_FD];
+ do {
+ broadcast[type](exec_ctx, a, obj);
+ gpr_mu_lock(&obj->mu);
+ obj->pss_master = a;
+ gpr_mu_unlock(&obj->mu);
+ obj = obj->pss_next;
+ } while (obj != b->roots[PSS_FD]);
+ }
+ a->roots[type] = pss_splice(a->roots[type], b->roots[type]);
+}
+
+static void pss_merge(grpc_exec_ctx *exec_ctx, grpc_pollset_set *a,
+ grpc_pollset_set *b) {
+ pss_ref(a);
+ pss_ref(b);
+ bool changed;
+ for (;;) {
+ if (a == b) {
+ pss_unref(a);
+ pss_unref(b);
+ return;
+ } else if (a < b) {
+ gpr_mu_lock(&a->po.mu);
+ gpr_mu_lock(&b->po.mu);
+ } else {
+ gpr_mu_lock(&b->po.mu);
+ gpr_mu_lock(&a->po.mu);
+ }
+ changed = false;
+ if (a != a->master) {
+ grpc_pollset_set *master = pss_ref(a->master);
+ gpr_mu_unlock(&a->po.mu);
+ gpr_mu_unlock(&b->po.mu);
+ pss_unref(a);
+ a = master;
+ changed = true;
+ } else if (b != b->master) {
+ grpc_pollset_set *master = pss_ref(b->master);
+ gpr_mu_unlock(&a->po.mu);
+ gpr_mu_unlock(&b->po.mu);
+ pss_unref(b);
+ b = master;
+ changed = true;
+ } else {
+ /* a, b locked and are at their respective masters */
+ pss_merge_broadcast_and_patch(exec_ctx, a, b, PSS_FD);
+ pss_merge_broadcast_and_patch(exec_ctx, a, b, PSS_POLLSET);
+ b->po.pss_master = a;
+ gpr_mu_unlock(&a->po.mu);
+ gpr_mu_unlock(&b->po.mu);
+ }
+ }
+}
+
+static void pss_add_obj(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
+ pss_obj *obj, pss_obj_type type) {
+ pss = pss_ref_and_lock_master(pss);
+ gpr_mu_lock(&obj->mu);
+ if (obj->pss_master == pss) {
+ /* obj is already a member -- just bump refcount */
+ obj->pss_refs++;
+ gpr_mu_unlock(&obj->mu);
+ gpr_mu_unlock(&pss->po.mu);
+ pss_unref(pss);
+ return;
+ } else if (obj->pss_master != NULL) {
+ grpc_pollset_set *other_pss = pss_ref(obj->pss_master);
+ obj->pss_refs++;
+ gpr_mu_unlock(&obj->mu);
+ gpr_mu_unlock(&pss->po.mu);
+ pss_merge(exec_ctx, pss, other_pss);
+ pss_unref(other_pss);
+ pss_unref(pss);
+ } else {
+ GPR_ASSERT(obj->pss_refs == 0);
+ obj->pss_refs = 1;
+ obj->pss_master = pss;
+ if (pss->roots[type] == NULL) {
+ pss->roots[type] = obj;
+ obj->pss_next = obj->pss_prev = obj;
+ } else {
+ obj->pss_next = pss->roots[type];
+ obj->pss_prev = obj->pss_next->pss_prev;
+ obj->pss_prev->pss_next = obj;
+ obj->pss_next->pss_prev = obj;
+ }
+ gpr_mu_unlock(&obj->mu);
+ switch (type) {
+ case PSS_FD:
+ pss_broadcast_fd(exec_ctx, pss, obj);
+ break;
+ case PSS_POLLSET:
+ pss_broadcast_pollset(exec_ctx, pss, obj);
+ break;
+ case PSS_POLLSET_SET:
+ case PSS_OBJ_TYPE_COUNT:
+ GPR_UNREACHABLE_CODE(break);
+ }
+ gpr_mu_unlock(&pss->po.mu);
+ pss_unref(pss);
+ }
+}
+
+static void pss_del_obj(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
+ pss_obj *obj, pss_obj_type type) {
+ pss = pss_ref_and_lock_master(pss);
+ gpr_mu_lock(&obj->mu);
+ obj->pss_refs--;
+ if (obj->pss_refs == 0) {
+ obj->pss_master = NULL;
+ if (obj == pss->roots[type]) {
+ pss->roots[type] = obj->pss_next;
+ }
+ if (obj->pss_next == obj) {
+ pss->roots[type] = NULL;
+ } else {
+ obj->pss_next->pss_prev = obj->pss_prev;
+ obj->pss_prev->pss_next = obj->pss_next;
+ }
+ }
+ gpr_mu_unlock(&obj->mu);
+ gpr_mu_unlock(&pss->po.mu);
+ pss_unref(pss);
}
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
grpc_fd *fd) {
- abort();
+ pss_add_obj(exec_ctx, pss, &fd->po, PSS_FD);
}
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
grpc_fd *fd) {
- abort();
+ pss_del_obj(exec_ctx, pss, &fd->po, PSS_FD);
}
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pss, grpc_pollset *ps) {
- abort();
+ pss_add_obj(exec_ctx, pss, &ps->po, PSS_POLLSET);
}
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pss, grpc_pollset *ps) {
- abort();
+ pss_del_obj(exec_ctx, pss, &ps->po, PSS_POLLSET);
}
static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *bag,
grpc_pollset_set *item) {
- abort();
+ pss_add_obj(exec_ctx, bag, &item->po, PSS_POLLSET_SET);
}
static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *bag,
grpc_pollset_set *item) {
- abort();
+ pss_del_obj(exec_ctx, bag, &item->po, PSS_POLLSET_SET);
}
/*******************************************************************************