aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-10-02 15:29:18 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-10-02 15:29:18 -0700
commit23adbd5a815026fc4543a3ccdb57a8871939f017 (patch)
treeeb5c0b3ee79010008fb3013aab04fe5699910962
parent249de2b5c0113dde23d0667cbab7208875b4b0c1 (diff)
Finish off epollex refactoring (no testing yet)
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c494
1 files changed, 319 insertions, 175 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index 5317c6bff6..e456e108c5 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -135,6 +135,13 @@ static void fd_global_shutdown(void);
* Pollset Declarations
*/
+typedef struct {
+ grpc_pollset_worker *next;
+ grpc_pollset_worker *prev;
+} pwlink;
+
+typedef enum { PWLINK_POLLABLE = 0, PWLINK_POLLSET, PWLINK_COUNT } pwlinks;
+
struct grpc_pollset_worker {
bool kicked;
bool initialized_cv;
@@ -142,8 +149,7 @@ struct grpc_pollset_worker {
grpc_pollset *pollset;
pollable *pollable_obj;
- grpc_pollset_worker *next;
- grpc_pollset_worker *prev;
+ pwlink links[PWLINK_COUNT];
};
#define MAX_EPOLL_EVENTS 100
@@ -154,7 +160,7 @@ struct grpc_pollset {
pollable *active_pollable;
bool kicked_without_poller;
grpc_closure *shutdown_closure;
- int worker_count;
+ grpc_pollset_worker *root_worker;
int event_cursor;
int event_count;
@@ -164,13 +170,19 @@ struct grpc_pollset {
/*******************************************************************************
* Pollset-set Declarations
*/
+
struct grpc_pollset_set {
gpr_refcount refs;
gpr_mu mu;
grpc_pollset_set *parent;
- // only valid if parent==NULL
- pollable *child_pollsets;
- grpc_fd *child_fds;
+
+ size_t pollset_count;
+ size_t pollset_capacity;
+ pollable **pollsets;
+
+ size_t fd_count;
+ size_t fd_capacity;
+ grpc_fd **fds;
};
/*******************************************************************************
@@ -483,64 +495,52 @@ static void pollset_global_shutdown(void) {
static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset) {
- if (pollset->shutdown_closure != NULL && pollset->worker_count == 0) {
+ if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL) {
GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
pollset->shutdown_closure = NULL;
}
}
-#if 0
-static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error_unused) {
- grpc_error *error = GRPC_ERROR_NONE;
- grpc_pollset *pollset = (grpc_pollset *)arg;
- gpr_mu_lock(&pollset->pollable_obj.po.mu);
- if (pollset->root_worker != NULL) {
- grpc_pollset_worker *worker = pollset->root_worker;
- do {
- GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
- if (worker->pollable_obj != &pollset->pollable_obj) {
- gpr_mu_lock(&worker->pollable_obj->po.mu);
- }
- if (worker->initialized_cv && worker != pollset->root_worker) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)",
- pollset, worker, &pollset->pollable_obj,
- worker->pollable_obj);
- }
- worker->kicked = true;
- gpr_cv_signal(&worker->cv);
- } else {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)",
- pollset, worker, &pollset->pollable_obj,
- worker->pollable_obj);
- }
- append_error(&error,
- grpc_wakeup_fd_wakeup(&worker->pollable_obj->wakeup),
- "pollset_shutdown");
- }
- if (worker->pollable_obj != &pollset->pollable_obj) {
- gpr_mu_unlock(&worker->pollable_obj->po.mu);
- }
-
- worker = worker->links[PWL_POLLSET].next;
- } while (worker != pollset->root_worker);
+/* both pollset->active_pollable->mu, pollset->mu must be held before calling
+ * this function */
+static grpc_error *pollset_kick_one(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
+ grpc_pollset_worker *specific_worker) {
+ pollable *p = pollset->active_pollable;
+ if (specific_worker->kicked) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p);
+ }
+ return GRPC_ERROR_NONE;
+ } else if (gpr_tls_get(&g_current_thread_worker) ==
+ (intptr_t)specific_worker) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_awake", p);
+ }
+ specific_worker->kicked = true;
+ return GRPC_ERROR_NONE;
+ } else if (specific_worker == p->root_worker) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p);
+ }
+ specific_worker->kicked = true;
+ return grpc_wakeup_fd_wakeup(&p->wakeup);
+ } else {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p);
+ }
+ specific_worker->kicked = true;
+ gpr_cv_signal(&specific_worker->cv);
+ return GRPC_ERROR_NONE;
}
- pollset->kick_alls_pending--;
- pollset_maybe_finish_shutdown(exec_ctx, pollset);
- gpr_mu_unlock(&pollset->pollable_obj.po.mu);
- GRPC_LOG_IF_ERROR("kick_all", error);
-}
-#endif
-
-static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
- abort();
}
-#if 0
-static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p,
+/* both pollset->active_pollable->mu, pollset->mu must be held before calling
+ * this function */
+static grpc_error *pollset_kick_inner(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
+ pollable *p = pollset->active_pollable;
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG,
"PS:%p kick %p tls_pollset=%p tls_worker=%p "
@@ -558,12 +558,7 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p,
pollset->kicked_without_poller = true;
return GRPC_ERROR_NONE;
} else {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "PS:%p kicked_any_via_wakeup_fd", p);
- }
- grpc_error *err = pollable_materialize(p);
- if (err != GRPC_ERROR_NONE) return err;
- return grpc_wakeup_fd_wakeup(&p->wakeup);
+ return pollset_kick_one(exec_ctx, pollset, pollset->root_worker);
}
} else {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
@@ -571,53 +566,32 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p,
}
return GRPC_ERROR_NONE;
}
- } else if (specific_worker->kicked) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p);
- }
- return GRPC_ERROR_NONE;
- } else if (gpr_tls_get(&g_current_thread_worker) ==
- (intptr_t)specific_worker) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_awake", p);
- }
- specific_worker->kicked = true;
- return GRPC_ERROR_NONE;
- } else if (specific_worker == p->root_worker) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p);
- }
- grpc_error *err = pollable_materialize(p);
- if (err != GRPC_ERROR_NONE) return err;
- specific_worker->kicked = true;
- return grpc_wakeup_fd_wakeup(&p->wakeup);
} else {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p);
- }
- specific_worker->kicked = true;
- gpr_cv_signal(&specific_worker->cv);
- return GRPC_ERROR_NONE;
+ return pollset_kick_one(exec_ctx, pollset, specific_worker);
}
}
-#endif
-/* p->po.mu must be held before calling this function */
static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
- abort();
-#if 0
- pollable *p = pollset->current_pollable_obj;
- GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
- if (p != &pollset->pollable_obj) {
- gpr_mu_lock(&p->po.mu);
- }
- grpc_error *error = pollset_kick_inner(pollset, p, specific_worker);
- if (p != &pollset->pollable_obj) {
- gpr_mu_unlock(&p->po.mu);
+ pollable *p = pollset->active_pollable;
+ gpr_mu_lock(&p->mu);
+ grpc_error *error = pollset_kick_inner(exec_ctx, pollset, specific_worker);
+ gpr_mu_unlock(&p->mu);
+ return error;
+}
+
+static grpc_error *pollset_kick_all(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset) {
+ pollable *p = pollset->active_pollable;
+ grpc_error *error = GRPC_ERROR_NONE;
+ const char *err_desc = "pollset_kick_all";
+ gpr_mu_lock(&p->mu);
+ for (grpc_pollset_worker *w = pollset->root_worker; w != NULL;
+ w = w->links[PWLINK_POLLSET].next) {
+ append_error(&error, pollset_kick_one(exec_ctx, pollset, w), err_desc);
}
+ gpr_mu_unlock(&p->mu);
return error;
-#endif
}
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
@@ -701,7 +675,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
GPR_ASSERT(pollset->shutdown_closure == NULL);
pollset->shutdown_closure = closure;
- pollset_kick_all(exec_ctx, pollset);
+ GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(exec_ctx, pollset));
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
@@ -790,37 +764,41 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
/* Return true if first in list */
-static bool worker_insert(pollable *pollable_obj, grpc_pollset_worker *worker) {
- if (pollable_obj->root_worker == NULL) {
- pollable_obj->root_worker = worker;
- worker->next = worker->prev = worker;
+static bool worker_insert(grpc_pollset_worker **root_worker,
+ grpc_pollset_worker *worker, pwlinks link) {
+ if (*root_worker == NULL) {
+ *root_worker = worker;
+ worker->links[link].next = worker->links[link].prev = worker;
return true;
} else {
- worker->next = pollable_obj->root_worker;
- worker->prev = worker->next->prev;
- worker->next->prev = worker;
- worker->prev->next = worker;
+ worker->links[link].next = *root_worker;
+ worker->links[link].prev = worker->links[link].next->links[link].prev;
+ worker->links[link].next->links[link].prev = worker;
+ worker->links[link].prev->links[link].next = worker;
return false;
}
}
/* returns the new root IFF the root changed */
-static grpc_pollset_worker *worker_remove(pollable *pollable_obj,
- grpc_pollset_worker *worker) {
- if (worker == pollable_obj->root_worker) {
- if (worker == worker->next) {
- pollable_obj->root_worker = NULL;
- return NULL;
+typedef enum { WRR_NEW_ROOT, WRR_EMPTIED, WRR_REMOVED } worker_remove_result;
+
+static worker_remove_result worker_remove(grpc_pollset_worker **root_worker,
+ grpc_pollset_worker *worker,
+ pwlinks link) {
+ if (worker == *root_worker) {
+ if (worker == worker->links[link].next) {
+ *root_worker = NULL;
+ return WRR_EMPTIED;
} else {
- pollable_obj->root_worker = worker->next;
- worker->prev->next = worker->next;
- worker->next->prev = worker->prev;
- return pollable_obj->root_worker;
+ *root_worker = worker->links[link].next;
+ worker->links[link].prev->links[link].next = worker->links[link].next;
+ worker->links[link].next->links[link].prev = worker->links[link].prev;
+ return WRR_NEW_ROOT;
}
} else {
- worker->prev->next = worker->next;
- worker->next->prev = worker->prev;
- return NULL;
+ worker->links[link].prev->links[link].next = worker->links[link].next;
+ worker->links[link].next->links[link].prev = worker->links[link].prev;
+ return WRR_REMOVED;
}
}
@@ -834,9 +812,10 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
worker->kicked = false;
worker->pollset = pollset;
worker->pollable_obj = pollable_ref(pollset->active_pollable);
+ worker_insert(&pollset->root_worker, worker, PWLINK_POLLSET);
gpr_mu_lock(&worker->pollable_obj->mu);
- pollset->worker_count++;
- if (!worker_insert(worker->pollable_obj, worker)) {
+ if (!worker_insert(&worker->pollable_obj->root_worker, worker,
+ PWLINK_POLLABLE)) {
worker->initialized_cv = true;
gpr_cv_init(&worker->cv);
if (GRPC_TRACER_ON(grpc_polling_trace) &&
@@ -876,8 +855,9 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl) {
gpr_mu_lock(&worker->pollable_obj->mu);
- grpc_pollset_worker *new_root = worker_remove(worker->pollable_obj, worker);
- if (new_root != NULL) {
+ if (worker_remove(&worker->pollable_obj->root_worker, worker,
+ PWLINK_POLLABLE) == WRR_NEW_ROOT) {
+ grpc_pollset_worker *new_root = worker->pollable_obj->root_worker;
GPR_ASSERT(new_root->initialized_cv);
gpr_cv_signal(&new_root->cv);
}
@@ -885,8 +865,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_cv_destroy(&worker->cv);
}
gpr_mu_unlock(&worker->pollable_obj->mu);
- pollset->worker_count--;
- if (pollset->worker_count == 0) {
+ if (worker_remove(&pollset->root_worker, worker, PWLINK_POLLSET)) {
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
}
@@ -932,48 +911,64 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
return error;
}
+static grpc_error *pollset_transition_pollable_from_empty_to_fd_locked(
+ grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) {
+ static const char *err_desc = "pollset_transition_pollable_from_empty_to_fd";
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from empty to fd",
+ pollset, fd);
+ }
+ append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc);
+ pollable_unref(pollset->active_pollable);
+ append_error(&error, fd_become_pollable(fd, &pollset->active_pollable),
+ err_desc);
+ return error;
+}
+
+static grpc_error *pollset_transition_pollable_from_fd_to_multi_locked(
+ grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *and_add_fd) {
+ static const char *err_desc = "pollset_transition_pollable_from_fd_to_multi";
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG,
+ "PS:%p add fd %p; transition pollable from fd %p to multipoller",
+ pollset, and_add_fd, pollset->active_pollable->owner_fd);
+ }
+ append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc);
+ pollable_unref(pollset->active_pollable);
+ grpc_fd *initial_fd = pollset->active_pollable->owner_fd;
+ if (append_error(&error, pollable_create(PO_MULTI, &pollset->active_pollable),
+ err_desc)) {
+ append_error(&error, pollable_add_fd(pollset->active_pollable, initial_fd),
+ err_desc);
+ if (and_add_fd != NULL) {
+ append_error(&error,
+ pollable_add_fd(pollset->active_pollable, and_add_fd),
+ err_desc);
+ }
+ }
+ return error;
+}
+
/* expects pollsets locked, flag whether fd is locked or not */
static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset, grpc_fd *fd) {
- static const char *err_desc = "pollset_add_fd";
grpc_error *error = GRPC_ERROR_NONE;
pollable *po_at_start = pollable_ref(pollset->active_pollable);
switch (pollset->active_pollable->type) {
case PO_EMPTY:
/* empty pollable --> single fd pollable */
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG,
- "PS:%p add fd %p; transition pollable from empty to fd",
- pollset, fd);
- }
- pollset_kick_all(exec_ctx, pollset);
- pollable_unref(pollset->active_pollable);
- append_error(&error, fd_become_pollable(fd, &pollset->active_pollable),
- err_desc);
+ error = pollset_transition_pollable_from_empty_to_fd_locked(exec_ctx,
+ pollset, fd);
break;
case PO_FD:
/* fd --> multipoller */
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(
- GPR_DEBUG,
- "PS:%p add fd %p; transition pollable from fd %p to multipoller",
- pollset, fd, pollset->active_pollable->owner_fd);
- }
- pollset_kick_all(exec_ctx, pollset);
- pollable_unref(pollset->active_pollable);
- if (append_error(&error,
- pollable_create(PO_MULTI, &pollset->active_pollable),
- err_desc)) {
- append_error(&error, pollable_add_fd(pollset->active_pollable,
- po_at_start->owner_fd),
- err_desc);
- append_error(&error, pollable_add_fd(pollset->active_pollable, fd),
- err_desc);
- }
+ error = pollset_transition_pollable_from_fd_to_multi_locked(exec_ctx,
+ pollset, fd);
break;
case PO_MULTI:
- append_error(&error, pollable_add_fd(pollset->active_pollable, fd),
- err_desc);
+ error = pollable_add_fd(pollset->active_pollable, fd);
break;
}
if (error != GRPC_ERROR_NONE) {
@@ -985,6 +980,34 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
return error;
}
+static grpc_error *pollset_as_multipollable(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
+ pollable **pollable_obj) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ gpr_mu_lock(&pollset->mu);
+ pollable *po_at_start = pollable_ref(pollset->active_pollable);
+ switch (pollset->active_pollable->type) {
+ case PO_EMPTY:
+ error = pollable_create(PO_MULTI, &pollset->active_pollable);
+ break;
+ case PO_FD:
+ error = pollset_transition_pollable_from_fd_to_multi_locked(
+ exec_ctx, pollset, NULL);
+ break;
+ case PO_MULTI:
+ break;
+ }
+ if (error != GRPC_ERROR_NONE) {
+ pollable_unref(pollset->active_pollable);
+ pollset->active_pollable = po_at_start;
+ } else {
+ *pollable_obj = pollable_ref(pollset->active_pollable);
+ pollable_unref(po_at_start);
+ }
+ gpr_mu_unlock(&pollset->mu);
+ return error;
+}
+
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_fd *fd) {
gpr_mu_lock(&pollset->mu);
@@ -1008,12 +1031,9 @@ static grpc_pollset_set *pss_lock_adam(grpc_pollset_set *pss) {
}
static grpc_pollset_set *pollset_set_create(void) {
- grpc_pollset_set *pss = (grpc_pollset_set *)gpr_malloc(sizeof(*pss));
+ grpc_pollset_set *pss = (grpc_pollset_set *)gpr_zalloc(sizeof(*pss));
gpr_mu_init(&pss->mu);
gpr_ref_init(&pss->refs, 1);
- pss->parent = NULL;
- pss->child_pollsets = NULL;
- pss->child_fds = NULL;
return pss;
}
@@ -1025,32 +1045,156 @@ static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
grpc_error *error = GRPC_ERROR_NONE;
static const char *err_desc = "pollset_set_add_fd";
pss = pss_lock_adam(pss);
- pollable *p = pss->child_pollsets;
- if (p != NULL) {
- do {
- append_error(&error, pollable_add_fd(p, fd), err_desc);
- p = p->next;
- } while (p != pss->child_pollsets);
-
- } else {
+ for (size_t i = 0; i < pss->pollset_count; i++) {
+ append_error(&error, pollable_add_fd(pss->pollsets[i], fd), err_desc);
}
+ if (pss->fd_count == pss->fd_capacity) {
+ pss->fd_capacity = GPR_MAX(pss->fd_capacity * 2, 8);
+ pss->fds = gpr_realloc(pss->fds, pss->fd_capacity * sizeof(*pss->fds));
+ }
+ REF_BY(fd, 2, "pollset_set");
+ pss->fds[pss->fd_count++] = fd;
gpr_mu_unlock(&pss->mu);
- GRPC_LOG_IF_ERROR("pollset_set_add_fd", error);
+ GRPC_LOG_IF_ERROR(err_desc, error);
}
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
- grpc_fd *fd) {}
+ grpc_fd *fd) {
+ pss = pss_lock_adam(pss);
+ size_t i;
+ for (i = 0; i < pss->fd_count; i++) {
+ if (pss->fds[i] == fd) {
+ UNREF_BY(exec_ctx, fd, 2, "pollset_set");
+ break;
+ }
+ }
+ GPR_ASSERT(i != pss->fd_count);
+ for (; i < pss->fd_count - 1; i++) {
+ pss->fds[i] = pss->fds[i + 1];
+ }
+ pss->fd_count--;
+ gpr_mu_unlock(&pss->mu);
+}
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pss, grpc_pollset *ps) {}
+ grpc_pollset_set *pss, grpc_pollset *ps) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ static const char *err_desc = "pollset_set_add_pollset";
+ pollable *pollable_obj;
+ if (!GRPC_LOG_IF_ERROR(
+ err_desc, pollset_as_multipollable(exec_ctx, ps, &pollable_obj))) {
+ return;
+ }
+ pss = pss_lock_adam(pss);
+ for (size_t i = 0; i < pss->fd_count; i++) {
+ append_error(&error, pollable_add_fd(pollable_obj, pss->fds[i]), err_desc);
+ }
+ if (pss->pollset_count == pss->pollset_capacity) {
+ pss->pollset_capacity = GPR_MAX(pss->pollset_capacity * 2, 8);
+ pss->pollsets = gpr_realloc(pss->pollsets,
+ pss->pollset_capacity * sizeof(*pss->pollsets));
+ }
+ pss->pollsets[pss->pollset_count++] = pollable_obj;
+ gpr_mu_unlock(&pss->mu);
+
+ GRPC_LOG_IF_ERROR(err_desc, error);
+}
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pss, grpc_pollset *ps) {}
+ grpc_pollset_set *pss, grpc_pollset *ps) {
+ pss = pss_lock_adam(pss);
+ size_t i;
+ for (i = 0; i < pss->pollset_count; i++) {
+ if (pss->pollsets[i] == ps->active_pollable) {
+ pollable_unref(pss->pollsets[i]);
+ break;
+ }
+ }
+ GPR_ASSERT(i != pss->pollset_count);
+ for (; i < pss->pollset_count - 1; i++) {
+ pss->pollsets[i] = pss->pollsets[i + 1];
+ }
+ pss->pollset_count--;
+ gpr_mu_unlock(&pss->mu);
+}
+
+static grpc_error *add_fds_to_pollables(grpc_exec_ctx *exec_ctx, grpc_fd **fds,
+ size_t fd_count, pollable **pollables,
+ size_t pollable_count,
+ const char *err_desc) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ for (size_t i = 0; i < fd_count; i++) {
+ for (size_t j = 0; j < pollable_count; j++) {
+ append_error(&error, pollable_add_fd(pollables[j], fds[i]), err_desc);
+ }
+ }
+ return error;
+}
static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *bag,
- grpc_pollset_set *item) {}
+ grpc_pollset_set *a,
+ grpc_pollset_set *b) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ static const char *err_desc = "pollset_set_add_fd";
+ for (;;) {
+ if (a == b) {
+ // pollset ancestors are the same: nothing to do
+ return;
+ }
+ if (a > b) {
+ GPR_SWAP(grpc_pollset_set *, a, b);
+ }
+ gpr_mu_lock(&a->mu);
+ gpr_mu_lock(&b->mu);
+ if (a->parent != NULL) {
+ a = a->parent;
+ } else if (b->parent != NULL) {
+ b = b->parent;
+ } else {
+ break; // exit loop, both pollsets locked
+ }
+ gpr_mu_unlock(&a->mu);
+ gpr_mu_unlock(&b->mu);
+ }
+ // try to do the least copying possible
+ // TODO(ctiller): there's probably a better heuristic here
+ const size_t a_size = a->fd_count + a->pollset_count;
+ const size_t b_size = b->fd_count + b->pollset_count;
+ if (b_size > a_size) {
+ GPR_SWAP(grpc_pollset_set *, a, b);
+ }
+ gpr_ref(&a->refs);
+ b->parent = a;
+ append_error(&error,
+ add_fds_to_pollables(exec_ctx, a->fds, a->fd_count, b->pollsets,
+ b->pollset_count, "merge_a2b"),
+ err_desc);
+ append_error(&error,
+ add_fds_to_pollables(exec_ctx, b->fds, b->fd_count, a->pollsets,
+ a->pollset_count, "merge_b2a"),
+ err_desc);
+ if (a->fd_capacity < a->fd_count + b->fd_count) {
+ a->fd_capacity = GPR_MAX(2 * a->fd_capacity, a->fd_count + b->fd_count);
+ a->fds = gpr_realloc(a->fds, a->fd_capacity * sizeof(*a->fds));
+ }
+ if (a->pollset_capacity < a->pollset_count + b->pollset_count) {
+ a->pollset_capacity =
+ GPR_MAX(2 * a->pollset_capacity, a->pollset_count + b->pollset_count);
+ a->pollsets =
+ gpr_realloc(a->pollsets, a->pollset_capacity * sizeof(*a->pollsets));
+ }
+ memcpy(a->fds + a->fd_count, b->fds, b->fd_count * sizeof(*b->fds));
+ memcpy(a->pollsets + a->pollset_count, b->pollsets,
+ b->pollset_count * sizeof(*b->pollsets));
+ a->fd_count += b->fd_count;
+ a->pollset_count += b->pollset_count;
+ gpr_free(b->fds);
+ gpr_free(b->pollsets);
+ b->fd_count = b->fd_capacity = b->pollset_count = b->pollset_capacity = 0;
+ gpr_mu_unlock(&a->mu);
+ gpr_mu_unlock(&b->mu);
+}
static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *bag,