aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2016-05-14 16:33:16 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2016-05-31 09:29:53 -0700
commit97c2d6a269472a8a6e56d9e3d88f89bd27aff5ac (patch)
tree5323287dd0138f5e1fb5a4257d8196aa5cb30911
parent9ff57f67a0920e8e39baedfec5671fb6e662d257 (diff)
Remove grpc_fd_watcher and related code from ev_epoll_posix.c
-rw-r--r--src/core/lib/iomgr/ev_epoll_posix.c271
1 files changed, 38 insertions, 233 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_posix.c b/src/core/lib/iomgr/ev_epoll_posix.c
index 5776b6c1cf..920be1951f 100644
--- a/src/core/lib/iomgr/ev_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_epoll_posix.c
@@ -59,17 +59,6 @@
* FD declarations
*/
-/* TODO(sreek) : Check if grpc_fd_watcher is needed (and if so, check if we can
- * share this between ev_poll_posix.h and ev_epoll_posix versions */
-
-typedef struct grpc_fd_watcher {
- struct grpc_fd_watcher *next;
- struct grpc_fd_watcher *prev;
- grpc_pollset *pollset;
- grpc_pollset_worker *worker;
- grpc_fd *fd;
-} grpc_fd_watcher;
-
struct grpc_fd {
int fd;
/* refst format:
@@ -84,32 +73,6 @@ struct grpc_fd {
int closed;
int released;
- /* The watcher list.
-
- The following watcher related fields are protected by watcher_mu.
-
- An fd_watcher is an ephemeral object created when an fd wants to
- begin polling, and destroyed after the poll.
-
- It denotes the fd's interest in whether to read poll or write poll
- or both or neither on this fd.
-
- If a watcher is asked to poll for reads or writes, the read_watcher
- or write_watcher fields are set respectively. A watcher may be asked
- to poll for both, in which case both fields will be set.
-
- read_watcher and write_watcher may be NULL if no watcher has been
- asked to poll for reads or writes.
-
- If an fd_watcher is not asked to poll for reads or writes, it's added
- to a linked list of inactive watchers, rooted at inactive_watcher_root.
- If at a later time there becomes need of a poller to poll, one of
- the inactive pollers may be kicked out of their poll loops to take
- that responsibility. */
- grpc_fd_watcher inactive_watcher_root;
- grpc_fd_watcher *read_watcher;
- grpc_fd_watcher *write_watcher;
-
grpc_closure *read_closure;
grpc_closure *write_closure;
@@ -120,27 +83,6 @@ struct grpc_fd {
grpc_iomgr_object iomgr_object;
};
-/* Begin polling on an fd.
- Registers that the given pollset is interested in this fd - so that if read
- or writability interest changes, the pollset can be kicked to pick up that
- new interest.
- Return value is:
- (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
- i.e. a combination of read_mask and write_mask determined by the fd's current
- interest in said events.
- Polling strategies that do not need to alter their behavior depending on the
- fd's current interest (such as epoll) do not need to call this function.
- MUST NOT be called with a pollset lock taken */
-static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
- grpc_pollset_worker *worker, uint32_t read_mask,
- uint32_t write_mask, grpc_fd_watcher *rec);
-/* Complete polling previously started with fd_begin_poll
- MUST NOT be called with a pollset lock taken
- if got_read or got_write are 1, also does the become_{readable,writable} as
- appropriate. */
-static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
- int got_read, int got_write);
-
/* Return 1 if this fd is orphaned, 0 otherwise */
static bool fd_is_orphaned(grpc_fd *fd);
@@ -307,10 +249,7 @@ static grpc_fd *alloc_fd(int fd) {
r->read_closure = CLOSURE_NOT_READY;
r->write_closure = CLOSURE_NOT_READY;
r->fd = fd;
- r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
- &r->inactive_watcher_root;
r->freelist_next = NULL;
- r->read_watcher = r->write_watcher = NULL;
r->on_done_closure = NULL;
r->closed = 0;
r->released = 0;
@@ -387,43 +326,6 @@ static bool fd_is_orphaned(grpc_fd *fd) {
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
-static void pollset_kick_locked(grpc_fd_watcher *watcher) {
- gpr_mu_lock(&watcher->pollset->mu);
- GPR_ASSERT(watcher->worker);
- pollset_kick_ext(watcher->pollset, watcher->worker,
- GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
- gpr_mu_unlock(&watcher->pollset->mu);
-}
-
-static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
- if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
- pollset_kick_locked(fd->inactive_watcher_root.next);
- } else if (fd->read_watcher) {
- pollset_kick_locked(fd->read_watcher);
- } else if (fd->write_watcher) {
- pollset_kick_locked(fd->write_watcher);
- }
-}
-
-static void wake_all_watchers_locked(grpc_fd *fd) {
- grpc_fd_watcher *watcher;
- for (watcher = fd->inactive_watcher_root.next;
- watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
- pollset_kick_locked(watcher);
- }
- if (fd->read_watcher) {
- pollset_kick_locked(fd->read_watcher);
- }
- if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
- pollset_kick_locked(fd->write_watcher);
- }
-}
-
-static int has_watchers(grpc_fd *fd) {
- return fd->read_watcher != NULL || fd->write_watcher != NULL ||
- fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
-}
-
static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
fd->closed = 1;
if (!fd->released) {
@@ -454,11 +356,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
}
gpr_mu_lock(&fd->mu);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
- if (!has_watchers(fd)) {
- close_fd_locked(exec_ctx, fd);
- } else {
- wake_all_watchers_locked(fd);
- }
+ close_fd_locked(exec_ctx, fd);
gpr_mu_unlock(&fd->mu);
UNREF_BY(fd, 2, reason); /* drop the reference */
}
@@ -489,7 +387,6 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
- maybe_wake_one_watcher_locked(fd);
} else {
/* upcallptr was set to a different closure. This is an error! */
gpr_log(GPR_ERROR,
@@ -540,111 +437,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
gpr_mu_unlock(&fd->mu);
}
-static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
- grpc_pollset_worker *worker, uint32_t read_mask,
- uint32_t write_mask, grpc_fd_watcher *watcher) {
- uint32_t mask = 0;
- grpc_closure *cur;
- int requested;
- /* keep track of pollers that have requested our events, in case they change
- */
- GRPC_FD_REF(fd, "poll");
-
- gpr_mu_lock(&fd->mu);
-
- /* if we are shutdown, then don't add to the watcher set */
- if (fd->shutdown) {
- watcher->fd = NULL;
- watcher->pollset = NULL;
- watcher->worker = NULL;
- gpr_mu_unlock(&fd->mu);
- GRPC_FD_UNREF(fd, "poll");
- return 0;
- }
-
- /* if there is nobody polling for read, but we need to, then start doing so */
- cur = fd->read_closure;
- requested = cur != CLOSURE_READY;
- if (read_mask && fd->read_watcher == NULL && requested) {
- fd->read_watcher = watcher;
- mask |= read_mask;
- }
- /* if there is nobody polling for write, but we need to, then start doing so
- */
- cur = fd->write_closure;
- requested = cur != CLOSURE_READY;
- if (write_mask && fd->write_watcher == NULL && requested) {
- fd->write_watcher = watcher;
- mask |= write_mask;
- }
- /* if not polling, remember this watcher in case we need someone to later */
- if (mask == 0 && worker != NULL) {
- watcher->next = &fd->inactive_watcher_root;
- watcher->prev = watcher->next->prev;
- watcher->next->prev = watcher->prev->next = watcher;
- }
- watcher->pollset = pollset;
- watcher->worker = worker;
- watcher->fd = fd;
- gpr_mu_unlock(&fd->mu);
-
- return mask;
-}
-
-static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
- int got_read, int got_write) {
- int was_polling = 0;
- int kick = 0;
- grpc_fd *fd = watcher->fd;
-
- if (fd == NULL) {
- return;
- }
-
- gpr_mu_lock(&fd->mu);
-
- if (watcher == fd->read_watcher) {
- /* remove read watcher, kick if we still need a read */
- was_polling = 1;
- if (!got_read) {
- kick = 1;
- }
- fd->read_watcher = NULL;
- }
- if (watcher == fd->write_watcher) {
- /* remove write watcher, kick if we still need a write */
- was_polling = 1;
- if (!got_write) {
- kick = 1;
- }
- fd->write_watcher = NULL;
- }
- if (!was_polling && watcher->worker != NULL) {
- /* remove from inactive list */
- watcher->next->prev = watcher->prev;
- watcher->prev->next = watcher->next;
- }
- if (got_read) {
- if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
- kick = 1;
- }
- }
- if (got_write) {
- if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
- kick = 1;
- }
- }
- if (kick) {
- maybe_wake_one_watcher_locked(fd);
- }
- if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
- close_fd_locked(exec_ctx, fd);
- }
- gpr_mu_unlock(&fd->mu);
-
- GRPC_FD_UNREF(fd, "poll");
-}
-
/*******************************************************************************
* pollset_posix.c
*/
@@ -1085,32 +877,45 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
epoll_hdr *h = pollset->data.ptr;
struct epoll_event ev;
int err;
- grpc_fd_watcher watcher;
-
- /* We pretend to be polling whilst adding an fd to keep the fd from being
- closed during the add. This may result in a spurious wakeup being assigned
- to this pollset whilst adding, but that should be benign. */
- /* TODO (sreek). This fd_begin_poll() really seem to accomplish adding
- * GRPC_FD_REF() (i.e adding a refcount to the fd) and checking that the
- * fd is not shutting down (in which case watcher.fd will be NULL and no
- * refcount is added). The ref count is added only durng hte duration of
- * adding it to the epoll set (after which fd_end_poll would be called and
- * the fd's ref count is decremented by 1. So do we still need fd_begin_poll
- * ??? */
- GPR_ASSERT(fd_begin_poll(fd, pollset, NULL, 0, 0, &watcher) == 0);
- if (watcher.fd != NULL) {
- ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
- ev.data.ptr = fd;
- err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
- if (err < 0) {
- /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
- if (errno != EEXIST) {
- gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
- strerror(errno));
- }
+
+ /* Hold a ref to the fd to keep it from being closed during the add. This may
+ result in a spurious wakeup being assigned to this pollset whilst adding,
+ but that should be benign. */
+ /* TODO: (sreek): Understand how a spurious wake up migh be assinged to this
+ * pollset..and how holding a reference will prevent the fd from being closed
+ * (and perhaps more importantly, see how can an fd be closed while being
+ * added to the epollset */
+ GRPC_FD_REF(fd, "add fd");
+
+ gpr_mu_lock(&fd->mu);
+ if (fd->shutdown) {
+ gpr_mu_unlock(&fd->mu);
+ GRPC_FD_UNREF(fd, "add fd");
+ return;
+ }
+ gpr_mu_unlock(&fd->mu);
+
+ ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
+ ev.data.ptr = fd;
+ err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
+ if (err < 0) {
+ /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
+ if (errno != EEXIST) {
+ gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
+ strerror(errno));
}
}
- fd_end_poll(exec_ctx, &watcher, 0, 0);
+
+ /* The fd might have been orphaned while we were adding it to the epoll set.
+ Close the fd in such a case (which will also take care of removing it from
+ the epoll set */
+ gpr_mu_lock(&fd->mu);
+ if (fd_is_orphaned(fd) && !fd->closed) {
+ close_fd_locked(exec_ctx, fd);
+ }
+ gpr_mu_unlock(&fd->mu);
+
+ GRPC_FD_UNREF(fd, "add fd");
}
/* Creates an epoll fd and initializes the pollset */