diff options
author | 2017-10-02 15:29:18 -0700 | |
---|---|---|
committer | 2017-10-02 15:29:18 -0700 | |
commit | 23adbd5a815026fc4543a3ccdb57a8871939f017 (patch) | |
tree | eb5c0b3ee79010008fb3013aab04fe5699910962 | |
parent | 249de2b5c0113dde23d0667cbab7208875b4b0c1 (diff) |
Finish off epollex refactoring (no testing yet)
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.c | 494 |
1 files changed, 319 insertions, 175 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index 5317c6bff6..e456e108c5 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -135,6 +135,13 @@ static void fd_global_shutdown(void); * Pollset Declarations */ +typedef struct { + grpc_pollset_worker *next; + grpc_pollset_worker *prev; +} pwlink; + +typedef enum { PWLINK_POLLABLE = 0, PWLINK_POLLSET, PWLINK_COUNT } pwlinks; + struct grpc_pollset_worker { bool kicked; bool initialized_cv; @@ -142,8 +149,7 @@ struct grpc_pollset_worker { grpc_pollset *pollset; pollable *pollable_obj; - grpc_pollset_worker *next; - grpc_pollset_worker *prev; + pwlink links[PWLINK_COUNT]; }; #define MAX_EPOLL_EVENTS 100 @@ -154,7 +160,7 @@ struct grpc_pollset { pollable *active_pollable; bool kicked_without_poller; grpc_closure *shutdown_closure; - int worker_count; + grpc_pollset_worker *root_worker; int event_cursor; int event_count; @@ -164,13 +170,19 @@ struct grpc_pollset { /******************************************************************************* * Pollset-set Declarations */ + struct grpc_pollset_set { gpr_refcount refs; gpr_mu mu; grpc_pollset_set *parent; - // only valid if parent==NULL - pollable *child_pollsets; - grpc_fd *child_fds; + + size_t pollset_count; + size_t pollset_capacity; + pollable **pollsets; + + size_t fd_count; + size_t fd_capacity; + grpc_fd **fds; }; /******************************************************************************* @@ -483,64 +495,52 @@ static void pollset_global_shutdown(void) { static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - if (pollset->shutdown_closure != NULL && pollset->worker_count == 0) { + if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL) { GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); pollset->shutdown_closure = NULL; } } -#if 0 -static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error_unused) { - grpc_error *error = GRPC_ERROR_NONE; - grpc_pollset *pollset = (grpc_pollset *)arg; - gpr_mu_lock(&pollset->pollable_obj.po.mu); - if (pollset->root_worker != NULL) { - grpc_pollset_worker *worker = pollset->root_worker; - do { - GRPC_STATS_INC_POLLSET_KICK(exec_ctx); - if (worker->pollable_obj != &pollset->pollable_obj) { - gpr_mu_lock(&worker->pollable_obj->po.mu); - } - if (worker->initialized_cv && worker != pollset->root_worker) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)", - pollset, worker, &pollset->pollable_obj, - worker->pollable_obj); - } - worker->kicked = true; - gpr_cv_signal(&worker->cv); - } else { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)", - pollset, worker, &pollset->pollable_obj, - worker->pollable_obj); - } - append_error(&error, - grpc_wakeup_fd_wakeup(&worker->pollable_obj->wakeup), - "pollset_shutdown"); - } - if (worker->pollable_obj != &pollset->pollable_obj) { - gpr_mu_unlock(&worker->pollable_obj->po.mu); - } - - worker = worker->links[PWL_POLLSET].next; - } while (worker != pollset->root_worker); +/* both pollset->active_pollable->mu, pollset->mu must be held before calling + * this function */ +static grpc_error *pollset_kick_one(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_pollset_worker *specific_worker) { + pollable *p = pollset->active_pollable; + if (specific_worker->kicked) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p); + } + return GRPC_ERROR_NONE; + } else if (gpr_tls_get(&g_current_thread_worker) == + (intptr_t)specific_worker) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_awake", p); + } + specific_worker->kicked = true; + return GRPC_ERROR_NONE; + } else if (specific_worker == p->root_worker) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p); + } + specific_worker->kicked = true; + return grpc_wakeup_fd_wakeup(&p->wakeup); + } else { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p); + } + specific_worker->kicked = true; + gpr_cv_signal(&specific_worker->cv); + return GRPC_ERROR_NONE; } - pollset->kick_alls_pending--; - pollset_maybe_finish_shutdown(exec_ctx, pollset); - gpr_mu_unlock(&pollset->pollable_obj.po.mu); - GRPC_LOG_IF_ERROR("kick_all", error); -} -#endif - -static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - abort(); } -#if 0 -static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, +/* both pollset->active_pollable->mu, pollset->mu must be held before calling + * this function */ +static grpc_error *pollset_kick_inner(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { + pollable *p = pollset->active_pollable; if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kick %p tls_pollset=%p tls_worker=%p " @@ -558,12 +558,7 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, pollset->kicked_without_poller = true; return GRPC_ERROR_NONE; } else { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p kicked_any_via_wakeup_fd", p); - } - grpc_error *err = pollable_materialize(p); - if (err != GRPC_ERROR_NONE) return err; - return grpc_wakeup_fd_wakeup(&p->wakeup); + return pollset_kick_one(exec_ctx, pollset, pollset->root_worker); } } else { if (GRPC_TRACER_ON(grpc_polling_trace)) { @@ -571,53 +566,32 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, } return GRPC_ERROR_NONE; } - } else if (specific_worker->kicked) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p); - } - return GRPC_ERROR_NONE; - } else if (gpr_tls_get(&g_current_thread_worker) == - (intptr_t)specific_worker) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_awake", p); - } - specific_worker->kicked = true; - return GRPC_ERROR_NONE; - } else if (specific_worker == p->root_worker) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p); - } - grpc_error *err = pollable_materialize(p); - if (err != GRPC_ERROR_NONE) return err; - specific_worker->kicked = true; - return grpc_wakeup_fd_wakeup(&p->wakeup); } else { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p); - } - specific_worker->kicked = true; - gpr_cv_signal(&specific_worker->cv); - return GRPC_ERROR_NONE; + return pollset_kick_one(exec_ctx, pollset, specific_worker); } } -#endif -/* p->po.mu must be held before calling this function */ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { - abort(); -#if 0 - pollable *p = pollset->current_pollable_obj; - GRPC_STATS_INC_POLLSET_KICK(exec_ctx); - if (p != &pollset->pollable_obj) { - gpr_mu_lock(&p->po.mu); - } - grpc_error *error = pollset_kick_inner(pollset, p, specific_worker); - if (p != &pollset->pollable_obj) { - gpr_mu_unlock(&p->po.mu); + pollable *p = pollset->active_pollable; + gpr_mu_lock(&p->mu); + grpc_error *error = pollset_kick_inner(exec_ctx, pollset, specific_worker); + gpr_mu_unlock(&p->mu); + return error; +} + +static grpc_error *pollset_kick_all(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset) { + pollable *p = pollset->active_pollable; + grpc_error *error = GRPC_ERROR_NONE; + const char *err_desc = "pollset_kick_all"; + gpr_mu_lock(&p->mu); + for (grpc_pollset_worker *w = pollset->root_worker; w != NULL; + w = w->links[PWLINK_POLLSET].next) { + append_error(&error, pollset_kick_one(exec_ctx, pollset, w), err_desc); } + gpr_mu_unlock(&p->mu); return error; -#endif } static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { @@ -701,7 +675,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { GPR_ASSERT(pollset->shutdown_closure == NULL); pollset->shutdown_closure = closure; - pollset_kick_all(exec_ctx, pollset); + GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(exec_ctx, pollset)); pollset_maybe_finish_shutdown(exec_ctx, pollset); } @@ -790,37 +764,41 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } /* Return true if first in list */ -static bool worker_insert(pollable *pollable_obj, grpc_pollset_worker *worker) { - if (pollable_obj->root_worker == NULL) { - pollable_obj->root_worker = worker; - worker->next = worker->prev = worker; +static bool worker_insert(grpc_pollset_worker **root_worker, + grpc_pollset_worker *worker, pwlinks link) { + if (*root_worker == NULL) { + *root_worker = worker; + worker->links[link].next = worker->links[link].prev = worker; return true; } else { - worker->next = pollable_obj->root_worker; - worker->prev = worker->next->prev; - worker->next->prev = worker; - worker->prev->next = worker; + worker->links[link].next = *root_worker; + worker->links[link].prev = worker->links[link].next->links[link].prev; + worker->links[link].next->links[link].prev = worker; + worker->links[link].prev->links[link].next = worker; return false; } } /* returns the new root IFF the root changed */ -static grpc_pollset_worker *worker_remove(pollable *pollable_obj, - grpc_pollset_worker *worker) { - if (worker == pollable_obj->root_worker) { - if (worker == worker->next) { - pollable_obj->root_worker = NULL; - return NULL; +typedef enum { WRR_NEW_ROOT, WRR_EMPTIED, WRR_REMOVED } worker_remove_result; + +static worker_remove_result worker_remove(grpc_pollset_worker **root_worker, + grpc_pollset_worker *worker, + pwlinks link) { + if (worker == *root_worker) { + if (worker == worker->links[link].next) { + *root_worker = NULL; + return WRR_EMPTIED; } else { - pollable_obj->root_worker = worker->next; - worker->prev->next = worker->next; - worker->next->prev = worker->prev; - return pollable_obj->root_worker; + *root_worker = worker->links[link].next; + worker->links[link].prev->links[link].next = worker->links[link].next; + worker->links[link].next->links[link].prev = worker->links[link].prev; + return WRR_NEW_ROOT; } } else { - worker->prev->next = worker->next; - worker->next->prev = worker->prev; - return NULL; + worker->links[link].prev->links[link].next = worker->links[link].next; + worker->links[link].next->links[link].prev = worker->links[link].prev; + return WRR_REMOVED; } } @@ -834,9 +812,10 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, worker->kicked = false; worker->pollset = pollset; worker->pollable_obj = pollable_ref(pollset->active_pollable); + worker_insert(&pollset->root_worker, worker, PWLINK_POLLSET); gpr_mu_lock(&worker->pollable_obj->mu); - pollset->worker_count++; - if (!worker_insert(worker->pollable_obj, worker)) { + if (!worker_insert(&worker->pollable_obj->root_worker, worker, + PWLINK_POLLABLE)) { worker->initialized_cv = true; gpr_cv_init(&worker->cv); if (GRPC_TRACER_ON(grpc_polling_trace) && @@ -876,8 +855,9 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl) { gpr_mu_lock(&worker->pollable_obj->mu); - grpc_pollset_worker *new_root = worker_remove(worker->pollable_obj, worker); - if (new_root != NULL) { + if (worker_remove(&worker->pollable_obj->root_worker, worker, + PWLINK_POLLABLE) == WRR_NEW_ROOT) { + grpc_pollset_worker *new_root = worker->pollable_obj->root_worker; GPR_ASSERT(new_root->initialized_cv); gpr_cv_signal(&new_root->cv); } @@ -885,8 +865,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_cv_destroy(&worker->cv); } gpr_mu_unlock(&worker->pollable_obj->mu); - pollset->worker_count--; - if (pollset->worker_count == 0) { + if (worker_remove(&pollset->root_worker, worker, PWLINK_POLLSET)) { pollset_maybe_finish_shutdown(exec_ctx, pollset); } } @@ -932,48 +911,64 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, return error; } +static grpc_error *pollset_transition_pollable_from_empty_to_fd_locked( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { + static const char *err_desc = "pollset_transition_pollable_from_empty_to_fd"; + grpc_error *error = GRPC_ERROR_NONE; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from empty to fd", + pollset, fd); + } + append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc); + pollable_unref(pollset->active_pollable); + append_error(&error, fd_become_pollable(fd, &pollset->active_pollable), + err_desc); + return error; +} + +static grpc_error *pollset_transition_pollable_from_fd_to_multi_locked( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *and_add_fd) { + static const char *err_desc = "pollset_transition_pollable_from_fd_to_multi"; + grpc_error *error = GRPC_ERROR_NONE; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, + "PS:%p add fd %p; transition pollable from fd %p to multipoller", + pollset, and_add_fd, pollset->active_pollable->owner_fd); + } + append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc); + pollable_unref(pollset->active_pollable); + grpc_fd *initial_fd = pollset->active_pollable->owner_fd; + if (append_error(&error, pollable_create(PO_MULTI, &pollset->active_pollable), + err_desc)) { + append_error(&error, pollable_add_fd(pollset->active_pollable, initial_fd), + err_desc); + if (and_add_fd != NULL) { + append_error(&error, + pollable_add_fd(pollset->active_pollable, and_add_fd), + err_desc); + } + } + return error; +} + /* expects pollsets locked, flag whether fd is locked or not */ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { - static const char *err_desc = "pollset_add_fd"; grpc_error *error = GRPC_ERROR_NONE; pollable *po_at_start = pollable_ref(pollset->active_pollable); switch (pollset->active_pollable->type) { case PO_EMPTY: /* empty pollable --> single fd pollable */ - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, - "PS:%p add fd %p; transition pollable from empty to fd", - pollset, fd); - } - pollset_kick_all(exec_ctx, pollset); - pollable_unref(pollset->active_pollable); - append_error(&error, fd_become_pollable(fd, &pollset->active_pollable), - err_desc); + error = pollset_transition_pollable_from_empty_to_fd_locked(exec_ctx, + pollset, fd); break; case PO_FD: /* fd --> multipoller */ - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log( - GPR_DEBUG, - "PS:%p add fd %p; transition pollable from fd %p to multipoller", - pollset, fd, pollset->active_pollable->owner_fd); - } - pollset_kick_all(exec_ctx, pollset); - pollable_unref(pollset->active_pollable); - if (append_error(&error, - pollable_create(PO_MULTI, &pollset->active_pollable), - err_desc)) { - append_error(&error, pollable_add_fd(pollset->active_pollable, - po_at_start->owner_fd), - err_desc); - append_error(&error, pollable_add_fd(pollset->active_pollable, fd), - err_desc); - } + error = pollset_transition_pollable_from_fd_to_multi_locked(exec_ctx, + pollset, fd); break; case PO_MULTI: - append_error(&error, pollable_add_fd(pollset->active_pollable, fd), - err_desc); + error = pollable_add_fd(pollset->active_pollable, fd); break; } if (error != GRPC_ERROR_NONE) { @@ -985,6 +980,34 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, return error; } +static grpc_error *pollset_as_multipollable(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + pollable **pollable_obj) { + grpc_error *error = GRPC_ERROR_NONE; + gpr_mu_lock(&pollset->mu); + pollable *po_at_start = pollable_ref(pollset->active_pollable); + switch (pollset->active_pollable->type) { + case PO_EMPTY: + error = pollable_create(PO_MULTI, &pollset->active_pollable); + break; + case PO_FD: + error = pollset_transition_pollable_from_fd_to_multi_locked( + exec_ctx, pollset, NULL); + break; + case PO_MULTI: + break; + } + if (error != GRPC_ERROR_NONE) { + pollable_unref(pollset->active_pollable); + pollset->active_pollable = po_at_start; + } else { + *pollable_obj = pollable_ref(pollset->active_pollable); + pollable_unref(po_at_start); + } + gpr_mu_unlock(&pollset->mu); + return error; +} + static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(&pollset->mu); @@ -1008,12 +1031,9 @@ static grpc_pollset_set *pss_lock_adam(grpc_pollset_set *pss) { } static grpc_pollset_set *pollset_set_create(void) { - grpc_pollset_set *pss = (grpc_pollset_set *)gpr_malloc(sizeof(*pss)); + grpc_pollset_set *pss = (grpc_pollset_set *)gpr_zalloc(sizeof(*pss)); gpr_mu_init(&pss->mu); gpr_ref_init(&pss->refs, 1); - pss->parent = NULL; - pss->child_pollsets = NULL; - pss->child_fds = NULL; return pss; } @@ -1025,32 +1045,156 @@ static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollset_set_add_fd"; pss = pss_lock_adam(pss); - pollable *p = pss->child_pollsets; - if (p != NULL) { - do { - append_error(&error, pollable_add_fd(p, fd), err_desc); - p = p->next; - } while (p != pss->child_pollsets); - - } else { + for (size_t i = 0; i < pss->pollset_count; i++) { + append_error(&error, pollable_add_fd(pss->pollsets[i], fd), err_desc); } + if (pss->fd_count == pss->fd_capacity) { + pss->fd_capacity = GPR_MAX(pss->fd_capacity * 2, 8); + pss->fds = gpr_realloc(pss->fds, pss->fd_capacity * sizeof(*pss->fds)); + } + REF_BY(fd, 2, "pollset_set"); + pss->fds[pss->fd_count++] = fd; gpr_mu_unlock(&pss->mu); - GRPC_LOG_IF_ERROR("pollset_set_add_fd", error); + GRPC_LOG_IF_ERROR(err_desc, error); } static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, - grpc_fd *fd) {} + grpc_fd *fd) { + pss = pss_lock_adam(pss); + size_t i; + for (i = 0; i < pss->fd_count; i++) { + if (pss->fds[i] == fd) { + UNREF_BY(exec_ctx, fd, 2, "pollset_set"); + break; + } + } + GPR_ASSERT(i != pss->fd_count); + for (; i < pss->fd_count - 1; i++) { + pss->fds[i] = pss->fds[i + 1]; + } + pss->fd_count--; + gpr_mu_unlock(&pss->mu); +} static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pss, grpc_pollset *ps) {} + grpc_pollset_set *pss, grpc_pollset *ps) { + grpc_error *error = GRPC_ERROR_NONE; + static const char *err_desc = "pollset_set_add_pollset"; + pollable *pollable_obj; + if (!GRPC_LOG_IF_ERROR( + err_desc, pollset_as_multipollable(exec_ctx, ps, &pollable_obj))) { + return; + } + pss = pss_lock_adam(pss); + for (size_t i = 0; i < pss->fd_count; i++) { + append_error(&error, pollable_add_fd(pollable_obj, pss->fds[i]), err_desc); + } + if (pss->pollset_count == pss->pollset_capacity) { + pss->pollset_capacity = GPR_MAX(pss->pollset_capacity * 2, 8); + pss->pollsets = gpr_realloc(pss->pollsets, + pss->pollset_capacity * sizeof(*pss->pollsets)); + } + pss->pollsets[pss->pollset_count++] = pollable_obj; + gpr_mu_unlock(&pss->mu); + + GRPC_LOG_IF_ERROR(err_desc, error); +} static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pss, grpc_pollset *ps) {} + grpc_pollset_set *pss, grpc_pollset *ps) { + pss = pss_lock_adam(pss); + size_t i; + for (i = 0; i < pss->pollset_count; i++) { + if (pss->pollsets[i] == ps->active_pollable) { + pollable_unref(pss->pollsets[i]); + break; + } + } + GPR_ASSERT(i != pss->pollset_count); + for (; i < pss->pollset_count - 1; i++) { + pss->pollsets[i] = pss->pollsets[i + 1]; + } + pss->pollset_count--; + gpr_mu_unlock(&pss->mu); +} + +static grpc_error *add_fds_to_pollables(grpc_exec_ctx *exec_ctx, grpc_fd **fds, + size_t fd_count, pollable **pollables, + size_t pollable_count, + const char *err_desc) { + grpc_error *error = GRPC_ERROR_NONE; + for (size_t i = 0; i < fd_count; i++) { + for (size_t j = 0; j < pollable_count; j++) { + append_error(&error, pollable_add_fd(pollables[j], fds[i]), err_desc); + } + } + return error; +} static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) {} + grpc_pollset_set *a, + grpc_pollset_set *b) { + grpc_error *error = GRPC_ERROR_NONE; + static const char *err_desc = "pollset_set_add_fd"; + for (;;) { + if (a == b) { + // pollset ancestors are the same: nothing to do + return; + } + if (a > b) { + GPR_SWAP(grpc_pollset_set *, a, b); + } + gpr_mu_lock(&a->mu); + gpr_mu_lock(&b->mu); + if (a->parent != NULL) { + a = a->parent; + } else if (b->parent != NULL) { + b = b->parent; + } else { + break; // exit loop, both pollsets locked + } + gpr_mu_unlock(&a->mu); + gpr_mu_unlock(&b->mu); + } + // try to do the least copying possible + // TODO(ctiller): there's probably a better heuristic here + const size_t a_size = a->fd_count + a->pollset_count; + const size_t b_size = b->fd_count + b->pollset_count; + if (b_size > a_size) { + GPR_SWAP(grpc_pollset_set *, a, b); + } + gpr_ref(&a->refs); + b->parent = a; + append_error(&error, + add_fds_to_pollables(exec_ctx, a->fds, a->fd_count, b->pollsets, + b->pollset_count, "merge_a2b"), + err_desc); + append_error(&error, + add_fds_to_pollables(exec_ctx, b->fds, b->fd_count, a->pollsets, + a->pollset_count, "merge_b2a"), + err_desc); + if (a->fd_capacity < a->fd_count + b->fd_count) { + a->fd_capacity = GPR_MAX(2 * a->fd_capacity, a->fd_count + b->fd_count); + a->fds = gpr_realloc(a->fds, a->fd_capacity * sizeof(*a->fds)); + } + if (a->pollset_capacity < a->pollset_count + b->pollset_count) { + a->pollset_capacity = + GPR_MAX(2 * a->pollset_capacity, a->pollset_count + b->pollset_count); + a->pollsets = + gpr_realloc(a->pollsets, a->pollset_capacity * sizeof(*a->pollsets)); + } + memcpy(a->fds + a->fd_count, b->fds, b->fd_count * sizeof(*b->fds)); + memcpy(a->pollsets + a->pollset_count, b->pollsets, + b->pollset_count * sizeof(*b->pollsets)); + a->fd_count += b->fd_count; + a->pollset_count += b->pollset_count; + gpr_free(b->fds); + gpr_free(b->pollsets); + b->fd_count = b->fd_capacity = b->pollset_count = b->pollset_capacity = 0; + gpr_mu_unlock(&a->mu); + gpr_mu_unlock(&b->mu); +} static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, grpc_pollset_set *bag, |