diff options
-rw-r--r-- | src/core/lib/iomgr/ev_poll_and_epoll_posix.c | 130 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_posix.c | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_posix.h | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_posix.c | 29 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 5 |
5 files changed, 151 insertions, 23 deletions
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index 306d312dc4..77a67d2007 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -126,6 +126,9 @@ struct grpc_fd { grpc_closure *on_done_closure; grpc_iomgr_object iomgr_object; + + /* The pollset that last noticed and notified that the fd is readable */ + grpc_pollset *read_notifier_pollset; }; /* Begin polling on an fd. @@ -147,7 +150,8 @@ static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, if got_read or got_write are 1, also does the become_{readable,writable} as appropriate. */ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec, - int got_read, int got_write); + int got_read, int got_write, + grpc_pollset *read_notifier_pollset); /* Return 1 if this fd is orphaned, 0 otherwise */ static bool fd_is_orphaned(grpc_fd *fd); @@ -342,6 +346,7 @@ static grpc_fd *alloc_fd(int fd) { r->on_done_closure = NULL; r->closed = 0; r->released = 0; + r->read_notifier_pollset = NULL; gpr_mu_unlock(&r->mu); return r; } @@ -511,9 +516,17 @@ static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); } static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, grpc_closure *closure) { if (*st == CLOSURE_NOT_READY) { + /* TODO (sreek): Remove following log line */ + gpr_log(GPR_INFO, "\t>> notify_on_locked: (fd:%d) CLOSURE_NOT_READY -> %p", + fd->fd, closure); /* not ready ==> switch to a waiting state by setting the closure */ *st = closure; } else if (*st == CLOSURE_READY) { + /* TODO (sreek): Remove following log line */ + gpr_log(GPR_INFO, + "\t>> notify_on_locked: (fd:%d) CLOSURE_READY -> CLOSURE_NOT_READY " + "(enqueue: %p)", + fd->fd, closure); /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL); @@ -532,19 +545,41 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) { if (*st == CLOSURE_READY) { /* duplicate ready ==> ignore */ + /* TODO (sreek): Remove following log line */ + gpr_log(GPR_INFO, + "\t>> set_ready_locked: (fd:%d) CLOSURE_READY -> CLOSURE_READY (no " + "change)", + fd->fd); return 0; } else if (*st == CLOSURE_NOT_READY) { /* not ready, and not waiting ==> flag ready */ + /* TODO (sreek): Remove following log line */ + gpr_log(GPR_INFO, + "\t>> set_ready_locked: (fd:%d) CLOSURE_NOT_READY -> CLOSURE_READY", + fd->fd); *st = CLOSURE_READY; return 0; } else { /* waiting ==> queue closure */ + /* TODO (sreek): Remove following log line */ + gpr_log(GPR_INFO, + "\t>> set_ready_locked: (fd:%d) Enqueue %p -> CLOSURE_NOT_READY", + fd->fd, *st); grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL); *st = CLOSURE_NOT_READY; return 1; } } +static void set_read_notifier_pollset_locked( + grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *read_notifier_pollset) { + /* TODO(sreek): Remove the following log line */ + gpr_log(GPR_INFO, "\t>> Set read notifier (fd:%d): %p --> %p", fd->fd, + fd->read_notifier_pollset, read_notifier_pollset); + + fd->read_notifier_pollset = read_notifier_pollset; +} + static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { gpr_mu_lock(&fd->mu); GPR_ASSERT(!fd->shutdown); @@ -568,6 +603,18 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_mu_unlock(&fd->mu); } +/* Return the read-notifier pollset */ +static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, + grpc_fd *fd) { + grpc_pollset *notifier = NULL; + + gpr_mu_lock(&fd->mu); + notifier = fd->read_notifier_pollset; + gpr_mu_unlock(&fd->mu); + + return notifier; +} + static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, grpc_pollset_worker *worker, uint32_t read_mask, uint32_t write_mask, grpc_fd_watcher *watcher) { @@ -620,7 +667,8 @@ static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, } static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, - int got_read, int got_write) { + int got_read, int got_write, + grpc_pollset *read_notifier_pollset) { int was_polling = 0; int kick = 0; grpc_fd *fd = watcher->fd; @@ -653,11 +701,27 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, watcher->prev->next = watcher->next; } if (got_read) { + /*TODO(sreek): Delete this log line */ + gpr_log(GPR_INFO, + "\t>> fd_end_poll(): GOT READ Calling set_ready_locked. fd: %d, " + "fd->read_closure: %p, " + "notifier_pollset: %p", + fd->fd, fd->read_closure, read_notifier_pollset); + if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) { kick = 1; } + + if (read_notifier_pollset != NULL) { + set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset); + } } if (got_write) { + /*TODO(sreek): Delete this log line */ + gpr_log(GPR_INFO, + "\t>> fd_end_poll(): GOT WRITE set_ready_locked. fd: %d, " + "fd->write_closure: %p", + fd->fd, fd->write_closure); if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) { kick = 1; } @@ -1208,11 +1272,11 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); } if (fd) { - fd_end_poll(exec_ctx, &fd_watcher, 0, 0); + fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL); } } else if (r == 0) { if (fd) { - fd_end_poll(exec_ctx, &fd_watcher, 0, 0); + fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL); } } else { if (pfd[0].revents & POLLIN_CHECK) { @@ -1222,10 +1286,16 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); } if (nfds > 2) { + /* TODO(sreek): delete the following comment line */ + gpr_log( + GPR_INFO, + "\t>> basic_pollset_maybe_work_and_unlock(): fd->fd: %d, pollset: %p " + "is readable (calling fd_end_poll()) -------------------------------", + pfd[2].fd, pollset); fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK, - pfd[2].revents & POLLOUT_CHECK); + pfd[2].revents & POLLOUT_CHECK, pollset); } else if (fd) { - fd_end_poll(exec_ctx, &fd_watcher, 0, 0); + fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL); } } @@ -1361,11 +1431,11 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); } for (i = 2; i < pfd_count; i++) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0); + fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); } } else if (r == 0) { for (i = 2; i < pfd_count; i++) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0); + fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); } } else { if (pfds[0].revents & POLLIN_CHECK) { @@ -1376,11 +1446,16 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( } for (i = 2; i < pfd_count; i++) { if (watchers[i].fd == NULL) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0); + fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); continue; } + /*TODO(sree) - Delete this log line*/ + gpr_log(GPR_INFO, + "multipoll_with_poll_pollset(). fd: %d became redable. Pollset: " + "%p (calling fd_end_poll())*************", + pfds[i].fd, pollset); fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK, - pfds[i].revents & POLLOUT_CHECK); + pfds[i].revents & POLLOUT_CHECK, pollset); } } @@ -1456,20 +1531,31 @@ static void poll_become_multipoller(grpc_exec_ctx *exec_ctx, #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" -static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) { +static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, + grpc_pollset *read_notifier_pollset) { /* only one set_ready can be active at once (but there may be a racing notify_on) */ gpr_mu_lock(&fd->mu); set_ready_locked(exec_ctx, fd, st); + + /* A non-NULL read_notifier_pollset means that the fd is readable. */ + if (read_notifier_pollset != NULL) { + /* Note: Since the fd might be a part of multiple pollsets, this might be + * called multiple times (for each time the fd becomes readable) and it is + * okay to set the fd's read-notifier pollset to anyone of these pollsets */ + set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset); + } + gpr_mu_unlock(&fd->mu); } -static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - set_ready(exec_ctx, fd, &fd->read_closure); +static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, + grpc_pollset *notifier_pollset) { + set_ready(exec_ctx, fd, &fd->read_closure, notifier_pollset); } static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - set_ready(exec_ctx, fd, &fd->write_closure); + set_ready(exec_ctx, fd, &fd->write_closure, NULL); } struct epoll_fd_list { @@ -1561,7 +1647,7 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } } - fd_end_poll(exec_ctx, &watcher, 0, 0); + fd_end_poll(exec_ctx, &watcher, 0, 0, NULL); } static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, @@ -1675,9 +1761,20 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); } else { if (read_ev || cancel) { - fd_become_readable(exec_ctx, fd); + /* TODO(sreek): Delete this once the issue #5470 is resolved */ + gpr_log( + GPR_INFO, + "\t>> multipoll_with_epoll_pollset: Calling " + "fd_become_readable(fd->fd: %d, pollset: %p) ++++++++++++", + fd->fd, pollset); + fd_become_readable(exec_ctx, fd, pollset); } if (write_ev || cancel) { + /* TODO(sreek): Delete the following log line */ + gpr_log(GPR_INFO, + "\t>> multipoll_with_epoll_pollset: Calling " + "fd_become_writable(fd: %d)", + fd->fd); fd_become_writable(exec_ctx, fd); } } @@ -1904,6 +2001,7 @@ static const grpc_event_engine_vtable vtable = { .fd_shutdown = fd_shutdown, .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, + .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 0eb95a2e09..af4126c900 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -83,6 +83,10 @@ void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, g_event_engine->fd_notify_on_write(exec_ctx, fd, closure); } +grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { + return g_event_engine->fd_get_read_notifier_pollset(exec_ctx, fd); +} + size_t grpc_pollset_size(void) { return g_event_engine->pollset_size; } void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 1fa9f5ef2d..4cfa83e6a2 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -55,6 +55,8 @@ typedef struct grpc_event_engine_vtable { grpc_closure *closure); void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); + grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx, + grpc_fd *fd); void (*pollset_init)(grpc_pollset *pollset, gpr_mu **mu); void (*pollset_shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -137,6 +139,10 @@ void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); +/* Return the read notifier pollset from the fd */ +grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, + grpc_fd *fd); + /* pollset_posix functions */ /* Add an fd to a pollset */ diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index cfb5251684..03318151cc 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -310,13 +310,20 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) { grpc_tcp_listener *sp = arg; grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, sp->fd_index}; + grpc_pollset *read_notifier_pollset = NULL; grpc_fd *fdobj; - size_t i; if (!success) { goto error; } + /* TODO(sreek): Delete the following log line */ + gpr_log(GPR_INFO, "\t\t** tcp_server_posix.on_read(): Getting read notifier"); + read_notifier_pollset = grpc_fd_get_read_notifier_pollset(exec_ctx, sp->emfd); + /* TODO(sreek): Delete the following log line */ + gpr_log(GPR_INFO, "\t\t** tcp_server_posix.on_read(): Got read notifier: %p", + read_notifier_pollset); + /* loop until accept4 returns EAGAIN, and then re-arm notification */ for (;;) { struct sockaddr_storage addr; @@ -349,12 +356,22 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) { } fdobj = grpc_fd_create(fd, name); - /* TODO(ctiller): revise this when we have server-side sharding - of channels -- we certainly should not be automatically adding every - incoming channel to every pollset owned by the server */ - for (i = 0; i < sp->server->pollset_count; i++) { - grpc_pollset_add_fd(exec_ctx, sp->server->pollsets[i], fdobj); + + if (read_notifier_pollset == NULL) { + /* TODO(sreek): Check when this would happen - Ideally this should not + * happen. Remove the next log-line once this is resolved */ + gpr_log(GPR_INFO, "\t** *******!!! tcp_server_posix.on_read(): " + "read_notifier_pollset is NULL. !!!**********************"); + + gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd"); + goto error; } + + /* TODO(sreek): Delete the following log line */ + gpr_log(GPR_INFO, "\t\t** tcp_server_posix.on_read(): Adding fd %d *only* to pollset %p", + fd, read_notifier_pollset); + grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj); + sp->server->on_accept_cb( exec_ctx, sp->server->on_accept_cb_arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index ad8ee8c7a9..25b6886f24 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1018,7 +1018,6 @@ void grpc_server_start(grpc_server *server) { void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, grpc_transport *transport, const grpc_channel_args *args) { - size_t i; size_t num_registered_methods; size_t alloc; registered_method *rm; @@ -1033,11 +1032,15 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, uint32_t max_probes = 0; grpc_transport_op op; + /* TODO(sreek): Delete this commented block once issue #5470 is resolved */ + /* + size_t i; for (i = 0; i < s->cq_count; i++) { memset(&op, 0, sizeof(op)); op.bind_pollset = grpc_cq_pollset(s->cqs[i]); grpc_transport_perform_op(exec_ctx, transport, &op); } + */ channel = grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport); |