aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/lib/iomgr/ev_poll_and_epoll_posix.c130
-rw-r--r--src/core/lib/iomgr/ev_posix.c4
-rw-r--r--src/core/lib/iomgr/ev_posix.h6
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c29
-rw-r--r--src/core/lib/surface/server.c5
5 files changed, 151 insertions, 23 deletions
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
index 306d312dc4..77a67d2007 100644
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -126,6 +126,9 @@ struct grpc_fd {
grpc_closure *on_done_closure;
grpc_iomgr_object iomgr_object;
+
+ /* The pollset that last noticed and notified that the fd is readable */
+ grpc_pollset *read_notifier_pollset;
};
/* Begin polling on an fd.
@@ -147,7 +150,8 @@ static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
if got_read or got_write are 1, also does the become_{readable,writable} as
appropriate. */
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
- int got_read, int got_write);
+ int got_read, int got_write,
+ grpc_pollset *read_notifier_pollset);
/* Return 1 if this fd is orphaned, 0 otherwise */
static bool fd_is_orphaned(grpc_fd *fd);
@@ -342,6 +346,7 @@ static grpc_fd *alloc_fd(int fd) {
r->on_done_closure = NULL;
r->closed = 0;
r->released = 0;
+ r->read_notifier_pollset = NULL;
gpr_mu_unlock(&r->mu);
return r;
}
@@ -511,9 +516,17 @@ static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure **st, grpc_closure *closure) {
if (*st == CLOSURE_NOT_READY) {
+ /* TODO (sreek): Remove following log line */
+ gpr_log(GPR_INFO, "\t>> notify_on_locked: (fd:%d) CLOSURE_NOT_READY -> %p",
+ fd->fd, closure);
/* not ready ==> switch to a waiting state by setting the closure */
*st = closure;
} else if (*st == CLOSURE_READY) {
+ /* TODO (sreek): Remove following log line */
+ gpr_log(GPR_INFO,
+ "\t>> notify_on_locked: (fd:%d) CLOSURE_READY -> CLOSURE_NOT_READY "
+ "(enqueue: %p)",
+ fd->fd, closure);
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
@@ -532,19 +545,41 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure **st) {
if (*st == CLOSURE_READY) {
/* duplicate ready ==> ignore */
+ /* TODO (sreek): Remove following log line */
+ gpr_log(GPR_INFO,
+ "\t>> set_ready_locked: (fd:%d) CLOSURE_READY -> CLOSURE_READY (no "
+ "change)",
+ fd->fd);
return 0;
} else if (*st == CLOSURE_NOT_READY) {
/* not ready, and not waiting ==> flag ready */
+ /* TODO (sreek): Remove following log line */
+ gpr_log(GPR_INFO,
+ "\t>> set_ready_locked: (fd:%d) CLOSURE_NOT_READY -> CLOSURE_READY",
+ fd->fd);
*st = CLOSURE_READY;
return 0;
} else {
/* waiting ==> queue closure */
+ /* TODO (sreek): Remove following log line */
+ gpr_log(GPR_INFO,
+ "\t>> set_ready_locked: (fd:%d) Enqueue %p -> CLOSURE_NOT_READY",
+ fd->fd, *st);
grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
*st = CLOSURE_NOT_READY;
return 1;
}
}
+static void set_read_notifier_pollset_locked(
+ grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *read_notifier_pollset) {
+ /* TODO(sreek): Remove the following log line */
+ gpr_log(GPR_INFO, "\t>> Set read notifier (fd:%d): %p --> %p", fd->fd,
+ fd->read_notifier_pollset, read_notifier_pollset);
+
+ fd->read_notifier_pollset = read_notifier_pollset;
+}
+
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_lock(&fd->mu);
GPR_ASSERT(!fd->shutdown);
@@ -568,6 +603,18 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
gpr_mu_unlock(&fd->mu);
}
+/* Return the read-notifier pollset */
+static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_fd *fd) {
+ grpc_pollset *notifier = NULL;
+
+ gpr_mu_lock(&fd->mu);
+ notifier = fd->read_notifier_pollset;
+ gpr_mu_unlock(&fd->mu);
+
+ return notifier;
+}
+
static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
grpc_pollset_worker *worker, uint32_t read_mask,
uint32_t write_mask, grpc_fd_watcher *watcher) {
@@ -620,7 +667,8 @@ static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
}
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
- int got_read, int got_write) {
+ int got_read, int got_write,
+ grpc_pollset *read_notifier_pollset) {
int was_polling = 0;
int kick = 0;
grpc_fd *fd = watcher->fd;
@@ -653,11 +701,27 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
watcher->prev->next = watcher->next;
}
if (got_read) {
+ /*TODO(sreek): Delete this log line */
+ gpr_log(GPR_INFO,
+ "\t>> fd_end_poll(): GOT READ Calling set_ready_locked. fd: %d, "
+ "fd->read_closure: %p, "
+ "notifier_pollset: %p",
+ fd->fd, fd->read_closure, read_notifier_pollset);
+
if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
kick = 1;
}
+
+ if (read_notifier_pollset != NULL) {
+ set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset);
+ }
}
if (got_write) {
+ /*TODO(sreek): Delete this log line */
+ gpr_log(GPR_INFO,
+ "\t>> fd_end_poll(): GOT WRITE set_ready_locked. fd: %d, "
+ "fd->write_closure: %p",
+ fd->fd, fd->write_closure);
if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
kick = 1;
}
@@ -1208,11 +1272,11 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
}
if (fd) {
- fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
+ fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
}
} else if (r == 0) {
if (fd) {
- fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
+ fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
}
} else {
if (pfd[0].revents & POLLIN_CHECK) {
@@ -1222,10 +1286,16 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
}
if (nfds > 2) {
+ /* TODO(sreek): delete the following comment line */
+ gpr_log(
+ GPR_INFO,
+ "\t>> basic_pollset_maybe_work_and_unlock(): fd->fd: %d, pollset: %p "
+ "is readable (calling fd_end_poll()) -------------------------------",
+ pfd[2].fd, pollset);
fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK,
- pfd[2].revents & POLLOUT_CHECK);
+ pfd[2].revents & POLLOUT_CHECK, pollset);
} else if (fd) {
- fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
+ fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
}
}
@@ -1361,11 +1431,11 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
}
for (i = 2; i < pfd_count; i++) {
- fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
}
} else if (r == 0) {
for (i = 2; i < pfd_count; i++) {
- fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
}
} else {
if (pfds[0].revents & POLLIN_CHECK) {
@@ -1376,11 +1446,16 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
}
for (i = 2; i < pfd_count; i++) {
if (watchers[i].fd == NULL) {
- fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
continue;
}
+ /*TODO(sree) - Delete this log line*/
+ gpr_log(GPR_INFO,
+ "multipoll_with_poll_pollset(). fd: %d became redable. Pollset: "
+ "%p (calling fd_end_poll())*************",
+ pfds[i].fd, pollset);
fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
- pfds[i].revents & POLLOUT_CHECK);
+ pfds[i].revents & POLLOUT_CHECK, pollset);
}
}
@@ -1456,20 +1531,31 @@ static void poll_become_multipoller(grpc_exec_ctx *exec_ctx,
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
-static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) {
+static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st,
+ grpc_pollset *read_notifier_pollset) {
/* only one set_ready can be active at once (but there may be a racing
notify_on) */
gpr_mu_lock(&fd->mu);
set_ready_locked(exec_ctx, fd, st);
+
+ /* A non-NULL read_notifier_pollset means that the fd is readable. */
+ if (read_notifier_pollset != NULL) {
+ /* Note: Since the fd might be a part of multiple pollsets, this might be
+ * called multiple times (for each time the fd becomes readable) and it is
+ * okay to set the fd's read-notifier pollset to anyone of these pollsets */
+ set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset);
+ }
+
gpr_mu_unlock(&fd->mu);
}
-static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- set_ready(exec_ctx, fd, &fd->read_closure);
+static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_pollset *notifier_pollset) {
+ set_ready(exec_ctx, fd, &fd->read_closure, notifier_pollset);
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- set_ready(exec_ctx, fd, &fd->write_closure);
+ set_ready(exec_ctx, fd, &fd->write_closure, NULL);
}
struct epoll_fd_list {
@@ -1561,7 +1647,7 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
}
}
- fd_end_poll(exec_ctx, &watcher, 0, 0);
+ fd_end_poll(exec_ctx, &watcher, 0, 0, NULL);
}
static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
@@ -1675,9 +1761,20 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
} else {
if (read_ev || cancel) {
- fd_become_readable(exec_ctx, fd);
+ /* TODO(sreek): Delete this once the issue #5470 is resolved */
+ gpr_log(
+ GPR_INFO,
+ "\t>> multipoll_with_epoll_pollset: Calling "
+ "fd_become_readable(fd->fd: %d, pollset: %p) ++++++++++++",
+ fd->fd, pollset);
+ fd_become_readable(exec_ctx, fd, pollset);
}
if (write_ev || cancel) {
+ /* TODO(sreek): Delete the following log line */
+ gpr_log(GPR_INFO,
+ "\t>> multipoll_with_epoll_pollset: Calling "
+ "fd_become_writable(fd: %d)",
+ fd->fd);
fd_become_writable(exec_ctx, fd);
}
}
@@ -1904,6 +2001,7 @@ static const grpc_event_engine_vtable vtable = {
.fd_shutdown = fd_shutdown,
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
+ .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 0eb95a2e09..af4126c900 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -83,6 +83,10 @@ void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
g_event_engine->fd_notify_on_write(exec_ctx, fd, closure);
}
+grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
+ return g_event_engine->fd_get_read_notifier_pollset(exec_ctx, fd);
+}
+
size_t grpc_pollset_size(void) { return g_event_engine->pollset_size; }
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 1fa9f5ef2d..4cfa83e6a2 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -55,6 +55,8 @@ typedef struct grpc_event_engine_vtable {
grpc_closure *closure);
void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
+ grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx,
+ grpc_fd *fd);
void (*pollset_init)(grpc_pollset *pollset, gpr_mu **mu);
void (*pollset_shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -137,6 +139,10 @@ void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
+/* Return the read notifier pollset from the fd */
+grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_fd *fd);
+
/* pollset_posix functions */
/* Add an fd to a pollset */
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index cfb5251684..03318151cc 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -310,13 +310,20 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
grpc_tcp_listener *sp = arg;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index,
sp->fd_index};
+ grpc_pollset *read_notifier_pollset = NULL;
grpc_fd *fdobj;
- size_t i;
if (!success) {
goto error;
}
+ /* TODO(sreek): Delete the following log line */
+ gpr_log(GPR_INFO, "\t\t** tcp_server_posix.on_read(): Getting read notifier");
+ read_notifier_pollset = grpc_fd_get_read_notifier_pollset(exec_ctx, sp->emfd);
+ /* TODO(sreek): Delete the following log line */
+ gpr_log(GPR_INFO, "\t\t** tcp_server_posix.on_read(): Got read notifier: %p",
+ read_notifier_pollset);
+
/* loop until accept4 returns EAGAIN, and then re-arm notification */
for (;;) {
struct sockaddr_storage addr;
@@ -349,12 +356,22 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
}
fdobj = grpc_fd_create(fd, name);
- /* TODO(ctiller): revise this when we have server-side sharding
- of channels -- we certainly should not be automatically adding every
- incoming channel to every pollset owned by the server */
- for (i = 0; i < sp->server->pollset_count; i++) {
- grpc_pollset_add_fd(exec_ctx, sp->server->pollsets[i], fdobj);
+
+ if (read_notifier_pollset == NULL) {
+ /* TODO(sreek): Check when this would happen - Ideally this should not
+ * happen. Remove the next log-line once this is resolved */
+ gpr_log(GPR_INFO, "\t** *******!!! tcp_server_posix.on_read(): "
+ "read_notifier_pollset is NULL. !!!**********************");
+
+ gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd");
+ goto error;
}
+
+ /* TODO(sreek): Delete the following log line */
+ gpr_log(GPR_INFO, "\t\t** tcp_server_posix.on_read(): Adding fd %d *only* to pollset %p",
+ fd, read_notifier_pollset);
+ grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj);
+
sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg,
grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index ad8ee8c7a9..25b6886f24 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1018,7 +1018,6 @@ void grpc_server_start(grpc_server *server) {
void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
grpc_transport *transport,
const grpc_channel_args *args) {
- size_t i;
size_t num_registered_methods;
size_t alloc;
registered_method *rm;
@@ -1033,11 +1032,15 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
uint32_t max_probes = 0;
grpc_transport_op op;
+ /* TODO(sreek): Delete this commented block once issue #5470 is resolved */
+ /*
+ size_t i;
for (i = 0; i < s->cq_count; i++) {
memset(&op, 0, sizeof(op));
op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
grpc_transport_perform_op(exec_ctx, transport, &op);
}
+ */
channel =
grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport);