author     Craig Tiller <craig.tiller@gmail.com>  2015-02-09 08:00:02 -0800
committer  Craig Tiller <craig.tiller@gmail.com>  2015-02-09 08:00:02 -0800
commit     7d41321306eecdd7fa1dbbcf663cc94fd75db975 (patch)
tree       bb4fd766b4c96fff15b069619b978b9e7e8a4d76
parent     970781bba9982e66c70e012fce1ba08fb8b88da8 (diff)
Reduce contention on lock
Change fd watcher registration from O(active_pollers) to O(1), reducing time spent under the fd->watcher_mu lock and ultimately scaling much better.
-rw-r--r--  src/core/iomgr/fd_posix.c                            | 50
-rw-r--r--  src/core/iomgr/fd_posix.h                            | 22
-rw-r--r--  src/core/iomgr/pollset_multipoller_with_poll_posix.c | 26
-rw-r--r--  src/core/iomgr/pollset_posix.c                       |  9
4 files changed, 52 insertions(+), 55 deletions(-)
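To make the change concrete before the diff: below is a minimal standalone sketch of the sentinel-based intrusive doubly-linked list this commit builds into grpc_fd. Insertion and removal are O(1), so grpc_fd_end_poll no longer scans a watcher array while holding fd->watcher_mu. The struct and names here are simplified stand-ins (an id instead of the pollset/fd pointers, no watcher_mu locking), not the actual gRPC code.

/* Sketch only: mirrors the next/prev links of grpc_fd_watcher. */
#include <stdio.h>

typedef struct watcher {
  struct watcher *next;
  struct watcher *prev;
  int id; /* stands in for the pollset/fd fields of grpc_fd_watcher */
} watcher;

/* An empty list is a sentinel that points at itself (cf. alloc_fd). */
static void list_init(watcher *root) { root->next = root->prev = root; }

/* O(1) insertion before the sentinel (cf. grpc_fd_begin_poll). */
static void list_add(watcher *root, watcher *w) {
  w->next = root;
  w->prev = root->prev;
  w->next->prev = w->prev->next = w;
}

/* O(1) removal: no scan over the other watchers (cf. grpc_fd_end_poll). */
static void list_remove(watcher *w) {
  w->next->prev = w->prev;
  w->prev->next = w->next;
}

int main(void) {
  watcher *w;
  watcher root;
  watcher a = {NULL, NULL, 1};
  watcher b = {NULL, NULL, 2};
  list_init(&root);
  list_add(&root, &a);
  list_add(&root, &b);
  list_remove(&a);
  /* Iterate the remaining watchers (cf. wake_watchers). */
  for (w = root.next; w != &root; w = w->next) {
    printf("watcher %d still registered\n", w->id); /* prints id 2 only */
  }
  return 0;
}

Because the links live inside each watcher record (stack-allocated in the unary poller, array-allocated in the multipoller), registering and unregistering interest requires no heap allocation and only a constant amount of work under the lock.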
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index b67c6cde70..737ee016aa 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -68,7 +68,6 @@ static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
static void freelist_fd(grpc_fd *fd) {
- gpr_free(fd->watchers);
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
fd_freelist = fd;
@@ -93,9 +92,7 @@ static grpc_fd *alloc_fd(int fd) {
gpr_atm_rel_store(&r->writest.state, NOT_READY);
gpr_atm_rel_store(&r->shutdown, 0);
r->fd = fd;
- r->watchers = NULL;
- r->watcher_count = 0;
- r->watcher_capacity = 0;
+ r->watcher_root.next = r->watcher_root.prev = &r->watcher_root;
r->freelist_next = NULL;
return r;
}
@@ -118,9 +115,7 @@ static void unref_by(grpc_fd *fd, int n) {
}
}
-void grpc_fd_global_init(void) {
- gpr_mu_init(&fd_freelist_mu);
-}
+void grpc_fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
void grpc_fd_global_shutdown(void) {
while (fd_freelist != NULL) {
@@ -145,11 +140,11 @@ int grpc_fd_is_orphaned(grpc_fd *fd) {
}
static void wake_watchers(grpc_fd *fd) {
- size_t i, n;
+ grpc_fd_watcher *watcher;
gpr_mu_lock(&fd->watcher_mu);
- n = fd->watcher_count;
- for (i = 0; i < n; i++) {
- grpc_pollset_force_kick(fd->watchers[i]);
+ for (watcher = fd->watcher_root.next; watcher != &fd->watcher_root;
+ watcher = watcher->next) {
+ grpc_pollset_force_kick(watcher->pollset);
}
gpr_mu_unlock(&fd->watcher_mu);
}
@@ -293,36 +288,27 @@ void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb,
}
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
- gpr_uint32 read_mask, gpr_uint32 write_mask) {
+ gpr_uint32 read_mask, gpr_uint32 write_mask,
+ grpc_fd_watcher *watcher) {
/* keep track of pollers that have requested our events, in case they change
*/
gpr_mu_lock(&fd->watcher_mu);
- if (fd->watcher_capacity == fd->watcher_count) {
- fd->watcher_capacity =
- GPR_MAX(fd->watcher_capacity + 8, fd->watcher_capacity * 3 / 2);
- fd->watchers = gpr_realloc(fd->watchers,
- fd->watcher_capacity * sizeof(grpc_pollset *));
- }
- fd->watchers[fd->watcher_count++] = pollset;
+ watcher->next = &fd->watcher_root;
+ watcher->prev = watcher->next->prev;
+ watcher->next->prev = watcher->prev->next = watcher;
+ watcher->pollset = pollset;
+ watcher->fd = fd;
gpr_mu_unlock(&fd->watcher_mu);
return (gpr_atm_acq_load(&fd->readst.state) != READY ? read_mask : 0) |
(gpr_atm_acq_load(&fd->writest.state) != READY ? write_mask : 0);
}
-void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset) {
- size_t r, w, n;
-
- gpr_mu_lock(&fd->watcher_mu);
- n = fd->watcher_count;
- for (r = 0, w = 0; r < n; r++) {
- if (fd->watchers[r] == pollset) {
- fd->watcher_count--;
- continue;
- }
- fd->watchers[w++] = fd->watchers[r];
- }
- gpr_mu_unlock(&fd->watcher_mu);
+void grpc_fd_end_poll(grpc_fd_watcher *watcher) {
+ gpr_mu_lock(&watcher->fd->watcher_mu);
+ watcher->next->prev = watcher->prev;
+ watcher->prev->next = watcher->next;
+ gpr_mu_unlock(&watcher->fd->watcher_mu);
}
void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index f42ae19579..9a675087e5 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -47,7 +47,16 @@ typedef struct {
gpr_atm state;
} grpc_fd_state;
-typedef struct grpc_fd {
+typedef struct grpc_fd grpc_fd;
+
+typedef struct grpc_fd_watcher {
+ struct grpc_fd_watcher *next;
+ struct grpc_fd_watcher *prev;
+ grpc_pollset *pollset;
+ grpc_fd *fd;
+} grpc_fd_watcher;
+
+struct grpc_fd {
int fd;
/* refst format:
bit0: 1=active/0=orphaned
@@ -60,9 +69,7 @@ typedef struct grpc_fd {
gpr_atm shutdown;
gpr_mu watcher_mu;
- grpc_pollset **watchers;
- size_t watcher_count;
- size_t watcher_capacity;
+ grpc_fd_watcher watcher_root;
grpc_fd_state readst;
grpc_fd_state writest;
@@ -70,7 +77,7 @@ typedef struct grpc_fd {
grpc_iomgr_cb_func on_done;
void *on_done_user_data;
struct grpc_fd *freelist_next;
-} grpc_fd;
+};
/* Create a wrapped file descriptor.
Requires fd is a non-blocking file descriptor.
@@ -95,9 +102,10 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data);
Polling strategies that do not need to alter their behavior depending on the
fd's current interest (such as epoll) do not need to call this function. */
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
- gpr_uint32 read_mask, gpr_uint32 write_mask);
+ gpr_uint32 read_mask, gpr_uint32 write_mask,
+ grpc_fd_watcher *rec);
/* Complete polling previously started with grpc_fd_begin_poll */
-void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset);
+void grpc_fd_end_poll(grpc_fd_watcher *rec);
/* Return 1 if this fd is orphaned, 0 otherwise */
int grpc_fd_is_orphaned(grpc_fd *fd);
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index e88296979d..3244ae08db 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -53,11 +53,11 @@ typedef struct {
size_t fd_count;
size_t fd_capacity;
grpc_fd **fds;
- /* fds being polled by the current poller: parallel arrays of pollfd and the
- * grpc_fd* that the pollfd was constructed from */
+ /* fds being polled by the current poller: parallel arrays of pollfd, and
+ a grpc_fd_watcher */
size_t pfd_count;
size_t pfd_capacity;
- grpc_fd **selfds;
+ grpc_fd_watcher *watchers;
struct pollfd *pfds;
/* fds that have been removed from the pollset explicitly */
size_t del_count;
@@ -98,7 +98,7 @@ static void end_polling(grpc_pollset *pollset) {
pollset_hdr *h;
h = pollset->data.ptr;
for (i = 1; i < h->pfd_count; i++) {
- grpc_fd_end_poll(h->selfds[i], pollset);
+ grpc_fd_end_poll(&h->watchers[i]);
}
}
@@ -125,9 +125,9 @@ static int multipoll_with_poll_pollset_maybe_work(
if (h->pfd_capacity < h->fd_count + 1) {
h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1);
gpr_free(h->pfds);
- gpr_free(h->selfds);
+ gpr_free(h->watchers);
h->pfds = gpr_malloc(sizeof(struct pollfd) * h->pfd_capacity);
- h->selfds = gpr_malloc(sizeof(grpc_fd *) * h->pfd_capacity);
+ h->watchers = gpr_malloc(sizeof(grpc_fd_watcher) * h->pfd_capacity);
}
nf = 0;
np = 1;
@@ -147,7 +147,7 @@ static int multipoll_with_poll_pollset_maybe_work(
grpc_fd_unref(h->fds[i]);
} else {
h->fds[nf++] = h->fds[i];
- h->selfds[np] = h->fds[i];
+ h->watchers[np].fd = h->fds[i];
h->pfds[np].fd = h->fds[i]->fd;
h->pfds[np].revents = 0;
np++;
@@ -167,8 +167,8 @@ static int multipoll_with_poll_pollset_maybe_work(
gpr_mu_unlock(&pollset->mu);
for (i = 1; i < np; i++) {
- h->pfds[i].events =
- grpc_fd_begin_poll(h->selfds[i], pollset, POLLIN, POLLOUT);
+ h->pfds[i].events = grpc_fd_begin_poll(h->watchers[i].fd, pollset, POLLIN,
+ POLLOUT, &h->watchers[i]);
}
r = poll(h->pfds, h->pfd_count, timeout);
@@ -184,10 +184,10 @@ static int multipoll_with_poll_pollset_maybe_work(
}
for (i = 1; i < np; i++) {
if (h->pfds[i].revents & POLLIN) {
- grpc_fd_become_readable(h->selfds[i], allow_synchronous_callback);
+ grpc_fd_become_readable(h->watchers[i].fd, allow_synchronous_callback);
}
if (h->pfds[i].revents & POLLOUT) {
- grpc_fd_become_writable(h->selfds[i], allow_synchronous_callback);
+ grpc_fd_become_writable(h->watchers[i].fd, allow_synchronous_callback);
}
}
}
@@ -211,7 +211,7 @@ static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) {
grpc_fd_unref(h->dels[i]);
}
gpr_free(h->pfds);
- gpr_free(h->selfds);
+ gpr_free(h->watchers);
gpr_free(h->fds);
gpr_free(h->dels);
gpr_free(h);
@@ -234,7 +234,7 @@ void grpc_platform_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
h->pfd_count = 0;
h->pfd_capacity = 0;
h->pfds = NULL;
- h->selfds = NULL;
+ h->watchers = NULL;
h->del_count = 0;
h->del_capacity = 0;
h->dels = NULL;
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index b1c2c64a18..b0404b870b 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -80,7 +80,9 @@ void grpc_pollset_kick(grpc_pollset *p) {
}
}
-void grpc_pollset_force_kick(grpc_pollset *p) { grpc_pollset_kick_kick(&p->kick_state); }
+void grpc_pollset_force_kick(grpc_pollset *p) {
+ grpc_pollset_kick_kick(&p->kick_state);
+}
/* global state management */
@@ -217,6 +219,7 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset,
int allow_synchronous_callback) {
struct pollfd pfd[2];
grpc_fd *fd;
+ grpc_fd_watcher fd_watcher;
int timeout;
int r;
@@ -249,7 +252,7 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset,
pollset->counter = 1;
gpr_mu_unlock(&pollset->mu);
- pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT);
+ pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher);
r = poll(pfd, GPR_ARRAY_SIZE(pfd), timeout);
if (r < 0) {
@@ -271,7 +274,7 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset,
}
grpc_pollset_kick_post_poll(&pollset->kick_state);
- grpc_fd_end_poll(fd, pollset);
+ grpc_fd_end_poll(&fd_watcher);
gpr_mu_lock(&pollset->mu);
pollset->counter = 0;