diff options
author | 2016-05-24 06:55:28 -0700 | |
---|---|---|
committer | 2016-05-24 06:55:28 -0700 | |
commit | 48abdde19722a9d24d4f590b9d197db29f498ed1 (patch) | |
tree | 97f138ce898ff271d2ee4606996c446fb3648daa /src/core/lib/iomgr/ev_poll_posix.c | |
parent | 77758f5cb9f913a891be231f24eaa8306d5047b3 (diff) | |
parent | 6db0232c3abe800a302d5cc889e4cadf060e221c (diff) |
Merge github.com:grpc/grpc into error
Diffstat (limited to 'src/core/lib/iomgr/ev_poll_posix.c')
-rw-r--r-- | src/core/lib/iomgr/ev_poll_posix.c | 82 |
1 files changed, 39 insertions, 43 deletions
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index b964b88367..52efb0d6b2 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -59,6 +59,8 @@ * FD declarations */ +grpc_wakeup_fd grpc_global_wakeup_fd; + typedef struct grpc_fd_watcher { struct grpc_fd_watcher *next; struct grpc_fd_watcher *prev; @@ -113,6 +115,9 @@ struct grpc_fd { grpc_closure *on_done_closure; grpc_iomgr_object iomgr_object; + + /* The pollset that last noticed and notified that the fd is readable */ + grpc_pollset *read_notifier_pollset; }; /* Begin polling on an fd. @@ -134,7 +139,8 @@ static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, if got_read or got_write are 1, also does the become_{readable,writable} as appropriate. */ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec, - int got_read, int got_write); + int got_read, int got_write, + grpc_pollset *read_notifier_pollset); /* Return 1 if this fd is orphaned, 0 otherwise */ static bool fd_is_orphaned(grpc_fd *fd); @@ -177,7 +183,6 @@ struct grpc_pollset_worker { struct grpc_pollset { gpr_mu mu; grpc_pollset_worker root_worker; - int in_flight_cbs; int shutting_down; int called_shutdown; int kicked_without_pollers; @@ -187,10 +192,6 @@ struct grpc_pollset { size_t fd_count; size_t fd_capacity; grpc_fd **fds; - /* fds that have been removed from the pollset explicitly */ - size_t del_count; - size_t del_capacity; - grpc_fd **dels; /* Local cache of eventfds for workers */ grpc_cached_wakeup_fd *local_wakeup_cache; }; @@ -301,6 +302,7 @@ static grpc_fd *fd_create(int fd, const char *name) { r->on_done_closure = NULL; r->closed = 0; r->released = 0; + r->read_notifier_pollset = NULL; char *name2; gpr_asprintf(&name2, "%s fd=%d", name, fd); @@ -316,6 +318,18 @@ static bool fd_is_orphaned(grpc_fd *fd) { return (gpr_atm_acq_load(&fd->refst) & 1) == 0; } +/* Return the read-notifier pollset */ +static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, + grpc_fd *fd) { + grpc_pollset *notifier = NULL; + + gpr_mu_lock(&fd->mu); + notifier = fd->read_notifier_pollset; + gpr_mu_unlock(&fd->mu); + + return notifier; +} + static grpc_error *pollset_kick_locked(grpc_fd_watcher *watcher) { gpr_mu_lock(&watcher->pollset->mu); GPR_ASSERT(watcher->worker); @@ -454,6 +468,11 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } } +static void set_read_notifier_pollset_locked( + grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *read_notifier_pollset) { + fd->read_notifier_pollset = read_notifier_pollset; +} + static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { gpr_mu_lock(&fd->mu); GPR_ASSERT(!fd->shutdown); @@ -529,7 +548,8 @@ static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, } static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, - int got_read, int got_write) { + int got_read, int got_write, + grpc_pollset *read_notifier_pollset) { int was_polling = 0; int kick = 0; grpc_fd *fd = watcher->fd; @@ -565,6 +585,9 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) { kick = 1; } + if (read_notifier_pollset != NULL) { + set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset); + } } if (got_write) { if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) { @@ -726,7 +749,6 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { gpr_mu_init(&pollset->mu); *mu = &pollset->mu; pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; - pollset->in_flight_cbs = 0; pollset->shutting_down = 0; pollset->called_shutdown = 0; pollset->kicked_without_pollers = 0; @@ -735,14 +757,10 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { pollset->kicked_without_pollers = 0; pollset->fd_count = 0; pollset->fd_capacity = 0; - pollset->del_count = 0; - pollset->del_capacity = 0; pollset->fds = NULL; - pollset->dels = NULL; } static void pollset_destroy(grpc_pollset *pollset) { - GPR_ASSERT(pollset->in_flight_cbs == 0); GPR_ASSERT(!pollset_has_workers(pollset)); GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); while (pollset->local_wakeup_cache) { @@ -752,17 +770,14 @@ static void pollset_destroy(grpc_pollset *pollset) { pollset->local_wakeup_cache = next; } gpr_free(pollset->fds); - gpr_free(pollset->dels); gpr_mu_destroy(&pollset->mu); } static void pollset_reset(grpc_pollset *pollset) { GPR_ASSERT(pollset->shutting_down); - GPR_ASSERT(pollset->in_flight_cbs == 0); GPR_ASSERT(!pollset_has_workers(pollset)); GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); GPR_ASSERT(pollset->fd_count == 0); - GPR_ASSERT(pollset->del_count == 0); pollset->shutting_down = 0; pollset->called_shutdown = 0; pollset->kicked_without_pollers = 0; @@ -795,11 +810,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { for (i = 0; i < pollset->fd_count; i++) { GRPC_FD_UNREF(pollset->fds[i], "multipoller"); } - for (i = 0; i < pollset->del_count; i++) { - GRPC_FD_UNREF(pollset->dels[i], "multipoller_del"); - } pollset->fd_count = 0; - pollset->del_count = 0; grpc_exec_ctx_push(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); } @@ -851,13 +862,6 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, GPR_TIMER_MARK("pollset_work.shutting_down", 0); goto done; } - /* Give do_promote priority so we don't starve it out */ - if (pollset->in_flight_cbs) { - GPR_TIMER_MARK("pollset_work.in_flight_cbs", 0); - gpr_mu_unlock(&pollset->mu); - locked = 0; - goto done; - } /* Start polling, and keep doing so while we're being asked to re-evaluate our pollers (this allows poll() based pollers to ensure they don't miss wakeups) */ @@ -877,7 +881,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int timeout; int r; - size_t i, j, fd_count; + size_t i, fd_count; nfds_t pfd_count; /* TODO(ctiller): inline some elements to avoid an allocation */ grpc_fd_watcher *watchers; @@ -897,11 +901,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pfds[1].events = POLLIN; pfds[1].revents = 0; for (i = 0; i < pollset->fd_count; i++) { - int remove = fd_is_orphaned(pollset->fds[i]); - for (j = 0; !remove && j < pollset->del_count; j++) { - if (pollset->fds[i] == pollset->dels[j]) remove = 1; - } - if (remove) { + if (fd_is_orphaned(pollset->fds[i])) { GRPC_FD_UNREF(pollset->fds[i], "multipoller"); } else { pollset->fds[fd_count++] = pollset->fds[i]; @@ -912,10 +912,6 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pfd_count++; } } - for (j = 0; j < pollset->del_count; j++) { - GRPC_FD_UNREF(pollset->dels[j], "multipoller_del"); - } - pollset->del_count = 0; pollset->fd_count = fd_count; gpr_mu_unlock(&pollset->mu); @@ -937,11 +933,11 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, work_combine_error(&error, GRPC_OS_ERROR(errno, "poll")); } for (i = 2; i < pfd_count; i++) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0); + fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); } } else if (r == 0) { for (i = 2; i < pfd_count; i++) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0); + fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); } } else { if (pfds[0].revents & POLLIN_CHECK) { @@ -954,10 +950,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } for (i = 2; i < pfd_count; i++) { if (watchers[i].fd == NULL) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0); + fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); } else { fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK, - pfds[i].revents & POLLOUT_CHECK); + pfds[i].revents & POLLOUT_CHECK, pollset); } } } @@ -1009,7 +1005,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (pollset->shutting_down) { if (pollset_has_workers(pollset)) { pollset_kick(pollset, NULL); - } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { + } else if (!pollset->called_shutdown) { pollset->called_shutdown = 1; gpr_mu_unlock(&pollset->mu); finish_shutdown(exec_ctx, pollset); @@ -1040,8 +1036,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (!pollset_has_workers(pollset)) { grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); } - if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && - !pollset_has_workers(pollset)) { + if (!pollset->called_shutdown && !pollset_has_workers(pollset)) { pollset->called_shutdown = 1; finish_shutdown(exec_ctx, pollset); } @@ -1222,6 +1217,7 @@ static const grpc_event_engine_vtable vtable = { .fd_shutdown = fd_shutdown, .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, + .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, |