aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr/fd_posix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr/fd_posix.c')
-rw-r--r--src/core/iomgr/fd_posix.c220
1 files changed, 106 insertions, 114 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index b48b7f050a..231bc988a8 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -45,10 +45,8 @@
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
-enum descriptor_state {
- NOT_READY = 0,
- READY = 1
-}; /* or a pointer to a closure to call */
+#define CLOSURE_NOT_READY ((grpc_closure *)0)
+#define CLOSURE_READY ((grpc_closure *)1)
/* We need to keep a freelist not because of any concerns of malloc performance
* but instead so that implementations with multiple threads in (for example)
@@ -88,14 +86,13 @@ static grpc_fd *alloc_fd(int fd) {
gpr_mu_unlock(&fd_freelist_mu);
if (r == NULL) {
r = gpr_malloc(sizeof(grpc_fd));
- gpr_mu_init(&r->set_state_mu);
- gpr_mu_init(&r->watcher_mu);
+ gpr_mu_init(&r->mu);
}
gpr_atm_rel_store(&r->refst, 1);
- gpr_atm_rel_store(&r->readst, NOT_READY);
- gpr_atm_rel_store(&r->writest, NOT_READY);
- gpr_atm_rel_store(&r->shutdown, 0);
+ r->shutdown = 0;
+ r->read_closure = CLOSURE_NOT_READY;
+ r->write_closure = CLOSURE_NOT_READY;
r->fd = fd;
r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
&r->inactive_watcher_root;
@@ -107,8 +104,7 @@ static grpc_fd *alloc_fd(int fd) {
}
static void destroy(grpc_fd *fd) {
- gpr_mu_destroy(&fd->set_state_mu);
- gpr_mu_destroy(&fd->watcher_mu);
+ gpr_mu_destroy(&fd->mu);
gpr_free(fd);
}
@@ -173,39 +169,35 @@ int grpc_fd_is_orphaned(grpc_fd *fd) {
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
-static void pollset_kick_locked(grpc_pollset *pollset) {
- gpr_mu_lock(GRPC_POLLSET_MU(pollset));
- grpc_pollset_kick(pollset, NULL);
- gpr_mu_unlock(GRPC_POLLSET_MU(pollset));
+static void pollset_kick_locked(grpc_fd_watcher *watcher) {
+ gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset));
+ GPR_ASSERT(watcher->worker);
+ grpc_pollset_kick_ext(watcher->pollset, watcher->worker,
+ GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
+ gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset));
}
static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
- pollset_kick_locked(fd->inactive_watcher_root.next->pollset);
+ pollset_kick_locked(fd->inactive_watcher_root.next);
} else if (fd->read_watcher) {
- pollset_kick_locked(fd->read_watcher->pollset);
+ pollset_kick_locked(fd->read_watcher);
} else if (fd->write_watcher) {
- pollset_kick_locked(fd->write_watcher->pollset);
+ pollset_kick_locked(fd->write_watcher);
}
}
-static void maybe_wake_one_watcher(grpc_fd *fd) {
- gpr_mu_lock(&fd->watcher_mu);
- maybe_wake_one_watcher_locked(fd);
- gpr_mu_unlock(&fd->watcher_mu);
-}
-
static void wake_all_watchers_locked(grpc_fd *fd) {
grpc_fd_watcher *watcher;
for (watcher = fd->inactive_watcher_root.next;
watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
- pollset_kick_locked(watcher->pollset);
+ pollset_kick_locked(watcher);
}
if (fd->read_watcher) {
- pollset_kick_locked(fd->read_watcher->pollset);
+ pollset_kick_locked(fd->read_watcher);
}
if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
- pollset_kick_locked(fd->write_watcher->pollset);
+ pollset_kick_locked(fd->write_watcher);
}
}
@@ -218,7 +210,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
const char *reason) {
fd->on_done_closure = on_done;
shutdown(fd->fd, SHUT_RDWR);
- gpr_mu_lock(&fd->watcher_mu);
+ gpr_mu_lock(&fd->mu);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
if (!has_watchers(fd)) {
fd->closed = 1;
@@ -227,7 +219,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
} else {
wake_all_watchers_locked(fd);
}
- gpr_mu_unlock(&fd->watcher_mu);
+ gpr_mu_unlock(&fd->mu);
UNREF_BY(fd, 2, reason); /* drop the reference */
}
@@ -247,136 +239,121 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
-static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *st,
- grpc_closure *closure) {
- switch (gpr_atm_acq_load(st)) {
- case NOT_READY:
- /* There is no race if the descriptor is already ready, so we skip
- the interlocked op in that case. As long as the app doesn't
- try to set the same upcall twice (which it shouldn't) then
- oldval should never be anything other than READY or NOT_READY. We
- don't
- check for user error on the fast path. */
- if (gpr_atm_rel_cas(st, NOT_READY, (gpr_intptr)closure)) {
- /* swap was successful -- the closure will run after the next
- set_ready call. NOTE: we don't have an ABA problem here,
- since we should never have concurrent calls to the same
- notify_on function. */
- maybe_wake_one_watcher(fd);
- return;
- }
- /* swap was unsuccessful due to an intervening set_ready call.
- Fall through to the READY code below */
- case READY:
- GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY);
- gpr_atm_rel_store(st, NOT_READY);
- grpc_exec_ctx_enqueue(exec_ctx, closure,
- !gpr_atm_acq_load(&fd->shutdown));
- return;
- default: /* WAITING */
- /* upcallptr was set to a different closure. This is an error! */
- gpr_log(GPR_ERROR,
- "User called a notify_on function with a previous callback still "
- "pending");
- abort();
+static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure **st, grpc_closure *closure) {
+ if (*st == CLOSURE_NOT_READY) {
+ /* not ready ==> switch to a waiting state by setting the closure */
+ *st = closure;
+ } else if (*st == CLOSURE_READY) {
+ /* already ready ==> queue the closure to run immediately */
+ *st = CLOSURE_NOT_READY;
+ grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown);
+ maybe_wake_one_watcher_locked(fd);
+ } else {
+ /* upcallptr was set to a different closure. This is an error! */
+ gpr_log(GPR_ERROR,
+ "User called a notify_on function with a previous callback still "
+ "pending");
+ abort();
}
- gpr_log(GPR_ERROR, "Corrupt memory in &st->state");
- abort();
}
-static void set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- gpr_atm *st) {
- gpr_intptr state = gpr_atm_acq_load(st);
-
- switch (state) {
- case READY:
- /* duplicate ready, ignore */
- return;
- case NOT_READY:
- if (gpr_atm_rel_cas(st, NOT_READY, READY)) {
- /* swap was successful -- the closure will run after the next
- notify_on call. */
- return;
- }
- /* swap was unsuccessful due to an intervening set_ready call.
- Fall through to the WAITING code below */
- state = gpr_atm_acq_load(st);
- default: /* waiting */
- GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY &&
- gpr_atm_no_barrier_load(st) != NOT_READY);
- grpc_exec_ctx_enqueue(exec_ctx, (grpc_closure *)state,
- !gpr_atm_acq_load(&fd->shutdown));
- gpr_atm_rel_store(st, NOT_READY);
- return;
+/* returns 1 if state becomes not ready */
+static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure **st) {
+ if (*st == CLOSURE_READY) {
+ /* duplicate ready ==> ignore */
+ return 0;
+ } else if (*st == CLOSURE_NOT_READY) {
+ /* not ready, and not waiting ==> flag ready */
+ *st = CLOSURE_READY;
+ return 0;
+ } else {
+ /* waiting ==> queue closure */
+ grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown);
+ *st = CLOSURE_NOT_READY;
+ return 1;
}
}
-static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *st) {
+static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) {
/* only one set_ready can be active at once (but there may be a racing
notify_on) */
- gpr_mu_lock(&fd->set_state_mu);
+ gpr_mu_lock(&fd->mu);
set_ready_locked(exec_ctx, fd, st);
- gpr_mu_unlock(&fd->set_state_mu);
+ gpr_mu_unlock(&fd->mu);
}
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- gpr_mu_lock(&fd->set_state_mu);
+ gpr_mu_lock(&fd->mu);
GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown));
- gpr_atm_rel_store(&fd->shutdown, 1);
- set_ready_locked(exec_ctx, fd, &fd->readst);
- set_ready_locked(exec_ctx, fd, &fd->writest);
- gpr_mu_unlock(&fd->set_state_mu);
+ fd->shutdown = 1;
+ set_ready_locked(exec_ctx, fd, &fd->read_closure);
+ set_ready_locked(exec_ctx, fd, &fd->write_closure);
+ gpr_mu_unlock(&fd->mu);
}
void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- notify_on(exec_ctx, fd, &fd->readst, closure);
+ gpr_mu_lock(&fd->mu);
+ notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
+ gpr_mu_unlock(&fd->mu);
}
void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- notify_on(exec_ctx, fd, &fd->writest, closure);
+ gpr_mu_lock(&fd->mu);
+ notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
+ gpr_mu_unlock(&fd->mu);
}
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
- gpr_uint32 read_mask, gpr_uint32 write_mask,
- grpc_fd_watcher *watcher) {
+ grpc_pollset_worker *worker, gpr_uint32 read_mask,
+ gpr_uint32 write_mask, grpc_fd_watcher *watcher) {
gpr_uint32 mask = 0;
+ grpc_closure *cur;
+ int requested;
/* keep track of pollers that have requested our events, in case they change
*/
GRPC_FD_REF(fd, "poll");
- gpr_mu_lock(&fd->watcher_mu);
+ gpr_mu_lock(&fd->mu);
+
/* if we are shutdown, then don't add to the watcher set */
if (gpr_atm_no_barrier_load(&fd->shutdown)) {
watcher->fd = NULL;
watcher->pollset = NULL;
- gpr_mu_unlock(&fd->watcher_mu);
+ watcher->worker = NULL;
+ gpr_mu_unlock(&fd->mu);
GRPC_FD_UNREF(fd, "poll");
return 0;
}
+
/* if there is nobody polling for read, but we need to, then start doing so */
- if (read_mask && !fd->read_watcher &&
- (gpr_uintptr)gpr_atm_acq_load(&fd->readst) > READY) {
+ cur = fd->read_closure;
+ requested = cur != CLOSURE_READY;
+ if (read_mask && fd->read_watcher == NULL && requested) {
fd->read_watcher = watcher;
mask |= read_mask;
}
/* if there is nobody polling for write, but we need to, then start doing so
*/
- if (write_mask && !fd->write_watcher &&
- (gpr_uintptr)gpr_atm_acq_load(&fd->writest) > READY) {
+ cur = fd->write_closure;
+ requested = cur != CLOSURE_READY;
+ if (write_mask && fd->write_watcher == NULL && requested) {
fd->write_watcher = watcher;
mask |= write_mask;
}
/* if not polling, remember this watcher in case we need someone to later */
- if (mask == 0) {
+ if (mask == 0 && worker != NULL) {
watcher->next = &fd->inactive_watcher_root;
watcher->prev = watcher->next->prev;
watcher->next->prev = watcher->prev->next = watcher;
}
watcher->pollset = pollset;
+ watcher->worker = worker;
watcher->fd = fd;
- gpr_mu_unlock(&fd->watcher_mu);
+ gpr_mu_unlock(&fd->mu);
return mask;
}
@@ -391,24 +368,39 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
return;
}
- gpr_mu_lock(&fd->watcher_mu);
+ gpr_mu_lock(&fd->mu);
+
if (watcher == fd->read_watcher) {
/* remove read watcher, kick if we still need a read */
was_polling = 1;
- kick = kick || !got_read;
+ if (!got_read) {
+ kick = 1;
+ }
fd->read_watcher = NULL;
}
if (watcher == fd->write_watcher) {
/* remove write watcher, kick if we still need a write */
was_polling = 1;
- kick = kick || !got_write;
+ if (!got_write) {
+ kick = 1;
+ }
fd->write_watcher = NULL;
}
- if (!was_polling) {
+ if (!was_polling && watcher->worker != NULL) {
/* remove from inactive list */
watcher->next->prev = watcher->prev;
watcher->prev->next = watcher->next;
}
+ if (got_read) {
+ if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
+ kick = 1;
+ }
+ }
+ if (got_write) {
+ if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
+ kick = 1;
+ }
+ }
if (kick) {
maybe_wake_one_watcher_locked(fd);
}
@@ -417,17 +409,17 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
close(fd->fd);
grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1);
}
- gpr_mu_unlock(&fd->watcher_mu);
+ gpr_mu_unlock(&fd->mu);
GRPC_FD_UNREF(fd, "poll");
}
void grpc_fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- set_ready(exec_ctx, fd, &fd->readst);
+ set_ready(exec_ctx, fd, &fd->read_closure);
}
void grpc_fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- set_ready(exec_ctx, fd, &fd->writest);
+ set_ready(exec_ctx, fd, &fd->write_closure);
}
#endif