diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-02-09 08:00:02 -0800 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-02-09 08:00:02 -0800 |
commit | 7d41321306eecdd7fa1dbbcf663cc94fd75db975 (patch) | |
tree | bb4fd766b4c96fff15b069619b978b9e7e8a4d76 | |
parent | 970781bba9982e66c70e012fce1ba08fb8b88da8 (diff) |
Reduce contention on lock
Change the fd watcher from being O(active_pollers) to O(1), reducing time spent under the fd->watcher_mu lock, and ultimately scaling us much better.
-rw-r--r-- | src/core/iomgr/fd_posix.c | 50 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.h | 22 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_poll_posix.c | 26 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 9 |
4 files changed, 52 insertions, 55 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index b67c6cde70..737ee016aa 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -68,7 +68,6 @@ static grpc_fd *fd_freelist = NULL; static gpr_mu fd_freelist_mu; static void freelist_fd(grpc_fd *fd) { - gpr_free(fd->watchers); gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; fd_freelist = fd; @@ -93,9 +92,7 @@ static grpc_fd *alloc_fd(int fd) { gpr_atm_rel_store(&r->writest.state, NOT_READY); gpr_atm_rel_store(&r->shutdown, 0); r->fd = fd; - r->watchers = NULL; - r->watcher_count = 0; - r->watcher_capacity = 0; + r->watcher_root.next = r->watcher_root.prev = &r->watcher_root; r->freelist_next = NULL; return r; } @@ -118,9 +115,7 @@ static void unref_by(grpc_fd *fd, int n) { } } -void grpc_fd_global_init(void) { - gpr_mu_init(&fd_freelist_mu); -} +void grpc_fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); } void grpc_fd_global_shutdown(void) { while (fd_freelist != NULL) { @@ -145,11 +140,11 @@ int grpc_fd_is_orphaned(grpc_fd *fd) { } static void wake_watchers(grpc_fd *fd) { - size_t i, n; + grpc_fd_watcher *watcher; gpr_mu_lock(&fd->watcher_mu); - n = fd->watcher_count; - for (i = 0; i < n; i++) { - grpc_pollset_force_kick(fd->watchers[i]); + for (watcher = fd->watcher_root.next; watcher != &fd->watcher_root; + watcher = watcher->next) { + grpc_pollset_force_kick(watcher->pollset); } gpr_mu_unlock(&fd->watcher_mu); } @@ -293,36 +288,27 @@ void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb, } gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, - gpr_uint32 read_mask, gpr_uint32 write_mask) { + gpr_uint32 read_mask, gpr_uint32 write_mask, + grpc_fd_watcher *watcher) { /* keep track of pollers that have requested our events, in case they change */ gpr_mu_lock(&fd->watcher_mu); - if (fd->watcher_capacity == fd->watcher_count) { - fd->watcher_capacity = - GPR_MAX(fd->watcher_capacity + 8, fd->watcher_capacity * 3 / 2); - fd->watchers = gpr_realloc(fd->watchers, - fd->watcher_capacity * sizeof(grpc_pollset *)); - } - fd->watchers[fd->watcher_count++] = pollset; + watcher->next = &fd->watcher_root; + watcher->prev = watcher->next->prev; + watcher->next->prev = watcher->prev->next = watcher; + watcher->pollset = pollset; + watcher->fd = fd; gpr_mu_unlock(&fd->watcher_mu); return (gpr_atm_acq_load(&fd->readst.state) != READY ? read_mask : 0) | (gpr_atm_acq_load(&fd->writest.state) != READY ? write_mask : 0); } -void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset) { - size_t r, w, n; - - gpr_mu_lock(&fd->watcher_mu); - n = fd->watcher_count; - for (r = 0, w = 0; r < n; r++) { - if (fd->watchers[r] == pollset) { - fd->watcher_count--; - continue; - } - fd->watchers[w++] = fd->watchers[r]; - } - gpr_mu_unlock(&fd->watcher_mu); +void grpc_fd_end_poll(grpc_fd_watcher *watcher) { + gpr_mu_lock(&watcher->fd->watcher_mu); + watcher->next->prev = watcher->prev; + watcher->prev->next = watcher->next; + gpr_mu_unlock(&watcher->fd->watcher_mu); } void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) { diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index f42ae19579..9a675087e5 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -47,7 +47,16 @@ typedef struct { gpr_atm state; } grpc_fd_state; -typedef struct grpc_fd { +typedef struct grpc_fd grpc_fd; + +typedef struct grpc_fd_watcher { + struct grpc_fd_watcher *next; + struct grpc_fd_watcher *prev; + grpc_pollset *pollset; + grpc_fd *fd; +} grpc_fd_watcher; + +struct grpc_fd { int fd; /* refst format: bit0: 1=active/0=orphaned @@ -60,9 +69,7 @@ typedef struct grpc_fd { gpr_atm shutdown; gpr_mu watcher_mu; - grpc_pollset **watchers; - size_t watcher_count; - size_t watcher_capacity; + grpc_fd_watcher watcher_root; grpc_fd_state readst; grpc_fd_state writest; @@ -70,7 +77,7 @@ typedef struct grpc_fd { grpc_iomgr_cb_func on_done; void *on_done_user_data; struct grpc_fd *freelist_next; -} grpc_fd; +}; /* Create a wrapped file descriptor. Requires fd is a non-blocking file descriptor. @@ -95,9 +102,10 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data); Polling strategies that do not need to alter their behavior depending on the fd's current interest (such as epoll) do not need to call this function. */ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, - gpr_uint32 read_mask, gpr_uint32 write_mask); + gpr_uint32 read_mask, gpr_uint32 write_mask, + grpc_fd_watcher *rec); /* Complete polling previously started with grpc_fd_begin_poll */ -void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset); +void grpc_fd_end_poll(grpc_fd_watcher *rec); /* Return 1 if this fd is orphaned, 0 otherwise */ int grpc_fd_is_orphaned(grpc_fd *fd); diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index e88296979d..3244ae08db 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -53,11 +53,11 @@ typedef struct { size_t fd_count; size_t fd_capacity; grpc_fd **fds; - /* fds being polled by the current poller: parallel arrays of pollfd and the - * grpc_fd* that the pollfd was constructed from */ + /* fds being polled by the current poller: parallel arrays of pollfd, and + a grpc_fd_watcher */ size_t pfd_count; size_t pfd_capacity; - grpc_fd **selfds; + grpc_fd_watcher *watchers; struct pollfd *pfds; /* fds that have been removed from the pollset explicitly */ size_t del_count; @@ -98,7 +98,7 @@ static void end_polling(grpc_pollset *pollset) { pollset_hdr *h; h = pollset->data.ptr; for (i = 1; i < h->pfd_count; i++) { - grpc_fd_end_poll(h->selfds[i], pollset); + grpc_fd_end_poll(&h->watchers[i]); } } @@ -125,9 +125,9 @@ static int multipoll_with_poll_pollset_maybe_work( if (h->pfd_capacity < h->fd_count + 1) { h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1); gpr_free(h->pfds); - gpr_free(h->selfds); + gpr_free(h->watchers); h->pfds = gpr_malloc(sizeof(struct pollfd) * h->pfd_capacity); - h->selfds = gpr_malloc(sizeof(grpc_fd *) * h->pfd_capacity); + h->watchers = gpr_malloc(sizeof(grpc_fd_watcher) * h->pfd_capacity); } nf = 0; np = 1; @@ -147,7 +147,7 @@ static int multipoll_with_poll_pollset_maybe_work( grpc_fd_unref(h->fds[i]); } else { h->fds[nf++] = h->fds[i]; - h->selfds[np] = h->fds[i]; + h->watchers[np].fd = h->fds[i]; h->pfds[np].fd = h->fds[i]->fd; h->pfds[np].revents = 0; np++; @@ -167,8 +167,8 @@ static int multipoll_with_poll_pollset_maybe_work( gpr_mu_unlock(&pollset->mu); for (i = 1; i < np; i++) { - h->pfds[i].events = - grpc_fd_begin_poll(h->selfds[i], pollset, POLLIN, POLLOUT); + h->pfds[i].events = grpc_fd_begin_poll(h->watchers[i].fd, pollset, POLLIN, + POLLOUT, &h->watchers[i]); } r = poll(h->pfds, h->pfd_count, timeout); @@ -184,10 +184,10 @@ static int multipoll_with_poll_pollset_maybe_work( } for (i = 1; i < np; i++) { if (h->pfds[i].revents & POLLIN) { - grpc_fd_become_readable(h->selfds[i], allow_synchronous_callback); + grpc_fd_become_readable(h->watchers[i].fd, allow_synchronous_callback); } if (h->pfds[i].revents & POLLOUT) { - grpc_fd_become_writable(h->selfds[i], allow_synchronous_callback); + grpc_fd_become_writable(h->watchers[i].fd, allow_synchronous_callback); } } } @@ -211,7 +211,7 @@ static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) { grpc_fd_unref(h->dels[i]); } gpr_free(h->pfds); - gpr_free(h->selfds); + gpr_free(h->watchers); gpr_free(h->fds); gpr_free(h->dels); gpr_free(h); @@ -234,7 +234,7 @@ void grpc_platform_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, h->pfd_count = 0; h->pfd_capacity = 0; h->pfds = NULL; - h->selfds = NULL; + h->watchers = NULL; h->del_count = 0; h->del_capacity = 0; h->dels = NULL; diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index b1c2c64a18..b0404b870b 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -80,7 +80,9 @@ void grpc_pollset_kick(grpc_pollset *p) { } } -void grpc_pollset_force_kick(grpc_pollset *p) { grpc_pollset_kick_kick(&p->kick_state); } +void grpc_pollset_force_kick(grpc_pollset *p) { + grpc_pollset_kick_kick(&p->kick_state); +} /* global state management */ @@ -217,6 +219,7 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, int allow_synchronous_callback) { struct pollfd pfd[2]; grpc_fd *fd; + grpc_fd_watcher fd_watcher; int timeout; int r; @@ -249,7 +252,7 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, pollset->counter = 1; gpr_mu_unlock(&pollset->mu); - pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT); + pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher); r = poll(pfd, GPR_ARRAY_SIZE(pfd), timeout); if (r < 0) { @@ -271,7 +274,7 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, } grpc_pollset_kick_post_poll(&pollset->kick_state); - grpc_fd_end_poll(fd, pollset); + grpc_fd_end_poll(&fd_watcher); gpr_mu_lock(&pollset->mu); pollset->counter = 0; |