diff options
-rw-r--r-- | src/core/lib/iomgr/ev_poll_posix.c | 398 | ||||
-rw-r--r-- | src/core/lib/iomgr/wakeup_fd_cv.h | 6 |
2 files changed, 296 insertions, 108 deletions
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 365aa583bb..9472a8e520 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -42,6 +42,7 @@ #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" +#include "src/core/lib/support/murmur_hash.h" #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) @@ -239,22 +240,43 @@ struct grpc_pollset_set { * condition variable polling definitions */ +#define POLLCV_THREAD_GRACE_MS 1000 #define CV_POLL_PERIOD_MS 1000 #define CV_DEFAULT_TABLE_SIZE 16 -typedef enum poll_status_t { INPROGRESS, COMPLETED, CANCELLED } poll_status_t; - -typedef struct poll_args { +typedef struct poll_result { gpr_refcount refcount; - gpr_cv *cv; + cv_node *watchers; + int watchcount; struct pollfd *fds; nfds_t nfds; - int timeout; int retval; int err; - gpr_atm status; + int completed; +} poll_result; + +typedef struct poll_args { + gpr_cv trigger; + int trigger_set; + struct pollfd *fds; + nfds_t nfds; + poll_result *result; + struct poll_args *next; + struct poll_args *prev; } poll_args; +// This is a 2-tiered cache, we mantain a hash table +// of active poll calls, so we can wait on the result +// of that call. We also maintain a freelist of inactive +// poll threads. +typedef struct poll_hash_table { + poll_args *free_pollers; + poll_args **active_pollers; + unsigned int size; + unsigned int count; +} poll_hash_table; + +poll_hash_table poll_cache; cv_fd_table g_cvfds; /******************************************************************************* @@ -1277,43 +1299,205 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, * Condition Variable polling extensions */ -static void decref_poll_args(poll_args *args) { - if (gpr_unref(&args->refcount)) { - gpr_free(args->fds); - gpr_cv_destroy(args->cv); - gpr_free(args->cv); - gpr_free(args); +static void run_poll(void *args); +static void cache_poller_locked(poll_args *args); + +static void cache_insert_locked(poll_args *args) { + uint32_t key = gpr_murmur_hash3(args->fds, args->nfds * sizeof(struct pollfd), + 0xDEADBEEF); + key = key % poll_cache.size; + if (poll_cache.active_pollers[key]) { + poll_cache.active_pollers[key]->prev = args; } + args->next = poll_cache.active_pollers[key]; + args->prev = NULL; + poll_cache.active_pollers[key] = args; + poll_cache.count++; } -// Poll in a background thread -static void run_poll(void *arg) { - int timeout, retval; - poll_args *pargs = (poll_args *)arg; - while (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) { - if (pargs->timeout < 0) { - timeout = CV_POLL_PERIOD_MS; - } else { - timeout = GPR_MIN(CV_POLL_PERIOD_MS, pargs->timeout); - pargs->timeout -= timeout; +static void init_result(poll_args *pargs) { + pargs->result = gpr_malloc(sizeof(poll_result)); + gpr_ref_init(&pargs->result->refcount, 1); + pargs->result->watchers = NULL; + pargs->result->watchcount = 0; + pargs->result->fds = gpr_malloc(sizeof(struct pollfd) * pargs->nfds); + memcpy(pargs->result->fds, pargs->fds, sizeof(struct pollfd) * pargs->nfds); + pargs->result->nfds = pargs->nfds; + pargs->result->retval = 0; + pargs->result->err = 0; + pargs->result->completed = 0; +} + +// Creates a poll_args object for a given arguments to poll(). +// This object may return a poll_args in the cache. +static poll_args *get_poller_locked(struct pollfd *fds, nfds_t count) { + uint32_t key = + gpr_murmur_hash3(fds, count * sizeof(struct pollfd), 0xDEADBEEF); + key = key % poll_cache.size; + poll_args *curr = poll_cache.active_pollers[key]; + while (curr) { + if (curr->nfds == count && + memcmp(curr->fds, fds, count * sizeof(struct pollfd)) == 0) { + gpr_free(fds); + return curr; } - retval = g_cvfds.poll(pargs->fds, pargs->nfds, timeout); - if (retval != 0 || pargs->timeout == 0) { - pargs->retval = retval; - pargs->err = errno; - break; + curr = curr->next; + } + + if (poll_cache.free_pollers) { + poll_args *pargs = poll_cache.free_pollers; + poll_cache.free_pollers = pargs->next; + if (poll_cache.free_pollers) { + poll_cache.free_pollers->prev = NULL; } + pargs->fds = fds; + pargs->nfds = count; + pargs->next = NULL; + pargs->prev = NULL; + init_result(pargs); + cache_poller_locked(pargs); + return pargs; + } + + poll_args *pargs = gpr_malloc(sizeof(struct poll_args)); + gpr_cv_init(&pargs->trigger); + pargs->fds = fds; + pargs->nfds = count; + pargs->next = NULL; + pargs->prev = NULL; + pargs->trigger_set = 0; + init_result(pargs); + cache_poller_locked(pargs); + gpr_thd_id t_id; + gpr_thd_options opt = gpr_thd_options_default(); + gpr_ref(&g_cvfds.pollcount); + gpr_thd_options_set_detached(&opt); + GPR_ASSERT(gpr_thd_new(&t_id, &run_poll, pargs, &opt)); + return pargs; +} + +static void cache_delete_locked(poll_args *args) { + if (!args->prev) { + uint32_t key = gpr_murmur_hash3( + args->fds, args->nfds * sizeof(struct pollfd), 0xDEADBEEF); + key = key % poll_cache.size; + GPR_ASSERT(poll_cache.active_pollers[key] == args); + poll_cache.active_pollers[key] = args->next; + } else { + args->prev->next = args->next; } - gpr_mu_lock(&g_cvfds.mu); - if (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) { - // Signal main thread that the poll completed - gpr_atm_no_barrier_store(&pargs->status, COMPLETED); - gpr_cv_signal(pargs->cv); + + if (args->next) { + args->next->prev = args->prev; } - decref_poll_args(pargs); - g_cvfds.pollcount--; - if (g_cvfds.shutdown && g_cvfds.pollcount == 0) { - gpr_cv_signal(&g_cvfds.shutdown_complete); + + poll_cache.count--; + if (poll_cache.free_pollers) { + poll_cache.free_pollers->prev = args; + } + args->prev = NULL; + args->next = poll_cache.free_pollers; + gpr_free(args->fds); + poll_cache.free_pollers = args; +} + +static void cache_poller_locked(poll_args *args) { + if (poll_cache.count + 1 > poll_cache.size / 2) { + poll_args **old_active_pollers = poll_cache.active_pollers; + poll_cache.size = poll_cache.size * 2; + poll_cache.count = 0; + poll_cache.active_pollers = gpr_malloc(sizeof(void *) * poll_cache.size); + for (unsigned int i = 0; i < poll_cache.size; i++) { + poll_cache.active_pollers[i] = NULL; + } + for (unsigned int i = 0; i < poll_cache.size / 2; i++) { + poll_args *curr = old_active_pollers[i]; + poll_args *next = NULL; + while (curr) { + next = curr->next; + cache_insert_locked(curr); + curr = next; + } + } + gpr_free(old_active_pollers); + } + + cache_insert_locked(args); +} + +static void cache_destroy_locked(poll_args *args) { + if (args->next) { + args->next->prev = args->prev; + } + + if (args->prev) { + args->prev->next = args->next; + } else { + poll_cache.free_pollers = args->next; + } + + gpr_free(args); +} + +static void decref_poll_result(poll_result *res) { + if (gpr_unref(&res->refcount)) { + GPR_ASSERT(!res->watchers); + gpr_free(res->fds); + gpr_free(res); + } +} + +void remove_cvn(cv_node **head, cv_node *target) { + if (target->next) { + target->next->prev = target->prev; + } + + if (target->prev) { + target->prev->next = target->next; + } else { + *head = target->next; + } +} + +gpr_timespec thread_grace; + +// Poll in a background thread +static void run_poll(void *args) { + poll_args *pargs = (poll_args *)args; + while (1) { + poll_result *result = pargs->result; + int retval = g_cvfds.poll(result->fds, result->nfds, CV_POLL_PERIOD_MS); + gpr_mu_lock(&g_cvfds.mu); + if (retval != 0) { + result->completed = 1; + result->retval = retval; + result->err = errno; + cv_node *watcher = result->watchers; + while (watcher) { + gpr_cv_signal(watcher->cv); + watcher = watcher->next; + } + } + if (result->watchcount == 0 || result->completed) { + cache_delete_locked(pargs); + decref_poll_result(result); + // Leave this polling thread alive for a grace period to do another poll() + // op + gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME); + deadline = gpr_time_add(deadline, thread_grace); + pargs->trigger_set = 0; + gpr_cv_wait(&pargs->trigger, &g_cvfds.mu, deadline); + if (!pargs->trigger_set) { + cache_destroy_locked(pargs); + break; + } + } + gpr_mu_unlock(&g_cvfds.mu); + } + + // We still have the lock here + if (gpr_unref(&g_cvfds.pollcount)) { + gpr_cv_signal(&g_cvfds.shutdown_cv); } gpr_mu_unlock(&g_cvfds.mu); } @@ -1322,24 +1506,29 @@ static void run_poll(void *arg) { static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { unsigned int i; int res, idx; - gpr_cv *pollcv; - cv_node *cvn, *prev; + cv_node *pollcv; int skip_poll = 0; nfds_t nsockfds = 0; - gpr_thd_id t_id; - gpr_thd_options opt; - poll_args *pargs = NULL; + poll_result *result = NULL; gpr_mu_lock(&g_cvfds.mu); - pollcv = gpr_malloc(sizeof(gpr_cv)); - gpr_cv_init(pollcv); + pollcv = gpr_malloc(sizeof(cv_node)); + pollcv->next = NULL; + gpr_cv pollcv_cv; + gpr_cv_init(&pollcv_cv); + pollcv->cv = &pollcv_cv; + cv_node *fd_cvs = gpr_malloc(nfds * sizeof(cv_node)); + for (i = 0; i < nfds; i++) { fds[i].revents = 0; if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { idx = FD_TO_IDX(fds[i].fd); - cvn = gpr_malloc(sizeof(cv_node)); - cvn->cv = pollcv; - cvn->next = g_cvfds.cvfds[idx].cvs; - g_cvfds.cvfds[idx].cvs = cvn; + fd_cvs[i].cv = &pollcv_cv; + fd_cvs[i].prev = NULL; + fd_cvs[i].next = g_cvfds.cvfds[idx].cvs; + if (g_cvfds.cvfds[idx].cvs) { + g_cvfds.cvfds[idx].cvs->prev = &(fd_cvs[i]); + } + g_cvfds.cvfds[idx].cvs = &(fd_cvs[i]); // Don't bother polling if a wakeup fd is ready if (g_cvfds.cvfds[idx].is_set) { skip_poll = 1; @@ -1349,81 +1538,68 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { } } + gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME); + if (timeout < 0) { + deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + } else { + deadline = + gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN)); + } + res = 0; if (!skip_poll && nsockfds > 0) { - pargs = gpr_malloc(sizeof(struct poll_args)); - // Both the main thread and calling thread get a reference - gpr_ref_init(&pargs->refcount, 2); - pargs->cv = pollcv; - pargs->fds = gpr_malloc(sizeof(struct pollfd) * nsockfds); - pargs->nfds = nsockfds; - pargs->timeout = timeout; - pargs->retval = 0; - pargs->err = 0; - gpr_atm_no_barrier_store(&pargs->status, INPROGRESS); + struct pollfd *pollfds = gpr_malloc(sizeof(struct pollfd) * nsockfds); idx = 0; for (i = 0; i < nfds; i++) { if (fds[i].fd >= 0) { - pargs->fds[idx].fd = fds[i].fd; - pargs->fds[idx].events = fds[i].events; - pargs->fds[idx].revents = 0; + pollfds[idx].fd = fds[i].fd; + pollfds[idx].events = fds[i].events; + pollfds[idx].revents = 0; idx++; } } - g_cvfds.pollcount++; - opt = gpr_thd_options_default(); - gpr_thd_options_set_detached(&opt); - GPR_ASSERT(gpr_thd_new(&t_id, &run_poll, pargs, &opt)); - // We want the poll() thread to trigger the deadline, so wait forever here - gpr_cv_wait(pollcv, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); - if (gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) { - res = pargs->retval; - errno = pargs->err; - } else { - errno = 0; - gpr_atm_no_barrier_store(&pargs->status, CANCELLED); + poll_args *pargs = get_poller_locked(pollfds, nsockfds); + result = pargs->result; + pollcv->next = result->watchers; + pollcv->prev = NULL; + if (result->watchers) { + result->watchers->prev = pollcv; } + result->watchers = pollcv; + result->watchcount++; + gpr_ref(&result->refcount); + + pargs->trigger_set = 1; + gpr_cv_signal(&pargs->trigger); + gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline); + res = result->retval; + errno = result->err; + result->watchcount--; + remove_cvn(&result->watchers, pollcv); } else if (!skip_poll) { - gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME); - deadline = - gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN)); - gpr_cv_wait(pollcv, &g_cvfds.mu, deadline); + gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline); } idx = 0; for (i = 0; i < nfds; i++) { if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { - cvn = g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs; - prev = NULL; - while (cvn->cv != pollcv) { - prev = cvn; - cvn = cvn->next; - GPR_ASSERT(cvn); - } - if (!prev) { - g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs = cvn->next; - } else { - prev->next = cvn->next; - } - gpr_free(cvn); - + remove_cvn(&g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i])); if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) { fds[i].revents = POLLIN; if (res >= 0) res++; } - } else if (!skip_poll && fds[i].fd >= 0 && - gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) { - fds[i].revents = pargs->fds[idx].revents; + } else if (!skip_poll && fds[i].fd >= 0 && result->completed) { + fds[i].revents = result->fds[idx].revents; idx++; } } - if (pargs) { - decref_poll_args(pargs); - } else { - gpr_cv_destroy(pollcv); - gpr_free(pollcv); + gpr_free(fd_cvs); + gpr_free(pollcv); + if (result) { + decref_poll_result(result); } + gpr_mu_unlock(&g_cvfds.mu); return res; @@ -1432,12 +1608,12 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { static void global_cv_fd_table_init() { gpr_mu_init(&g_cvfds.mu); gpr_mu_lock(&g_cvfds.mu); - gpr_cv_init(&g_cvfds.shutdown_complete); - g_cvfds.shutdown = 0; - g_cvfds.pollcount = 0; + gpr_cv_init(&g_cvfds.shutdown_cv); + gpr_ref_init(&g_cvfds.pollcount, 1); g_cvfds.size = CV_DEFAULT_TABLE_SIZE; g_cvfds.cvfds = gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE); g_cvfds.free_fds = NULL; + thread_grace = gpr_time_from_millis(POLLCV_THREAD_GRACE_MS, GPR_TIMESPAN); for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) { g_cvfds.cvfds[i].is_set = 0; g_cvfds.cvfds[i].cvs = NULL; @@ -1447,23 +1623,35 @@ static void global_cv_fd_table_init() { // Override the poll function with one that supports cvfds g_cvfds.poll = grpc_poll_function; grpc_poll_function = &cvfd_poll; + + // Initialize the cache + poll_cache.size = 32; + poll_cache.count = 0; + poll_cache.free_pollers = NULL; + poll_cache.active_pollers = gpr_malloc(sizeof(void *) * 32); + for (unsigned int i = 0; i < poll_cache.size; i++) { + poll_cache.active_pollers[i] = NULL; + } + gpr_mu_unlock(&g_cvfds.mu); } static void global_cv_fd_table_shutdown() { gpr_mu_lock(&g_cvfds.mu); - g_cvfds.shutdown = 1; // Attempt to wait for all abandoned poll() threads to terminate // Not doing so will result in reported memory leaks - if (g_cvfds.pollcount > 0) { - int res = gpr_cv_wait(&g_cvfds.shutdown_complete, &g_cvfds.mu, + if (!gpr_unref(&g_cvfds.pollcount)) { + int res = gpr_cv_wait(&g_cvfds.shutdown_cv, &g_cvfds.mu, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(3, GPR_TIMESPAN))); GPR_ASSERT(res == 0); } - gpr_cv_destroy(&g_cvfds.shutdown_complete); + gpr_cv_destroy(&g_cvfds.shutdown_cv); grpc_poll_function = g_cvfds.poll; gpr_free(g_cvfds.cvfds); + + gpr_free(poll_cache.active_pollers); + gpr_mu_unlock(&g_cvfds.mu); gpr_mu_destroy(&g_cvfds.mu); } diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h index c5dcdc9746..46e84f5843 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.h +++ b/src/core/lib/iomgr/wakeup_fd_cv.h @@ -43,6 +43,7 @@ typedef struct cv_node { gpr_cv* cv; struct cv_node* next; + struct cv_node* prev; } cv_node; typedef struct fd_node { @@ -53,9 +54,8 @@ typedef struct fd_node { typedef struct cv_fd_table { gpr_mu mu; - int pollcount; - int shutdown; - gpr_cv shutdown_complete; + gpr_refcount pollcount; + gpr_cv shutdown_cv; fd_node* cvfds; fd_node* free_fds; unsigned int size; |