From 988e37f1fc8542c205db569be0dd20f758c39164 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 1 Oct 2015 07:53:56 -0700 Subject: Allow fd_posix to force a re-evaluation of polling on wakeup --- src/core/iomgr/fd_posix.c | 20 ++++++++++---------- src/core/iomgr/fd_posix.h | 2 ++ src/core/iomgr/pollset_posix.c | 30 +++++++++++++++++++++++++++++- src/core/iomgr/pollset_posix.h | 5 +++++ 4 files changed, 46 insertions(+), 11 deletions(-) (limited to 'src/core/iomgr') diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index b48b7f050a..806af9bc26 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -173,19 +173,19 @@ int grpc_fd_is_orphaned(grpc_fd *fd) { return (gpr_atm_acq_load(&fd->refst) & 1) == 0; } -static void pollset_kick_locked(grpc_pollset *pollset) { - gpr_mu_lock(GRPC_POLLSET_MU(pollset)); - grpc_pollset_kick(pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(pollset)); +static void pollset_kick_locked(grpc_fd_watcher *watcher) { + gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset)); + grpc_pollset_kick_ex(watcher->pollset, watcher->worker, GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); + gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset)); } static void maybe_wake_one_watcher_locked(grpc_fd *fd) { if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) { - pollset_kick_locked(fd->inactive_watcher_root.next->pollset); + pollset_kick_locked(fd->inactive_watcher_root.next); } else if (fd->read_watcher) { - pollset_kick_locked(fd->read_watcher->pollset); + pollset_kick_locked(fd->read_watcher); } else if (fd->write_watcher) { - pollset_kick_locked(fd->write_watcher->pollset); + pollset_kick_locked(fd->write_watcher); } } @@ -199,13 +199,13 @@ static void wake_all_watchers_locked(grpc_fd *fd) { grpc_fd_watcher *watcher; for (watcher = fd->inactive_watcher_root.next; watcher != &fd->inactive_watcher_root; watcher = watcher->next) { - pollset_kick_locked(watcher->pollset); + pollset_kick_locked(watcher); } if (fd->read_watcher) { - pollset_kick_locked(fd->read_watcher->pollset); + pollset_kick_locked(fd->read_watcher); } if (fd->write_watcher && fd->write_watcher != fd->read_watcher) { - pollset_kick_locked(fd->write_watcher->pollset); + pollset_kick_locked(fd->write_watcher); } } diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index 089aa4d717..a60aff2a09 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -46,6 +46,7 @@ typedef struct grpc_fd_watcher { struct grpc_fd_watcher *next; struct grpc_fd_watcher *prev; grpc_pollset *pollset; + grpc_pollset_worker *worker; grpc_fd *fd; } grpc_fd_watcher; @@ -126,6 +127,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, fd's current interest (such as epoll) do not need to call this function. MUST NOT be called with a pollset lock taken */ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, + grpc_pollset_worker *worker, gpr_uint32 read_mask, gpr_uint32 write_mask, grpc_fd_watcher *rec); /* Complete polling previously started with grpc_fd_begin_poll diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 82a82cc064..c88fff6b43 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -98,31 +98,59 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next->prev = worker; } -void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { +void grpc_pollset_kick_ex(grpc_pollset *p, grpc_pollset_worker *specific_worker, gpr_uint32 flags) { /* pollset->mu already held */ if (specific_worker != NULL) { if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { + GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); for (specific_worker = p->root_worker.next; specific_worker != &p->root_worker; specific_worker = specific_worker->next) { grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); } p->kicked_without_pollers = 1; + return; } else if (gpr_tls_get(&g_current_thread_worker) != (gpr_intptr)specific_worker) { + if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { + specific_worker->reevaluate_polling_on_wakeup = 1; + } + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); + return; + } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { + if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { + specific_worker->reevaluate_polling_on_wakeup = 1; + } grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); + return; } } else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) { + GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); specific_worker = pop_front_worker(p); if (specific_worker != NULL) { + if (gpr_tls_get(&g_current_thread_worker) == (gpr_intptr)specific_worker) { + push_back_worker(p, specific_worker); + specific_worker = pop_front_worker(p); + if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 && + gpr_tls_get(&g_current_thread_worker) == (gpr_intptr)specific_worker) { + push_back_worker(p, specific_worker); + return; + } + } push_back_worker(p, specific_worker); grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); + return; } else { p->kicked_without_pollers = 1; + return; } } } +void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { + grpc_pollset_kick_ex(p, specific_worker, 0); +} + /* global state management */ void grpc_pollset_global_init(void) { diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 83c5258539..762582c79d 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -50,6 +50,7 @@ struct grpc_fd; typedef struct grpc_pollset_worker { grpc_wakeup_fd wakeup_fd; + int reevaluate_polling_on_wakeup; struct grpc_pollset_worker *next; struct grpc_pollset_worker *prev; } grpc_pollset_worker; @@ -111,6 +112,10 @@ void grpc_kick_drain(grpc_pollset *p); int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now); +#define GRPC_POLLSET_CAN_KICK_SELF 1 +#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 +void grpc_pollset_kick_ex(grpc_pollset *p, grpc_pollset_worker *specific_worker, gpr_uint32 flags); + /* turn a pollset into a multipoller: platform specific */ typedef void (*grpc_platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, -- cgit v1.2.3 From dc17471545f0621ca03c8e73584139bcfde34479 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 1 Oct 2015 10:25:02 -0700 Subject: Completing wakeup story --- src/core/iomgr/fd_posix.c | 10 ++-- src/core/iomgr/pollset_multipoller_with_epoll.c | 2 +- .../iomgr/pollset_multipoller_with_poll_posix.c | 4 +- src/core/iomgr/pollset_posix.c | 56 ++++++++++++++-------- 4 files changed, 44 insertions(+), 28 deletions(-) (limited to 'src/core/iomgr') diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 806af9bc26..222e745c64 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -339,8 +339,8 @@ void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, - gpr_uint32 read_mask, gpr_uint32 write_mask, - grpc_fd_watcher *watcher) { + grpc_pollset_worker *worker, gpr_uint32 read_mask, + gpr_uint32 write_mask, grpc_fd_watcher *watcher) { gpr_uint32 mask = 0; /* keep track of pollers that have requested our events, in case they change */ @@ -351,6 +351,7 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, if (gpr_atm_no_barrier_load(&fd->shutdown)) { watcher->fd = NULL; watcher->pollset = NULL; + watcher->worker = NULL; gpr_mu_unlock(&fd->watcher_mu); GRPC_FD_UNREF(fd, "poll"); return 0; @@ -369,12 +370,13 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, mask |= write_mask; } /* if not polling, remember this watcher in case we need someone to later */ - if (mask == 0) { + if (mask == 0 && worker != NULL) { watcher->next = &fd->inactive_watcher_root; watcher->prev = watcher->next->prev; watcher->next->prev = watcher->prev->next = watcher; } watcher->pollset = pollset; + watcher->worker = worker; watcher->fd = fd; gpr_mu_unlock(&fd->watcher_mu); @@ -404,7 +406,7 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, kick = kick || !got_write; fd->write_watcher = NULL; } - if (!was_polling) { + if (!was_polling && watcher->worker != NULL) { /* remove from inactive list */ watcher->next->prev = watcher->prev; watcher->prev->next = watcher->next; diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index d26e60f665..1030c3fd1b 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -72,7 +72,7 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* We pretend to be polling whilst adding an fd to keep the fd from being closed during the add. This may result in a spurious wakeup being assigned to this pollset whilst adding, but that should be benign. */ - GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, 0, 0, &watcher) == 0); + GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, NULL, 0, 0, &watcher) == 0); if (watcher.fd != NULL) { ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET); ev.data.ptr = fd; diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 1356ebe7a0..79ff26dc04 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -147,8 +147,8 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( gpr_mu_unlock(&pollset->mu); for (i = 2; i < pfd_count; i++) { - pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, POLLIN, - POLLOUT, &watchers[i]); + pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, worker, + POLLIN, POLLOUT, &watchers[i]); } /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index c88fff6b43..b26f03d0c7 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -225,8 +225,10 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* pollset->mu already held */ int added_worker = 0; int locked = 1; + int queued_work = 0; /* this must happen before we (potentially) drop pollset->mu */ worker->next = worker->prev = NULL; + worker->reevaluate_polling_on_wakeup = 0; /* TODO(ctiller): pool these */ grpc_wakeup_fd_init(&worker->wakeup_fd); if (!grpc_pollset_has_workers(pollset) && @@ -248,29 +250,41 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, locked = 0; goto done; } - if (!pollset->kicked_without_pollers) { - push_front_worker(pollset, worker); - added_worker = 1; - gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset); - gpr_tls_set(&g_current_thread_worker, (gpr_intptr)worker); - pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker, deadline, - now); - locked = 0; - gpr_tls_set(&g_current_thread_poller, 0); - gpr_tls_set(&g_current_thread_worker, 0); - } else { - pollset->kicked_without_pollers = 0; - } -done: - if (!locked) { - grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->mu); - locked = 1; + for (;;) { + if (!pollset->kicked_without_pollers) { + if (!added_worker) { + push_front_worker(pollset, worker); + added_worker = 1; + } + gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset); + gpr_tls_set(&g_current_thread_worker, (gpr_intptr)worker); + pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker, + deadline, now); + locked = 0; + gpr_tls_set(&g_current_thread_poller, 0); + gpr_tls_set(&g_current_thread_worker, 0); + } else { + pollset->kicked_without_pollers = 0; + } + done: + if (!locked) { + queued_work |= grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(&pollset->mu); + locked = 1; + } + if (worker->reevaluate_polling_on_wakeup) { + worker->reevaluate_polling_on_wakeup = 0; + if (queued_work) { + deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); + } + continue; + } + break; } - grpc_wakeup_fd_destroy(&worker->wakeup_fd); if (added_worker) { remove_worker(pollset, worker); } + grpc_wakeup_fd_destroy(&worker->wakeup_fd); if (pollset->shutting_down) { if (grpc_pollset_has_workers(pollset)) { grpc_pollset_kick(pollset, NULL); @@ -507,8 +521,8 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, pfd[2].fd = fd->fd; pfd[2].revents = 0; gpr_mu_unlock(&pollset->mu); - pfd[2].events = - (short)grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher); + pfd[2].events = (short)grpc_fd_begin_poll(fd, pollset, worker, POLLIN, + POLLOUT, &fd_watcher); if (pfd[2].events != 0) { nfds++; } -- cgit v1.2.3 From 58d05a63dfb9355d707c3f7c4da781f4071efc60 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 2 Oct 2015 13:59:31 -0700 Subject: Stabilize mac build --- src/core/iomgr/fd_posix.c | 241 ++++++++++++--------- src/core/iomgr/fd_posix.h | 14 +- .../iomgr/pollset_multipoller_with_poll_posix.c | 30 ++- src/core/iomgr/pollset_posix.c | 32 +-- 4 files changed, 173 insertions(+), 144 deletions(-) (limited to 'src/core/iomgr') diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 222e745c64..567e1ff02e 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -44,11 +44,10 @@ #include #include #include +#include -enum descriptor_state { - NOT_READY = 0, - READY = 1 -}; /* or a pointer to a closure to call */ +#define CLOSURE_NOT_READY ((grpc_closure*)0) +#define CLOSURE_READY ((grpc_closure*)1) /* We need to keep a freelist not because of any concerns of malloc performance * but instead so that implementations with multiple threads in (for example) @@ -70,6 +69,8 @@ enum descriptor_state { static grpc_fd *fd_freelist = NULL; static gpr_mu fd_freelist_mu; +static long gettid(void) { return syscall(__NR_gettid); } + static void freelist_fd(grpc_fd *fd) { gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; @@ -88,14 +89,15 @@ static grpc_fd *alloc_fd(int fd) { gpr_mu_unlock(&fd_freelist_mu); if (r == NULL) { r = gpr_malloc(sizeof(grpc_fd)); - gpr_mu_init(&r->set_state_mu); - gpr_mu_init(&r->watcher_mu); + gpr_mu_init(&r->mu); + r->cap_ev = 0; + r->ev = NULL; } gpr_atm_rel_store(&r->refst, 1); - gpr_atm_rel_store(&r->readst, NOT_READY); - gpr_atm_rel_store(&r->writest, NOT_READY); - gpr_atm_rel_store(&r->shutdown, 0); + r->shutdown = 0; + r->read_closure = CLOSURE_NOT_READY; + r->write_closure = CLOSURE_NOT_READY; r->fd = fd; r->inactive_watcher_root.next = r->inactive_watcher_root.prev = &r->inactive_watcher_root; @@ -103,12 +105,13 @@ static grpc_fd *alloc_fd(int fd) { r->read_watcher = r->write_watcher = NULL; r->on_done_closure = NULL; r->closed = 0; + r->num_ev = 0; return r; } static void destroy(grpc_fd *fd) { - gpr_mu_destroy(&fd->set_state_mu); - gpr_mu_destroy(&fd->watcher_mu); + gpr_mu_destroy(&fd->mu); + gpr_free(fd->ev); gpr_free(fd); } @@ -169,43 +172,70 @@ grpc_fd *grpc_fd_create(int fd, const char *name) { return r; } +static int count_inactive(grpc_fd *fd) { + int n = 0; + grpc_fd_watcher *w; + for (w = fd->inactive_watcher_root.next; w != &fd->inactive_watcher_root; w = w->next) { + n++; + } + return n; +} + +static void fdev_add(fd_event_type type, grpc_fd *fd, grpc_pollset *pollset, grpc_pollset_worker *pollset_worker, grpc_fd_watcher *fd_watcher) { + fd_event *ev; + if (fd->num_ev == fd->cap_ev) { + fd->cap_ev = GPR_MAX(2 * fd->cap_ev, 32); + fd->ev = gpr_realloc(fd->ev, sizeof(*fd->ev) * fd->cap_ev); + } + ev = &fd->ev[fd->num_ev++]; + ev->thread = gettid(); + ev->type = type; + ev->pollset = pollset; + ev->pollset_worker = pollset_worker; + ev->watcher = fd_watcher; + ev->shutdown = fd->shutdown; + ev->closed = fd->closed; + ev->read_closure = fd->read_closure; + ev->write_closure = fd->write_closure; + ev->read_watcher = fd->read_watcher; + ev->write_watcher = fd->write_watcher; + ev->num_inactive = count_inactive(fd); +} + int grpc_fd_is_orphaned(grpc_fd *fd) { return (gpr_atm_acq_load(&fd->refst) & 1) == 0; } -static void pollset_kick_locked(grpc_fd_watcher *watcher) { +static void pollset_kick_locked(grpc_fd_watcher *watcher, fd_event_type type) { + fdev_add(type, watcher->fd, watcher->pollset, watcher->worker, watcher); gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset)); + GPR_ASSERT(watcher->worker); grpc_pollset_kick_ex(watcher->pollset, watcher->worker, GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset)); + fdev_add(type + 1, watcher->fd, watcher->pollset, watcher->worker, watcher); } static void maybe_wake_one_watcher_locked(grpc_fd *fd) { if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) { - pollset_kick_locked(fd->inactive_watcher_root.next); + pollset_kick_locked(fd->inactive_watcher_root.next, FDEV_KICK_INACTIVE); } else if (fd->read_watcher) { - pollset_kick_locked(fd->read_watcher); + pollset_kick_locked(fd->read_watcher, FDEV_KICK_READER); } else if (fd->write_watcher) { - pollset_kick_locked(fd->write_watcher); + pollset_kick_locked(fd->write_watcher, FDEV_KICK_WRITER); } } -static void maybe_wake_one_watcher(grpc_fd *fd) { - gpr_mu_lock(&fd->watcher_mu); - maybe_wake_one_watcher_locked(fd); - gpr_mu_unlock(&fd->watcher_mu); -} - static void wake_all_watchers_locked(grpc_fd *fd) { grpc_fd_watcher *watcher; for (watcher = fd->inactive_watcher_root.next; watcher != &fd->inactive_watcher_root; watcher = watcher->next) { - pollset_kick_locked(watcher); + pollset_kick_locked(watcher, FDEV_KICK_INACTIVE); } if (fd->read_watcher) { - pollset_kick_locked(fd->read_watcher); + pollset_kick_locked(fd->read_watcher, FDEV_KICK_READER); } if (fd->write_watcher && fd->write_watcher != fd->read_watcher) { - pollset_kick_locked(fd->write_watcher); + pollset_kick_locked(fd->write_watcher, FDEV_KICK_WRITER); } } @@ -218,7 +248,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, const char *reason) { fd->on_done_closure = on_done; shutdown(fd->fd, SHUT_RDWR); - gpr_mu_lock(&fd->watcher_mu); + gpr_mu_lock(&fd->mu); REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ if (!has_watchers(fd)) { fd->closed = 1; @@ -227,7 +257,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, } else { wake_all_watchers_locked(fd); } - gpr_mu_unlock(&fd->watcher_mu); + gpr_mu_unlock(&fd->mu); UNREF_BY(fd, 2, reason); /* drop the reference */ } @@ -247,125 +277,107 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); } void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } #endif -static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *st, +static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, grpc_closure *closure) { - switch (gpr_atm_acq_load(st)) { - case NOT_READY: - /* There is no race if the descriptor is already ready, so we skip - the interlocked op in that case. As long as the app doesn't - try to set the same upcall twice (which it shouldn't) then - oldval should never be anything other than READY or NOT_READY. We - don't - check for user error on the fast path. */ - if (gpr_atm_rel_cas(st, NOT_READY, (gpr_intptr)closure)) { - /* swap was successful -- the closure will run after the next - set_ready call. NOTE: we don't have an ABA problem here, - since we should never have concurrent calls to the same - notify_on function. */ - maybe_wake_one_watcher(fd); - return; - } - /* swap was unsuccessful due to an intervening set_ready call. - Fall through to the READY code below */ - case READY: - GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY); - gpr_atm_rel_store(st, NOT_READY); - grpc_exec_ctx_enqueue(exec_ctx, closure, - !gpr_atm_acq_load(&fd->shutdown)); - return; - default: /* WAITING */ - /* upcallptr was set to a different closure. This is an error! */ - gpr_log(GPR_ERROR, - "User called a notify_on function with a previous callback still " - "pending"); - abort(); + if (*st == CLOSURE_NOT_READY) { + *st = closure; + } else if (*st == CLOSURE_READY) { + *st = CLOSURE_NOT_READY; + grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown); + maybe_wake_one_watcher_locked(fd); + } else { + /* upcallptr was set to a different closure. This is an error! */ + gpr_log(GPR_ERROR, + "User called a notify_on function with a previous callback still " + "pending"); + abort(); } - gpr_log(GPR_ERROR, "Corrupt memory in &st->state"); - abort(); } -static void set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - gpr_atm *st) { - gpr_intptr state = gpr_atm_acq_load(st); - - switch (state) { - case READY: - /* duplicate ready, ignore */ - return; - case NOT_READY: - if (gpr_atm_rel_cas(st, NOT_READY, READY)) { - /* swap was successful -- the closure will run after the next - notify_on call. */ - return; - } - /* swap was unsuccessful due to an intervening set_ready call. - Fall through to the WAITING code below */ - state = gpr_atm_acq_load(st); - default: /* waiting */ - GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY && - gpr_atm_no_barrier_load(st) != NOT_READY); - grpc_exec_ctx_enqueue(exec_ctx, (grpc_closure *)state, - !gpr_atm_acq_load(&fd->shutdown)); - gpr_atm_rel_store(st, NOT_READY); - return; +/* returns 1 if state becomes not ready */ +static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, + grpc_closure **st) { + if (*st == CLOSURE_READY) { + /* duplicate ready, ignore */ + return 0; + } else if (*st == CLOSURE_NOT_READY) { + *st = CLOSURE_READY; + return 0; + } else { + grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown); + *st = CLOSURE_NOT_READY; + return 1; } } -static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *st) { +static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) { /* only one set_ready can be active at once (but there may be a racing notify_on) */ - gpr_mu_lock(&fd->set_state_mu); + gpr_mu_lock(&fd->mu); set_ready_locked(exec_ctx, fd, st); - gpr_mu_unlock(&fd->set_state_mu); + gpr_mu_unlock(&fd->mu); } void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - gpr_mu_lock(&fd->set_state_mu); + gpr_mu_lock(&fd->mu); GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown)); - gpr_atm_rel_store(&fd->shutdown, 1); - set_ready_locked(exec_ctx, fd, &fd->readst); - set_ready_locked(exec_ctx, fd, &fd->writest); - gpr_mu_unlock(&fd->set_state_mu); + fd->shutdown = 1; + set_ready_locked(exec_ctx, fd, &fd->read_closure); + set_ready_locked(exec_ctx, fd, &fd->write_closure); + gpr_mu_unlock(&fd->mu); } void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - notify_on(exec_ctx, fd, &fd->readst, closure); + gpr_mu_lock(&fd->mu); + fdev_add(FDEV_NOTIFY_ON_READ, fd, NULL, NULL, NULL); + notify_on_locked(exec_ctx, fd, &fd->read_closure, closure); + gpr_mu_unlock(&fd->mu); } void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - notify_on(exec_ctx, fd, &fd->writest, closure); + gpr_mu_lock(&fd->mu); + fdev_add(FDEV_NOTIFY_ON_WRITE, fd, NULL, NULL, NULL); + notify_on_locked(exec_ctx, fd, &fd->write_closure, closure); + gpr_mu_unlock(&fd->mu); } gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_uint32 read_mask, gpr_uint32 write_mask, grpc_fd_watcher *watcher) { gpr_uint32 mask = 0; + grpc_closure *cur; + int requested; /* keep track of pollers that have requested our events, in case they change */ GRPC_FD_REF(fd, "poll"); - gpr_mu_lock(&fd->watcher_mu); + gpr_mu_lock(&fd->mu); + fdev_add(FDEV_BEGIN_POLL, fd, pollset, worker, watcher); + /* if we are shutdown, then don't add to the watcher set */ if (gpr_atm_no_barrier_load(&fd->shutdown)) { watcher->fd = NULL; watcher->pollset = NULL; watcher->worker = NULL; - gpr_mu_unlock(&fd->watcher_mu); + gpr_mu_unlock(&fd->mu); GRPC_FD_UNREF(fd, "poll"); return 0; } + /* if there is nobody polling for read, but we need to, then start doing so */ - if (read_mask && !fd->read_watcher && - (gpr_uintptr)gpr_atm_acq_load(&fd->readst) > READY) { + cur = fd->read_closure; + requested = cur != CLOSURE_READY; + if (read_mask && fd->read_watcher == NULL && requested) { fd->read_watcher = watcher; mask |= read_mask; } /* if there is nobody polling for write, but we need to, then start doing so */ - if (write_mask && !fd->write_watcher && - (gpr_uintptr)gpr_atm_acq_load(&fd->writest) > READY) { + cur = fd->write_closure; + requested = cur != CLOSURE_READY; + if (write_mask && fd->write_watcher == NULL && requested) { fd->write_watcher = watcher; mask |= write_mask; } @@ -378,7 +390,7 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, watcher->pollset = pollset; watcher->worker = worker; watcher->fd = fd; - gpr_mu_unlock(&fd->watcher_mu); + gpr_mu_unlock(&fd->mu); return mask; } @@ -393,17 +405,24 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, return; } - gpr_mu_lock(&fd->watcher_mu); + gpr_mu_lock(&fd->mu); + + fdev_add(FDEV_END_POLL, watcher->fd, watcher->pollset, watcher->worker, watcher); + if (watcher == fd->read_watcher) { /* remove read watcher, kick if we still need a read */ was_polling = 1; - kick = kick || !got_read; + if (!got_read) { + kick = 1; + } fd->read_watcher = NULL; } if (watcher == fd->write_watcher) { /* remove write watcher, kick if we still need a write */ was_polling = 1; - kick = kick || !got_write; + if (!got_write) { + kick = 1; + } fd->write_watcher = NULL; } if (!was_polling && watcher->worker != NULL) { @@ -411,6 +430,16 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, watcher->next->prev = watcher->prev; watcher->prev->next = watcher->next; } + if (got_read) { + if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) { + kick = 1; + } + } + if (got_write) { + if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) { + kick = 1; + } + } if (kick) { maybe_wake_one_watcher_locked(fd); } @@ -419,17 +448,17 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, close(fd->fd); grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1); } - gpr_mu_unlock(&fd->watcher_mu); + gpr_mu_unlock(&fd->mu); GRPC_FD_UNREF(fd, "poll"); } void grpc_fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - set_ready(exec_ctx, fd, &fd->readst); + set_ready(exec_ctx, fd, &fd->read_closure); } void grpc_fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - set_ready(exec_ctx, fd, &fd->writest); + set_ready(exec_ctx, fd, &fd->write_closure); } #endif diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index a60aff2a09..b56931cb2b 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -59,8 +59,8 @@ struct grpc_fd { and just unref by 1 when we're ready to flag the object as orphaned */ gpr_atm refst; - gpr_mu set_state_mu; - gpr_atm shutdown; + gpr_mu mu; + int shutdown; int closed; /* The watcher list. @@ -85,20 +85,22 @@ struct grpc_fd { If at a later time there becomes need of a poller to poll, one of the inactive pollers may be kicked out of their poll loops to take that responsibility. */ - gpr_mu watcher_mu; grpc_fd_watcher inactive_watcher_root; grpc_fd_watcher *read_watcher; grpc_fd_watcher *write_watcher; - gpr_atm readst; - gpr_atm writest; + grpc_closure *read_closure; + grpc_closure *write_closure; struct grpc_fd *freelist_next; grpc_closure *on_done_closure; - grpc_closure *shutdown_closures[2]; grpc_iomgr_object iomgr_object; + + size_t num_ev; + size_t cap_ev; + fd_event *ev; }; /* Create a wrapped file descriptor. diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 79ff26dc04..ba0ba72816 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -102,6 +102,9 @@ static void multipoll_with_poll_pollset_del_fd(grpc_exec_ctx *exec_ctx, static void multipoll_with_poll_pollset_maybe_work_and_unlock( grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, gpr_timespec now) { +#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) +#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) + int timeout; int r; size_t i, j, fd_count; @@ -157,34 +160,29 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( r = grpc_poll_function(pfds, pfd_count, timeout); GRPC_SCHEDULING_END_BLOCKING_REGION; - for (i = 2; i < pfd_count; i++) { - grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN, - pfds[i].revents & POLLOUT); - } - if (r < 0) { - if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + for (i = 2; i < pfd_count; i++) { + grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0); } } else if (r == 0) { - /* do nothing */ + for (i = 2; i < pfd_count; i++) { + grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0); + } } else { - if (pfds[0].revents & POLLIN) { + if (pfds[0].revents & POLLIN_CHECK) { grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); } - if (pfds[1].revents & POLLIN) { + if (pfds[1].revents & POLLIN_CHECK) { grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd); } for (i = 2; i < pfd_count; i++) { if (watchers[i].fd == NULL) { + grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0); continue; } - if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) { - grpc_fd_become_readable(exec_ctx, watchers[i].fd); - } - if (pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) { - grpc_fd_become_writable(exec_ctx, watchers[i].fd); - } + grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK, + pfds[i].revents & POLLOUT_CHECK); } } diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index b26f03d0c7..30750d95fc 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -274,6 +274,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } if (worker->reevaluate_polling_on_wakeup) { worker->reevaluate_polling_on_wakeup = 0; + pollset->kicked_without_pollers = 0; if (queued_work) { deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); } @@ -497,6 +498,9 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, grpc_pollset_worker *worker, gpr_timespec deadline, gpr_timespec now) { +#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) +#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) + struct pollfd pfd[3]; grpc_fd *fd; grpc_fd_watcher fd_watcher; @@ -539,31 +543,27 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, GRPC_SCHEDULING_END_BLOCKING_REGION; GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r); - if (fd) { - grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN, - pfd[2].revents & POLLOUT); - } - if (r < 0) { - if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + if (fd) { + grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0); } } else if (r == 0) { - /* do nothing */ + if (fd) { + grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0); + } } else { - if (pfd[0].revents & POLLIN) { + if (pfd[0].revents & POLLIN_CHECK) { grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); } - if (pfd[1].revents & POLLIN) { + if (pfd[1].revents & POLLIN_CHECK) { grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd); } if (nfds > 2) { - if (pfd[2].revents & (POLLIN | POLLHUP | POLLERR)) { - grpc_fd_become_readable(exec_ctx, fd); - } - if (pfd[2].revents & (POLLOUT | POLLHUP | POLLERR)) { - grpc_fd_become_writable(exec_ctx, fd); - } + grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK, + pfd[2].revents & POLLOUT_CHECK); + } else if (fd) { + grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0); } } } -- cgit v1.2.3 From 57f79d641e70c07e7bbff518c8ad5189d3770d3c Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 2 Oct 2015 14:00:12 -0700 Subject: clang-format --- src/core/iomgr/fd_posix.c | 23 +++++++++++++--------- src/core/iomgr/fd_posix.h | 5 ++--- .../iomgr/pollset_multipoller_with_poll_posix.c | 2 +- src/core/iomgr/pollset_posix.c | 13 +++++++----- src/core/iomgr/pollset_posix.h | 3 ++- 5 files changed, 27 insertions(+), 19 deletions(-) (limited to 'src/core/iomgr') diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 567e1ff02e..e086a71e12 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -46,8 +46,8 @@ #include #include -#define CLOSURE_NOT_READY ((grpc_closure*)0) -#define CLOSURE_READY ((grpc_closure*)1) +#define CLOSURE_NOT_READY ((grpc_closure *)0) +#define CLOSURE_READY ((grpc_closure *)1) /* We need to keep a freelist not because of any concerns of malloc performance * but instead so that implementations with multiple threads in (for example) @@ -175,13 +175,16 @@ grpc_fd *grpc_fd_create(int fd, const char *name) { static int count_inactive(grpc_fd *fd) { int n = 0; grpc_fd_watcher *w; - for (w = fd->inactive_watcher_root.next; w != &fd->inactive_watcher_root; w = w->next) { + for (w = fd->inactive_watcher_root.next; w != &fd->inactive_watcher_root; + w = w->next) { n++; } return n; } -static void fdev_add(fd_event_type type, grpc_fd *fd, grpc_pollset *pollset, grpc_pollset_worker *pollset_worker, grpc_fd_watcher *fd_watcher) { +static void fdev_add(fd_event_type type, grpc_fd *fd, grpc_pollset *pollset, + grpc_pollset_worker *pollset_worker, + grpc_fd_watcher *fd_watcher) { fd_event *ev; if (fd->num_ev == fd->cap_ev) { fd->cap_ev = GPR_MAX(2 * fd->cap_ev, 32); @@ -210,7 +213,8 @@ static void pollset_kick_locked(grpc_fd_watcher *watcher, fd_event_type type) { fdev_add(type, watcher->fd, watcher->pollset, watcher->worker, watcher); gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset)); GPR_ASSERT(watcher->worker); - grpc_pollset_kick_ex(watcher->pollset, watcher->worker, GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); + grpc_pollset_kick_ex(watcher->pollset, watcher->worker, + GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset)); fdev_add(type + 1, watcher->fd, watcher->pollset, watcher->worker, watcher); } @@ -277,8 +281,8 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); } void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } #endif -static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, - grpc_closure *closure) { +static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, + grpc_closure **st, grpc_closure *closure) { if (*st == CLOSURE_NOT_READY) { *st = closure; } else if (*st == CLOSURE_READY) { @@ -296,7 +300,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure /* returns 1 if state becomes not ready */ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - grpc_closure **st) { + grpc_closure **st) { if (*st == CLOSURE_READY) { /* duplicate ready, ignore */ return 0; @@ -407,7 +411,8 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, gpr_mu_lock(&fd->mu); - fdev_add(FDEV_END_POLL, watcher->fd, watcher->pollset, watcher->worker, watcher); + fdev_add(FDEV_END_POLL, watcher->fd, watcher->pollset, watcher->worker, + watcher); if (watcher == fd->read_watcher) { /* remove read watcher, kick if we still need a read */ diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index b56931cb2b..8e6f2cfb52 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -129,9 +129,8 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, fd's current interest (such as epoll) do not need to call this function. MUST NOT be called with a pollset lock taken */ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_uint32 read_mask, gpr_uint32 write_mask, - grpc_fd_watcher *rec); + grpc_pollset_worker *worker, gpr_uint32 read_mask, + gpr_uint32 write_mask, grpc_fd_watcher *rec); /* Complete polling previously started with grpc_fd_begin_poll MUST NOT be called with a pollset lock taken */ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec, diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index ba0ba72816..faa6c14491 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -103,7 +103,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, gpr_timespec now) { #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) -#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) +#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) int timeout; int r; diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 5b7f7cce9a..574ebc7c60 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -98,7 +98,8 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next->prev = worker; } -void grpc_pollset_kick_ex(grpc_pollset *p, grpc_pollset_worker *specific_worker, gpr_uint32 flags) { +void grpc_pollset_kick_ex(grpc_pollset *p, grpc_pollset_worker *specific_worker, + gpr_uint32 flags) { /* pollset->mu already held */ if (specific_worker != NULL) { if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { @@ -128,11 +129,13 @@ void grpc_pollset_kick_ex(grpc_pollset *p, grpc_pollset_worker *specific_worker, GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); specific_worker = pop_front_worker(p); if (specific_worker != NULL) { - if (gpr_tls_get(&g_current_thread_worker) == (gpr_intptr)specific_worker) { + if (gpr_tls_get(&g_current_thread_worker) == + (gpr_intptr)specific_worker) { push_back_worker(p, specific_worker); specific_worker = pop_front_worker(p); - if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 && - gpr_tls_get(&g_current_thread_worker) == (gpr_intptr)specific_worker) { + if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 && + gpr_tls_get(&g_current_thread_worker) == + (gpr_intptr)specific_worker) { push_back_worker(p, specific_worker); return; } @@ -497,7 +500,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, gpr_timespec deadline, gpr_timespec now) { #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) -#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) +#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) struct pollfd pfd[3]; grpc_fd *fd; diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 762582c79d..d375207643 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -114,7 +114,8 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, #define GRPC_POLLSET_CAN_KICK_SELF 1 #define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 -void grpc_pollset_kick_ex(grpc_pollset *p, grpc_pollset_worker *specific_worker, gpr_uint32 flags); +void grpc_pollset_kick_ex(grpc_pollset *p, grpc_pollset_worker *specific_worker, + gpr_uint32 flags); /* turn a pollset into a multipoller: platform specific */ typedef void (*grpc_platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx, -- cgit v1.2.3 From 1270b2b36b8c40aec10cdae72a849ec4c6f3ce48 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 2 Oct 2015 16:12:25 -0700 Subject: Debug cleanup --- src/core/iomgr/fd_posix.c | 62 ++++++----------------------------------------- src/core/iomgr/fd_posix.h | 4 --- 2 files changed, 7 insertions(+), 59 deletions(-) (limited to 'src/core/iomgr') diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index e086a71e12..dc4aabb668 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -44,7 +44,6 @@ #include #include #include -#include #define CLOSURE_NOT_READY ((grpc_closure *)0) #define CLOSURE_READY ((grpc_closure *)1) @@ -69,8 +68,6 @@ static grpc_fd *fd_freelist = NULL; static gpr_mu fd_freelist_mu; -static long gettid(void) { return syscall(__NR_gettid); } - static void freelist_fd(grpc_fd *fd) { gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; @@ -90,8 +87,6 @@ static grpc_fd *alloc_fd(int fd) { if (r == NULL) { r = gpr_malloc(sizeof(grpc_fd)); gpr_mu_init(&r->mu); - r->cap_ev = 0; - r->ev = NULL; } gpr_atm_rel_store(&r->refst, 1); @@ -105,13 +100,11 @@ static grpc_fd *alloc_fd(int fd) { r->read_watcher = r->write_watcher = NULL; r->on_done_closure = NULL; r->closed = 0; - r->num_ev = 0; return r; } static void destroy(grpc_fd *fd) { gpr_mu_destroy(&fd->mu); - gpr_free(fd->ev); gpr_free(fd); } @@ -172,60 +165,25 @@ grpc_fd *grpc_fd_create(int fd, const char *name) { return r; } -static int count_inactive(grpc_fd *fd) { - int n = 0; - grpc_fd_watcher *w; - for (w = fd->inactive_watcher_root.next; w != &fd->inactive_watcher_root; - w = w->next) { - n++; - } - return n; -} - -static void fdev_add(fd_event_type type, grpc_fd *fd, grpc_pollset *pollset, - grpc_pollset_worker *pollset_worker, - grpc_fd_watcher *fd_watcher) { - fd_event *ev; - if (fd->num_ev == fd->cap_ev) { - fd->cap_ev = GPR_MAX(2 * fd->cap_ev, 32); - fd->ev = gpr_realloc(fd->ev, sizeof(*fd->ev) * fd->cap_ev); - } - ev = &fd->ev[fd->num_ev++]; - ev->thread = gettid(); - ev->type = type; - ev->pollset = pollset; - ev->pollset_worker = pollset_worker; - ev->watcher = fd_watcher; - ev->shutdown = fd->shutdown; - ev->closed = fd->closed; - ev->read_closure = fd->read_closure; - ev->write_closure = fd->write_closure; - ev->read_watcher = fd->read_watcher; - ev->write_watcher = fd->write_watcher; - ev->num_inactive = count_inactive(fd); -} - int grpc_fd_is_orphaned(grpc_fd *fd) { return (gpr_atm_acq_load(&fd->refst) & 1) == 0; } -static void pollset_kick_locked(grpc_fd_watcher *watcher, fd_event_type type) { - fdev_add(type, watcher->fd, watcher->pollset, watcher->worker, watcher); +static void pollset_kick_locked(grpc_fd_watcher *watcher) { gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset)); GPR_ASSERT(watcher->worker); grpc_pollset_kick_ex(watcher->pollset, watcher->worker, GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset)); - fdev_add(type + 1, watcher->fd, watcher->pollset, watcher->worker, watcher); } static void maybe_wake_one_watcher_locked(grpc_fd *fd) { if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) { - pollset_kick_locked(fd->inactive_watcher_root.next, FDEV_KICK_INACTIVE); + pollset_kick_locked(fd->inactive_watcher_root.next); } else if (fd->read_watcher) { - pollset_kick_locked(fd->read_watcher, FDEV_KICK_READER); + pollset_kick_locked(fd->read_watcher); } else if (fd->write_watcher) { - pollset_kick_locked(fd->write_watcher, FDEV_KICK_WRITER); + pollset_kick_locked(fd->write_watcher); } } @@ -233,13 +191,13 @@ static void wake_all_watchers_locked(grpc_fd *fd) { grpc_fd_watcher *watcher; for (watcher = fd->inactive_watcher_root.next; watcher != &fd->inactive_watcher_root; watcher = watcher->next) { - pollset_kick_locked(watcher, FDEV_KICK_INACTIVE); + pollset_kick_locked(watcher); } if (fd->read_watcher) { - pollset_kick_locked(fd->read_watcher, FDEV_KICK_READER); + pollset_kick_locked(fd->read_watcher); } if (fd->write_watcher && fd->write_watcher != fd->read_watcher) { - pollset_kick_locked(fd->write_watcher, FDEV_KICK_WRITER); + pollset_kick_locked(fd->write_watcher); } } @@ -334,7 +292,6 @@ void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { gpr_mu_lock(&fd->mu); - fdev_add(FDEV_NOTIFY_ON_READ, fd, NULL, NULL, NULL); notify_on_locked(exec_ctx, fd, &fd->read_closure, closure); gpr_mu_unlock(&fd->mu); } @@ -342,7 +299,6 @@ void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { gpr_mu_lock(&fd->mu); - fdev_add(FDEV_NOTIFY_ON_WRITE, fd, NULL, NULL, NULL); notify_on_locked(exec_ctx, fd, &fd->write_closure, closure); gpr_mu_unlock(&fd->mu); } @@ -358,7 +314,6 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, GRPC_FD_REF(fd, "poll"); gpr_mu_lock(&fd->mu); - fdev_add(FDEV_BEGIN_POLL, fd, pollset, worker, watcher); /* if we are shutdown, then don't add to the watcher set */ if (gpr_atm_no_barrier_load(&fd->shutdown)) { @@ -411,9 +366,6 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, gpr_mu_lock(&fd->mu); - fdev_add(FDEV_END_POLL, watcher->fd, watcher->pollset, watcher->worker, - watcher); - if (watcher == fd->read_watcher) { /* remove read watcher, kick if we still need a read */ was_polling = 1; diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index 8e6f2cfb52..ca3d065abc 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -97,10 +97,6 @@ struct grpc_fd { grpc_closure *on_done_closure; grpc_iomgr_object iomgr_object; - - size_t num_ev; - size_t cap_ev; - fd_event *ev; }; /* Create a wrapped file descriptor. -- cgit v1.2.3 From 548735efb8d27b61db8eae3752b4fffb1ae4bcd9 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 2 Oct 2015 16:17:10 -0700 Subject: Commentary --- src/core/iomgr/fd_posix.h | 4 +++- src/core/iomgr/pollset_posix.h | 4 ++++ 2 files changed, 7 insertions(+), 1 deletion(-) (limited to 'src/core/iomgr') diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index ca3d065abc..b85c74b52b 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -128,7 +128,9 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_uint32 read_mask, gpr_uint32 write_mask, grpc_fd_watcher *rec); /* Complete polling previously started with grpc_fd_begin_poll - MUST NOT be called with a pollset lock taken */ + MUST NOT be called with a pollset lock taken + if got_read or got_write are 1, also does the notify_on_{read,write} as + appropriate. */ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec, int got_read, int got_write); diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index d375207643..f0ec07ebce 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -112,8 +112,12 @@ void grpc_kick_drain(grpc_pollset *p); int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now); +/* Allow kick to wakeup the currently polling worker */ #define GRPC_POLLSET_CAN_KICK_SELF 1 +/* Force the wakee to repoll when awoken */ #define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 +/* As per grpc_pollset_kick, with an extended set of flags (defined above) + -- mostly for fd_posix's use. */ void grpc_pollset_kick_ex(grpc_pollset *p, grpc_pollset_worker *specific_worker, gpr_uint32 flags); -- cgit v1.2.3 From ce04de0671435d50d82a50ec1c66985596959a52 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 2 Oct 2015 16:17:49 -0700 Subject: Commentary --- src/core/iomgr/fd_posix.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core/iomgr') diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index b85c74b52b..dc917ebbc0 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -129,7 +129,7 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, gpr_uint32 write_mask, grpc_fd_watcher *rec); /* Complete polling previously started with grpc_fd_begin_poll MUST NOT be called with a pollset lock taken - if got_read or got_write are 1, also does the notify_on_{read,write} as + if got_read or got_write are 1, also does the become_{readable,writable} as appropriate. */ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec, int got_read, int got_write); -- cgit v1.2.3 From 3ff551bf70e9cfbcc19aa4781b0c33719aa63538 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 6 Oct 2015 09:11:37 -0700 Subject: Add a simpler (but slower) path for closures for where it makes sense --- src/core/iomgr/closure.c | 24 ++++++++++++++++++++++++ src/core/iomgr/closure.h | 3 +++ 2 files changed, 27 insertions(+) (limited to 'src/core/iomgr') diff --git a/src/core/iomgr/closure.c b/src/core/iomgr/closure.c index 3265425789..d91681990f 100644 --- a/src/core/iomgr/closure.c +++ b/src/core/iomgr/closure.c @@ -33,6 +33,8 @@ #include "src/core/iomgr/closure.h" +#include + void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, void *cb_arg) { closure->cb = cb; @@ -69,3 +71,25 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst) { } src->head = src->tail = NULL; } + +typedef struct { + grpc_iomgr_cb_func cb; + void *cb_arg; + grpc_closure wrapper; +} wrapped_closure; + +static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, int success) { + wrapped_closure *wc = arg; + grpc_iomgr_cb_func cb = wc->cb; + void *cb_arg = wc->cb_arg; + gpr_free(wc); + cb(exec_ctx, cb_arg, success); +} + +grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) { + wrapped_closure *wc = gpr_malloc(sizeof(*wc)); + wc->cb = cb; + wc->cb_arg = cb_arg; + grpc_closure_init(&wc->wrapper, closure_wrapper, wc); + return &wc->wrapper; +} diff --git a/src/core/iomgr/closure.h b/src/core/iomgr/closure.h index 982ffa4e1b..d812659af0 100644 --- a/src/core/iomgr/closure.h +++ b/src/core/iomgr/closure.h @@ -77,6 +77,9 @@ struct grpc_closure { void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, void *cb_arg); +/* Create a heap allocated closure: try to avoid except for very rare events */ +grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg); + #define GRPC_CLOSURE_LIST_INIT \ { NULL, NULL } -- cgit v1.2.3 From faf8f684b1d2bd453a9bfc42a626795e39836c7f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 6 Oct 2015 09:12:34 -0700 Subject: Close incoming connections if we are post shutdown --- src/core/iomgr/tcp_server_windows.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/core/iomgr') diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index db3319b3c6..3fea8b5b35 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -336,6 +336,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) { peer_name_string); gpr_free(fd_name); gpr_free(peer_name_string); + } else { + closesocket(sock); } } -- cgit v1.2.3 From b937aa1b7a62142c9decf20b8c0a5be3ef20ec0b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 6 Oct 2015 11:30:30 -0700 Subject: ex --> ext --- src/core/iomgr/fd_posix.c | 4 ++-- src/core/iomgr/pollset_posix.c | 6 +++--- src/core/iomgr/pollset_posix.h | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) (limited to 'src/core/iomgr') diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index dc4aabb668..7ae8d6efa5 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -172,8 +172,8 @@ int grpc_fd_is_orphaned(grpc_fd *fd) { static void pollset_kick_locked(grpc_fd_watcher *watcher) { gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset)); GPR_ASSERT(watcher->worker); - grpc_pollset_kick_ex(watcher->pollset, watcher->worker, - GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); + grpc_pollset_kick_ext(watcher->pollset, watcher->worker, + GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset)); } diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 574ebc7c60..0851a0dc70 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -98,8 +98,8 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next->prev = worker; } -void grpc_pollset_kick_ex(grpc_pollset *p, grpc_pollset_worker *specific_worker, - gpr_uint32 flags) { +void grpc_pollset_kick_ext(grpc_pollset *p, grpc_pollset_worker *specific_worker, + gpr_uint32 flags) { /* pollset->mu already held */ if (specific_worker != NULL) { if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { @@ -151,7 +151,7 @@ void grpc_pollset_kick_ex(grpc_pollset *p, grpc_pollset_worker *specific_worker, } void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { - grpc_pollset_kick_ex(p, specific_worker, 0); + grpc_pollset_kick_ext(p, specific_worker, 0); } /* global state management */ diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index f0ec07ebce..6b91725de7 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -118,8 +118,8 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, #define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 /* As per grpc_pollset_kick, with an extended set of flags (defined above) -- mostly for fd_posix's use. */ -void grpc_pollset_kick_ex(grpc_pollset *p, grpc_pollset_worker *specific_worker, - gpr_uint32 flags); +void grpc_pollset_kick_ext(grpc_pollset *p, grpc_pollset_worker *specific_worker, + gpr_uint32 flags); /* turn a pollset into a multipoller: platform specific */ typedef void (*grpc_platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx, -- cgit v1.2.3 From d0a00003d6123a45842ac6df842abcbdb6ed612f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 6 Oct 2015 11:30:37 -0700 Subject: clang-format --- src/core/iomgr/pollset_posix.c | 3 ++- src/core/iomgr/pollset_posix.h | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'src/core/iomgr') diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 0851a0dc70..7581c3593e 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -98,7 +98,8 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next->prev = worker; } -void grpc_pollset_kick_ext(grpc_pollset *p, grpc_pollset_worker *specific_worker, +void grpc_pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, gpr_uint32 flags) { /* pollset->mu already held */ if (specific_worker != NULL) { diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 6b91725de7..34f76db2af 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -118,7 +118,8 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, #define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 /* As per grpc_pollset_kick, with an extended set of flags (defined above) -- mostly for fd_posix's use. */ -void grpc_pollset_kick_ext(grpc_pollset *p, grpc_pollset_worker *specific_worker, +void grpc_pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, gpr_uint32 flags); /* turn a pollset into a multipoller: platform specific */ -- cgit v1.2.3 From d49e3a1e6bc8f269257d7879f132bda1bd36b0a4 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 6 Oct 2015 11:33:14 -0700 Subject: Add commentary --- src/core/iomgr/fd_posix.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'src/core/iomgr') diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 7ae8d6efa5..231bc988a8 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -242,8 +242,10 @@ void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, grpc_closure *closure) { if (*st == CLOSURE_NOT_READY) { + /* not ready ==> switch to a waiting state by setting the closure */ *st = closure; } else if (*st == CLOSURE_READY) { + /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown); maybe_wake_one_watcher_locked(fd); @@ -260,12 +262,14 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) { if (*st == CLOSURE_READY) { - /* duplicate ready, ignore */ + /* duplicate ready ==> ignore */ return 0; } else if (*st == CLOSURE_NOT_READY) { + /* not ready, and not waiting ==> flag ready */ *st = CLOSURE_READY; return 0; } else { + /* waiting ==> queue closure */ grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown); *st = CLOSURE_NOT_READY; return 1; -- cgit v1.2.3 From 6078a7d36728e4711eee6897c033c3ad60acba49 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 6 Oct 2015 11:50:21 -0700 Subject: Add commentary --- src/core/iomgr/pollset_posix.c | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) (limited to 'src/core/iomgr') diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 7581c3593e..08e34c5d56 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -228,31 +228,44 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int added_worker = 0; int locked = 1; int queued_work = 0; + int keep_polling = 0; /* this must happen before we (potentially) drop pollset->mu */ worker->next = worker->prev = NULL; worker->reevaluate_polling_on_wakeup = 0; /* TODO(ctiller): pool these */ grpc_wakeup_fd_init(&worker->wakeup_fd); + /* If there's work waiting for the pollset to be idle, and the + pollset is idle, then do that work */ if (!grpc_pollset_has_workers(pollset) && !grpc_closure_list_empty(pollset->idle_jobs)) { grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs); goto done; } + /* Check alarms - these are a global resource so we just ping + each time through on every pollset. + May update deadline to ensure timely wakeups. + TODO(ctiller): can this work be localized? */ if (grpc_alarm_check(exec_ctx, now, &deadline)) { gpr_mu_unlock(&pollset->mu); locked = 0; goto done; } + /* If we're shutting down then we don't execute any extended work */ if (pollset->shutting_down) { goto done; } + /* Give do_promote priority so we don't starve it out */ if (pollset->in_flight_cbs) { - /* Give do_promote priority so we don't starve it out */ gpr_mu_unlock(&pollset->mu); locked = 0; goto done; } - for (;;) { + /* Start polling, and keep doing so while we're being asked to + re-evaluate our pollers (this allows poll() based pollers to + ensure they don't miss wakeups) */ + keep_polling = 1; + while (keep_polling) { + keep_polling = 0; if (!pollset->kicked_without_pollers) { if (!added_worker) { push_front_worker(pollset, worker); @@ -268,21 +281,29 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } else { pollset->kicked_without_pollers = 0; } + /* Finished execution - start cleaning up. + Note that we may arrive here from outside the enclosing while() loop. + In that case we won't loop though as we haven't added worker to the + worker list, which means nobody could ask us to re-evaluate polling). */ done: if (!locked) { queued_work |= grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(&pollset->mu); locked = 1; } + /* If we're forced to re-evaluate polling (via grpc_pollset_kick with + GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force + a loop */ if (worker->reevaluate_polling_on_wakeup) { worker->reevaluate_polling_on_wakeup = 0; pollset->kicked_without_pollers = 0; if (queued_work) { + /* If there's queued work on the list, then set the deadline to be + immediate so we get back out of the polling loop quickly */ deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); } - continue; + keep_polling = 1; } - break; } if (added_worker) { remove_worker(pollset, worker); -- cgit v1.2.3 From c6787b263fbf31510fce389304d9a11da0d8aa2f Mon Sep 17 00:00:00 2001 From: Robbie Shade Date: Wed, 7 Oct 2015 10:13:53 -0400 Subject: Fix broken udp_server_test, by adding exec_ctx to read_cb --- src/core/iomgr/udp_server.c | 2 +- src/core/iomgr/udp_server.h | 3 ++- test/core/iomgr/udp_server_test.c | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) (limited to 'src/core/iomgr') diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index 1304f2067e..9903e970e6 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -278,7 +278,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) { /* Tell the registered callback that data is available to read. */ GPR_ASSERT(sp->read_cb); - sp->read_cb(sp->emfd, sp->server->grpc_server); + sp->read_cb(exec_ctx, sp->emfd, sp->server->grpc_server); /* Re-arm the notification event so we get another chance to read. */ grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h index dbbe097109..de5736c426 100644 --- a/src/core/iomgr/udp_server.h +++ b/src/core/iomgr/udp_server.h @@ -43,7 +43,8 @@ typedef struct grpc_server grpc_server; typedef struct grpc_udp_server grpc_udp_server; /* Called when data is available to read from the socket. */ -typedef void (*grpc_udp_server_read_cb)(grpc_fd *emfd, grpc_server *server); +typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, + grpc_server *server); /* Create a server, initially not bound to any ports */ grpc_udp_server *grpc_udp_server_create(void); diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c index fc0026da4d..86e8767937 100644 --- a/test/core/iomgr/udp_server_test.c +++ b/test/core/iomgr/udp_server_test.c @@ -49,7 +49,8 @@ static grpc_pollset g_pollset; static int g_number_of_reads = 0; static int g_number_of_bytes_read = 0; -static void on_read(grpc_fd *emfd, grpc_server *server) { +static void on_read(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, + grpc_server *server) { char read_buffer[512]; ssize_t byte_count; -- cgit v1.2.3