Diffstat (limited to 'src/core/lib/iomgr/ev_epollex_linux.cc')
 src/core/lib/iomgr/ev_epollex_linux.cc | 284 +++++++++++++++++++++------
 1 file changed, 220 insertions(+), 64 deletions(-)
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 98369ddd6e..b082634af1 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -33,6 +33,7 @@
#include <poll.h>
#include <pthread.h>
#include <string.h>
+#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/syscall.h>
#include <unistd.h>
@@ -45,6 +46,7 @@
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/is_epollexclusive_available.h"
@@ -63,6 +65,7 @@
// a keepalive ping timeout issue. We may want to revert
// https://github.com/grpc/grpc/pull/14943 once we figure out the root cause.
#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 16
+#define MAX_FDS_IN_CACHE 32
grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false,
"pollable_refcount");
@@ -75,6 +78,18 @@ typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type;
typedef struct pollable pollable;
+typedef struct cached_fd {
+ // Set to the grpc_fd's salt value. See the 'salt' variable in grpc_fd for
+ // more details
+ intptr_t salt;
+
+ // The underlying fd
+ int fd;
+
+ // A recency time counter that helps to determine the LRU fd in the cache
+ uint64_t last_used;
+} cached_fd;
+
/// A pollable is something that can be polled: it has an epoll set to poll on,
/// and a wakeup fd for kicks
/// There are three broad types:
@@ -90,8 +105,10 @@ struct pollable {
int epfd;
grpc_wakeup_fd wakeup;
- // only for type fd... one ref to the owner fd
- grpc_fd* owner_fd;
+ // The following are relevant only for type PO_FD
+ grpc_fd* owner_fd; // Set to the owner_fd if the type is PO_FD
+ gpr_mu owner_orphan_mu; // Synchronizes access to owner_orphaned field
+ bool owner_orphaned; // Is the owner fd orphaned
grpc_pollset_set* pollset_set;
pollable* next;
@@ -103,6 +120,33 @@ struct pollable {
int event_cursor;
int event_count;
struct epoll_event events[MAX_EPOLL_EVENTS];
+
+ // We may be calling pollable_add_fd() on the same (pollable, fd) multiple
+ // times. To prevent pollable_add_fd() from making redundant syscalls to
+ // epoll_ctl() to add the fd, we maintain a cache of the fds that are already
+ // present in the underlying epoll set.
+ //
+ // Since the cache is only an optimization (not a correctness requirement),
+ // we do not need to track every fd. Hence we just use an LRU cache of size
+ // 'MAX_FDS_IN_CACHE'
+ //
+ // NOTE: An ideal implementation of this would do the following:
+ // 1) Add fds to the cache in pollable_add_fd() (i.e. whenever the fd is
+ // added to the pollable's epoll set)
+ // 2) Remove the fd from the cache whenever the fd is removed from the
+ // underlying epoll set (i.e. whenever fd_orphan() is called).
+ //
+ // Implementing (2) above (i.e. removing fds from the cache on fd_orphan)
+ // adds a lot of complexity since an fd can be present in multiple pollables.
+ // So our implementation ONLY DOES (1) and NOT (2).
+ //
+ // The cached_fd.salt variable maintains correctness here: it serves as an
+ // epoch that differentiates one grpc_fd from another even when both have the
+ // same fd number.
+ //
+ // The following fields implement an LRU-eviction cache of fds in this
+ // pollable
+ cached_fd fd_cache[MAX_FDS_IN_CACHE];
+ int fd_cache_size;
+ uint64_t fd_cache_counter; // Recency timer tick counter
};
static const char* pollable_type_string(pollable_type t) {
@@ -145,12 +189,24 @@ static void pollable_unref(pollable* p, int line, const char* reason);
* Fd Declarations
*/
+// Monotonically increasing epoch counter that is assigned to each grpc_fd. See
+// the description of the 'salt' variable in 'grpc_fd' for more details
+// TODO: (sreek/kpayson) gpr_atm is intptr_t, which may not be wide enough on
+// 32-bit systems. Change this to int64_t - at least on 32-bit systems
+static gpr_atm g_fd_salt;
+
struct grpc_fd {
int fd;
- /* refst format:
- bit 0 : 1=Active / 0=Orphaned
- bits 1-n : refcount
- Ref/Unref by two to avoid altering the orphaned bit */
+
+ // Since fd numbers can be reused (after old fds are closed), this serves as
+ // an epoch that uniquely identifies this fd: the pair (salt, fd) is unique
+ // until the salt counter (i.e. g_fd_salt) overflows
+ intptr_t salt;
+
+ // refst format:
+ // bit 0 : 1=Active / 0=Orphaned
+ // bits 1-n : refcount
+ // Ref/Unref by two to avoid altering the orphaned bit
gpr_atm refst;
gpr_mu orphan_mu;
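The salt is what lets the fd cache introduced above tell two grpc_fd instances apart once the kernel reuses an fd number. A minimal standalone sketch of the mechanism, with illustrative names (std::atomic stands in for gpr_atm; this is not the gRPC API):

    #include <atomic>
    #include <cassert>
    #include <cstdint>

    // Hypothetical stand-ins for g_fd_salt / grpc_fd, for illustration only.
    static std::atomic<int64_t> g_salt_counter{0};

    struct FdHandle {
      int fd;        // kernel fd number; may be reused after close()
      int64_t salt;  // epoch: unique per FdHandle instance
    };

    FdHandle make_handle(int fd) {
      // fetch_add hands every FdHandle a distinct epoch, so two handles that
      // happen to share an fd number still compare as different.
      return FdHandle{fd, g_salt_counter.fetch_add(1)};
    }

    int main() {
      FdHandle a = make_handle(5);  // fd 5 opened...
      // ...fd 5 closed by its owner, then the kernel hands out 5 again:
      FdHandle b = make_handle(5);
      assert(a.fd == b.fd);      // same fd number
      assert(a.salt != b.salt);  // distinguishable via the salt epoch
      return 0;
    }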
@@ -160,15 +216,15 @@ struct grpc_fd {
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
+ grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
struct grpc_fd* freelist_next;
grpc_closure* on_done_closure;
- /* The pollset that last noticed that the fd is readable. The actual type
- * stored in this is (grpc_pollset *) */
- gpr_atm read_notifier_pollset;
-
grpc_iomgr_object iomgr_object;
+
+ // Do we need to track EPOLLERR events separately?
+ bool track_err;
};
static void fd_global_init(void);
@@ -281,20 +337,44 @@ static void ref_by(grpc_fd* fd, int n) {
GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
+#ifndef NDEBUG
+#define INVALIDATE_FD(fd) invalidate_fd(fd)
+/* Since an fd is never really destroyed (i.e. gpr_free() is not called), it
+ * is hard to catch cases where fd fields are accessed even after calling
+ * fd_destroy(). The following invalidates fd fields to make catching such
+ * errors easier */
+static void invalidate_fd(grpc_fd* fd) {
+ fd->fd = -1;
+ fd->salt = -1;
+ gpr_atm_no_barrier_store(&fd->refst, -1);
+ memset(&fd->orphan_mu, -1, sizeof(fd->orphan_mu));
+ memset(&fd->pollable_mu, -1, sizeof(fd->pollable_mu));
+ fd->pollable_obj = nullptr;
+ fd->on_done_closure = nullptr;
+ memset(&fd->iomgr_object, -1, sizeof(fd->iomgr_object));
+ fd->track_err = false;
+}
+#else
+#define INVALIDATE_FD(fd)
+#endif
+
+/* Uninitialize and add to the freelist */
static void fd_destroy(void* arg, grpc_error* error) {
grpc_fd* fd = static_cast<grpc_fd*>(arg);
- /* Add the fd to the freelist */
grpc_iomgr_unregister_object(&fd->iomgr_object);
POLLABLE_UNREF(fd->pollable_obj, "fd_pollable");
gpr_mu_destroy(&fd->pollable_mu);
gpr_mu_destroy(&fd->orphan_mu);
- gpr_mu_lock(&fd_freelist_mu);
- fd->freelist_next = fd_freelist;
- fd_freelist = fd;
fd->read_closure->DestroyEvent();
fd->write_closure->DestroyEvent();
+ fd->error_closure->DestroyEvent();
+ INVALIDATE_FD(fd);
+
+ /* Add the fd to the freelist */
+ gpr_mu_lock(&fd_freelist_mu);
+ fd->freelist_next = fd_freelist;
+ fd_freelist = fd;
gpr_mu_unlock(&fd_freelist_mu);
}
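Because "destroyed" grpc_fds are pooled on a freelist rather than gpr_free()'d, a use-after-destroy bug would read plausible-looking stale data and go unnoticed. Poisoning the fields in debug builds, as INVALIDATE_FD does above, makes such accesses fail loudly. A generic sketch of the pattern (illustrative types, not the gRPC ones):

    struct PooledThing {
      int fd;
      void* owner;
    };

    #ifndef NDEBUG
    // Debug builds: scribble over the object so any later (buggy) access is
    // obviously bogus rather than silently reading stale-but-plausible data.
    static void invalidate_thing(PooledThing* t) {
      t->fd = -1;          // an fd of -1 makes every later syscall fail loudly
      t->owner = nullptr;  // null deref instead of silent corruption
    }
    #define INVALIDATE(obj) invalidate_thing(obj)
    #else
    #define INVALIDATE(obj)  // release builds: no-op, the freelist stays cheap
    #endif

    int main() {
      PooledThing t{5, nullptr};
      INVALIDATE(&t);  // poison before the object goes back on a freelist
      return 0;
    }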
@@ -333,7 +413,7 @@ static void fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
-static grpc_fd* fd_create(int fd, const char* name) {
+static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
grpc_fd* new_fd = nullptr;
gpr_mu_lock(&fd_freelist_mu);
@@ -347,17 +427,18 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
new_fd->read_closure.Init();
new_fd->write_closure.Init();
+ new_fd->error_closure.Init();
}
- gpr_mu_init(&new_fd->pollable_mu);
+ new_fd->fd = fd;
+ new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1);
+ gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
gpr_mu_init(&new_fd->orphan_mu);
+ gpr_mu_init(&new_fd->pollable_mu);
new_fd->pollable_obj = nullptr;
- gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
- new_fd->fd = fd;
new_fd->read_closure->InitEvent();
new_fd->write_closure->InitEvent();
- gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
-
+ new_fd->error_closure->InitEvent();
new_fd->freelist_next = nullptr;
new_fd->on_done_closure = nullptr;
@@ -370,6 +451,8 @@ static grpc_fd* fd_create(int fd, const char* name) {
}
#endif
gpr_free(fd_name);
+
+ new_fd->track_err = track_err;
return new_fd;
}
@@ -379,24 +462,36 @@ static int fd_wrapped_fd(grpc_fd* fd) {
}
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
- bool already_closed, const char* reason) {
- bool is_fd_closed = already_closed;
+ const char* reason) {
+ bool is_fd_closed = false;
gpr_mu_lock(&fd->orphan_mu);
+ // Get fd->pollable_obj and set its owner_orphaned field to true so that
+ // the pollable will no longer access its owner_fd field.
+ gpr_mu_lock(&fd->pollable_mu);
+ pollable* pollable_obj = fd->pollable_obj;
+ gpr_mu_unlock(&fd->pollable_mu);
+
+ if (pollable_obj) {
+ gpr_mu_lock(&pollable_obj->owner_orphan_mu);
+ pollable_obj->owner_orphaned = true;
+ }
+
fd->on_done_closure = on_done;
/* If release_fd is not NULL, we should be relinquishing control of the file
descriptor fd->fd (but we still own the grpc_fd structure). */
if (release_fd != nullptr) {
*release_fd = fd->fd;
- } else if (!is_fd_closed) {
+ } else {
close(fd->fd);
is_fd_closed = true;
}
+ // TODO(sreek): handle fd removal (where is_fd_closed=false)
if (!is_fd_closed) {
- gpr_log(GPR_DEBUG, "TODO: handle fd removal?");
+ GRPC_FD_TRACE("epoll_fd %p (%d) was orphaned but not closed.", fd, fd->fd);
}
/* Remove the active status but keep referenced. We want this grpc_fd struct
@@ -405,16 +500,15 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
GRPC_CLOSURE_SCHED(fd->on_done_closure, GRPC_ERROR_NONE);
+ if (pollable_obj) {
+ gpr_mu_unlock(&pollable_obj->owner_orphan_mu);
+ }
+
gpr_mu_unlock(&fd->orphan_mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
}
-static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) {
- gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
- return (grpc_pollset*)notifier;
-}
-
static bool fd_is_shutdown(grpc_fd* fd) {
return fd->read_closure->IsShutdown();
}
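The owner_orphaned flag replaces the old trick of peeking at bit 0 of owner_fd->refst: fd_orphan() sets the flag under owner_orphan_mu before the fd goes away, and code that wants to touch pollable->owner_fd checks it under the same mutex. A condensed sketch of that handshake (simplified types, std::mutex standing in for gpr_mu):

    #include <mutex>

    struct Pollable {
      std::mutex owner_orphan_mu;
      bool owner_orphaned = false;
      int* owner_fd = nullptr;  // stand-in for grpc_fd*
    };

    // fd side: called while the owner fd is being orphaned.
    void orphan_owner(Pollable* p) {
      std::lock_guard<std::mutex> lock(p->owner_orphan_mu);
      p->owner_orphaned = true;  // from now on, owner_fd must not be touched
    }

    // pollset side: called before relying on owner_fd.
    bool try_use_owner(Pollable* p) {
      std::lock_guard<std::mutex> lock(p->owner_orphan_mu);
      if (p->owner_orphaned) return false;  // owner gone; take the other path
      // ...safe to use p->owner_fd while the lock is held...
      return true;
    }

    int main() {
      Pollable p;
      orphan_owner(&p);
      return try_use_owner(&p) ? 1 : 0;  // returns 0: owner was orphaned
    }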
@@ -422,8 +516,14 @@ static bool fd_is_shutdown(grpc_fd* fd) {
/* Might be called multiple times */
static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
- shutdown(fd->fd, SHUT_RDWR);
+ if (shutdown(fd->fd, SHUT_RDWR)) {
+ if (errno != ENOTCONN) {
+ gpr_log(GPR_ERROR, "Error shutting down fd %d. errno: %d",
+ grpc_fd_wrapped_fd(fd), errno);
+ }
+ }
fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
+ fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
}
GRPC_ERROR_UNREF(why);
}
@@ -436,6 +536,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
fd->write_closure->NotifyOn(closure);
}
+static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
+ fd->error_closure->NotifyOn(closure);
+}
+
/*******************************************************************************
* Pollable Definitions
*/
@@ -479,11 +583,15 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) {
gpr_mu_init(&(*p)->mu);
(*p)->epfd = epfd;
(*p)->owner_fd = nullptr;
+ gpr_mu_init(&(*p)->owner_orphan_mu);
+ (*p)->owner_orphaned = false;
(*p)->pollset_set = nullptr;
(*p)->next = (*p)->prev = *p;
(*p)->root_worker = nullptr;
(*p)->event_cursor = 0;
(*p)->event_count = 0;
+ (*p)->fd_cache_size = 0;
+ (*p)->fd_cache_counter = 0;
return GRPC_ERROR_NONE;
}
@@ -516,6 +624,7 @@ static void pollable_unref(pollable* p, int line, const char* reason) {
GRPC_FD_TRACE("pollable_unref: Closing epfd: %d", p->epfd);
close(p->epfd);
grpc_wakeup_fd_destroy(&p->wakeup);
+ gpr_mu_destroy(&p->owner_orphan_mu);
gpr_free(p);
}
}
@@ -524,6 +633,38 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
grpc_error* error = GRPC_ERROR_NONE;
static const char* err_desc = "pollable_add_fd";
const int epfd = p->epfd;
+ gpr_mu_lock(&p->mu);
+ p->fd_cache_counter++;
+
+ // Handle overflow of the cache counter by resetting the recency counter on
+ // all cache entries
+ if (p->fd_cache_counter == 0) {
+ for (int i = 0; i < p->fd_cache_size; i++) {
+ p->fd_cache[i].last_used = 0;
+ }
+ }
+
+ int lru_idx = 0;
+ for (int i = 0; i < p->fd_cache_size; i++) {
+ if (p->fd_cache[i].fd == fd->fd && p->fd_cache[i].salt == fd->salt) {
+ GRPC_STATS_INC_POLLSET_FD_CACHE_HITS();
+ p->fd_cache[i].last_used = p->fd_cache_counter;
+ gpr_mu_unlock(&p->mu);
+ return GRPC_ERROR_NONE;
+ } else if (p->fd_cache[i].last_used < p->fd_cache[lru_idx].last_used) {
+ lru_idx = i;
+ }
+ }
+
+ // Add to cache
+ if (p->fd_cache_size < MAX_FDS_IN_CACHE) {
+ lru_idx = p->fd_cache_size;
+ p->fd_cache_size++;
+ }
+ p->fd_cache[lru_idx].fd = fd->fd;
+ p->fd_cache[lru_idx].salt = fd->salt;
+ p->fd_cache[lru_idx].last_used = p->fd_cache_counter;
+ gpr_mu_unlock(&p->mu);
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_INFO, "add fd %p (%d) to pollable %p", fd, fd->fd, p);
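For reference, the cache logic above can be exercised in isolation. The following sketch mirrors the hit / miss / LRU-eviction cases (sizes and names are illustrative, not the gRPC ones):

    #include <cassert>
    #include <cstdint>

    constexpr int kMaxFdsInCache = 4;  // the real MAX_FDS_IN_CACHE is 32

    struct CachedFd {
      int64_t salt;
      int fd;
      uint64_t last_used;
    };

    struct Cache {
      CachedFd entries[kMaxFdsInCache];
      int size = 0;
      uint64_t counter = 0;
    };

    // Returns true on a hit (no epoll_ctl needed). On a miss, records the fd,
    // evicting the least-recently-used slot once the cache is full.
    bool lookup_or_insert(Cache* c, int fd, int64_t salt) {
      if (++c->counter == 0) {  // counter wrapped: reset all recency stamps
        for (int i = 0; i < c->size; i++) c->entries[i].last_used = 0;
      }
      int lru = 0;
      for (int i = 0; i < c->size; i++) {
        if (c->entries[i].fd == fd && c->entries[i].salt == salt) {
          c->entries[i].last_used = c->counter;  // refresh recency on a hit
          return true;
        }
        if (c->entries[i].last_used < c->entries[lru].last_used) lru = i;
      }
      if (c->size < kMaxFdsInCache) lru = c->size++;  // room left: append
      c->entries[lru] = {salt, fd, c->counter};       // else overwrite the LRU
      return false;
    }

    int main() {
      Cache c;
      assert(!lookup_or_insert(&c, /*fd=*/5, /*salt=*/0));  // miss: inserted
      assert(lookup_or_insert(&c, 5, 0));   // hit: skip the epoll_ctl syscall
      assert(!lookup_or_insert(&c, 5, 1));  // same fd, new salt: miss again
      return 0;
    }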
@@ -532,7 +673,13 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
struct epoll_event ev_fd;
ev_fd.events =
static_cast<uint32_t>(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE);
- ev_fd.data.ptr = fd;
+ /* Use the second least significant bit of ev_fd.data.ptr to store track_err
+ * to avoid synchronization issues when accessing it after receiving an event.
+ * Accessing fd would be a data race there because the fd might have been
+ * returned to the free list at that point. */
+ ev_fd.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fd) |
+ (fd->track_err ? 2 : 0));
+ GRPC_STATS_INC_SYSCALL_EPOLL_CTL();
if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) {
switch (errno) {
case EEXIST:
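The tag in bit 1 works because bit 0 of data.ptr is reserved for another tag in this file (it marks wakeup events), and heap-allocated grpc_fds are aligned to at least four bytes, so both low bits of the pointer are guaranteed zero. A minimal round-trip sketch with hypothetical helper names:

    #include <cassert>
    #include <cstdint>

    struct Fd { int fd; };  // stand-in for grpc_fd

    // Pack track_err into bit 1 of the pointer; alignment >= 4 guarantees the
    // two low bits are zero in the untagged pointer.
    void* tag(Fd* fd, bool track_err) {
      return reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fd) |
                                     (track_err ? 2 : 0));
    }

    Fd* untag(void* p) {
      return reinterpret_cast<Fd*>(reinterpret_cast<intptr_t>(p) &
                                   ~static_cast<intptr_t>(2));
    }

    bool tagged_track_err(void* p) {
      return (reinterpret_cast<intptr_t>(p) & 2) != 0;
    }

    int main() {
      Fd f{7};
      void* p = tag(&f, /*track_err=*/true);
      assert(untag(p) == &f);       // original pointer recovered
      assert(tagged_track_err(p));  // flag travels inside the pointer
      return 0;
    }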
@@ -589,7 +736,7 @@ static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) {
GPR_TIMER_SCOPE("kick_one_worker", 0);
pollable* p = specific_worker->pollable_obj;
- grpc_core::mu_guard lock(&p->mu);
+ grpc_core::MutexLock lock(&p->mu);
GPR_ASSERT(specific_worker != nullptr);
if (specific_worker->kicked) {
if (grpc_polling_trace.enabled()) {
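grpc_core::mu_guard has been replaced by grpc_core::MutexLock (hence the new mutex_lock.h include at the top of this diff). The guard's shape, sketched with std::mutex standing in for gpr_mu rather than the actual gRPC implementation:

    #include <mutex>

    // RAII guard: lock in the constructor, unlock in the destructor, so every
    // early return in a function like kick_one_worker() releases the mutex
    // automatically.
    class MutexLockSketch {
     public:
      explicit MutexLockSketch(std::mutex* mu) : mu_(mu) { mu_->lock(); }
      ~MutexLockSketch() { mu_->unlock(); }
      MutexLockSketch(const MutexLockSketch&) = delete;
      MutexLockSketch& operator=(const MutexLockSketch&) = delete;

     private:
      std::mutex* mu_;
    };

    int main() {
      std::mutex mu;
      MutexLockSketch lock(&mu);  // held until end of scope
      return 0;
    }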
@@ -718,24 +865,21 @@ static int poll_deadline_to_millis_timeout(grpc_millis millis) {
return static_cast<int>(delta);
}
-static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
- fd->read_closure->SetReady();
-
- /* Note, it is possible that fd_become_readable might be called twice with
- different 'notifier's when an fd becomes readable and it is in two epoll
- sets (This can happen briefly during polling island merges). In such cases
- it does not really matter which notifer is set as the read_notifier_pollset
- (They would both point to the same polling island anyway) */
- /* Use release store to match with acquire load in fd_get_read_notifier */
- gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
-}
+static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
-static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) {
+static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
+
+/* Get the pollable_obj attached to this fd. If none is attached, create a new
+ * pollable object (of type PO_FD), attach it to the fd and return it
+ *
+ * Note that if a pollable object is already attached to the fd, it may be of
+ * either PO_FD or PO_MULTI type */
+static grpc_error* get_fd_pollable(grpc_fd* fd, pollable** p) {
gpr_mu_lock(&fd->pollable_mu);
grpc_error* error = GRPC_ERROR_NONE;
- static const char* err_desc = "fd_get_or_become_pollable";
+ static const char* err_desc = "get_fd_pollable";
if (fd->pollable_obj == nullptr) {
if (append_error(&error, pollable_create(PO_FD, &fd->pollable_obj),
err_desc)) {
@@ -800,20 +944,28 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset,
(intptr_t)data_ptr)),
err_desc);
} else {
- grpc_fd* fd = static_cast<grpc_fd*>(data_ptr);
- bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
+ grpc_fd* fd =
+ reinterpret_cast<grpc_fd*>(reinterpret_cast<intptr_t>(data_ptr) & ~2);
+ bool track_err = reinterpret_cast<intptr_t>(data_ptr) & 2;
+ bool cancel = (ev->events & EPOLLHUP) != 0;
+ bool error = (ev->events & EPOLLERR) != 0;
bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (ev->events & EPOLLOUT) != 0;
+ bool err_fallback = error && !track_err;
+
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_INFO,
"PS:%p got fd %p: cancel=%d read=%d "
"write=%d",
pollset, fd, cancel, read_ev, write_ev);
}
- if (read_ev || cancel) {
- fd_become_readable(fd, pollset);
+ if (error && !err_fallback) {
+ fd_has_errors(fd);
+ }
+ if (read_ev || cancel || err_fallback) {
+ fd_become_readable(fd);
}
- if (write_ev || cancel) {
+ if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
}
}
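Spelled out, the dispatch rules above are: EPOLLERR reaches the dedicated error closure only when the fd opted in via track_err; otherwise (err_fallback) it degrades to the pre-existing behavior of waking both the read and write closures, and EPOLLHUP (cancel) always wakes both. A pure-function restatement with a worked example (illustrative, not the gRPC API):

    #include <cassert>
    #include <cstdint>
    #include <sys/epoll.h>

    struct Dispatch { bool error_cb, read_cb, write_cb; };

    Dispatch classify(uint32_t events, bool track_err) {
      bool cancel = (events & EPOLLHUP) != 0;
      bool error = (events & EPOLLERR) != 0;
      bool read_ev = (events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (events & EPOLLOUT) != 0;
      bool err_fallback = error && !track_err;  // no error closure registered
      return Dispatch{error && !err_fallback,
                      read_ev || cancel || err_fallback,
                      write_ev || cancel || err_fallback};
    }

    int main() {
      // track_err fd: EPOLLERR fires only the error closure.
      Dispatch d1 = classify(EPOLLERR, /*track_err=*/true);
      assert(d1.error_cb && !d1.read_cb && !d1.write_cb);
      // legacy fd: EPOLLERR falls back to waking read and write.
      Dispatch d2 = classify(EPOLLERR, /*track_err=*/false);
      assert(!d2.error_cb && d2.read_cb && d2.write_cb);
      return 0;
    }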
@@ -1018,7 +1170,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
#endif
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_INFO,
- "PS:%p work hdl=%p worker=%p now=%" PRIdPTR " deadline=%" PRIdPTR
+ "PS:%p work hdl=%p worker=%p now=%" PRId64 " deadline=%" PRId64
" kwp=%d pollable=%p",
pollset, worker_hdl, WORKER_PTR, grpc_core::ExecCtx::Get()->Now(),
deadline, pollset->kicked_without_poller, pollset->active_pollable);
@@ -1064,7 +1216,7 @@ static grpc_error* pollset_transition_pollable_from_empty_to_fd_locked(
}
append_error(&error, pollset_kick_all(pollset), err_desc);
POLLABLE_UNREF(pollset->active_pollable, "pollset");
- append_error(&error, fd_get_or_become_pollable(fd, &pollset->active_pollable),
+ append_error(&error, get_fd_pollable(fd, &pollset->active_pollable),
err_desc);
return error;
}
@@ -1108,9 +1260,8 @@ static grpc_error* pollset_add_fd_locked(grpc_pollset* pollset, grpc_fd* fd) {
error = pollset_transition_pollable_from_empty_to_fd_locked(pollset, fd);
break;
case PO_FD:
- gpr_mu_lock(&po_at_start->owner_fd->orphan_mu);
- if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) &
- 1) == 0) {
+ gpr_mu_lock(&po_at_start->owner_orphan_mu);
+ if (po_at_start->owner_orphaned) {
error =
pollset_transition_pollable_from_empty_to_fd_locked(pollset, fd);
} else {
@@ -1118,7 +1269,7 @@ static grpc_error* pollset_add_fd_locked(grpc_pollset* pollset, grpc_fd* fd) {
error =
pollset_transition_pollable_from_fd_to_multi_locked(pollset, fd);
}
- gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu);
+ gpr_mu_unlock(&po_at_start->owner_orphan_mu);
break;
case PO_MULTI:
error = pollable_add_fd(pollset->active_pollable, fd);
@@ -1154,16 +1305,17 @@ static grpc_error* pollset_as_multipollable_locked(grpc_pollset* pollset,
append_error(&error, pollset_kick_all(pollset), err_desc);
break;
case PO_FD:
- gpr_mu_lock(&po_at_start->owner_fd->orphan_mu);
- if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) &
- 1) == 0) {
+ gpr_mu_lock(&po_at_start->owner_orphan_mu);
+ if (po_at_start->owner_orphaned) {
+ // Unlock before Unref'ing the pollable
+ gpr_mu_unlock(&po_at_start->owner_orphan_mu);
POLLABLE_UNREF(pollset->active_pollable, "pollset");
error = pollable_create(PO_MULTI, &pollset->active_pollable);
} else {
error = pollset_transition_pollable_from_fd_to_multi_locked(pollset,
nullptr);
+ gpr_mu_unlock(&po_at_start->owner_orphan_mu);
}
- gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu);
break;
case PO_MULTI:
break;
@@ -1393,7 +1545,7 @@ static void pollset_set_add_pollset_set(grpc_pollset_set* a,
gpr_mu_unlock(b_mu);
}
// try to do the least copying possible
- // TODO(ctiller): there's probably a better heuristic here
+ // TODO(sreek): there's probably a better heuristic here
const size_t a_size = a->fd_count + a->pollset_count;
const size_t b_size = b->fd_count + b->pollset_count;
if (b_size > a_size) {
@@ -1455,6 +1607,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
+ true, // supports error tracking (see the new track_err plumbing above)
fd_create,
fd_wrapped_fd,
@@ -1462,8 +1615,11 @@ static const grpc_event_engine_vtable vtable = {
fd_shutdown,
fd_notify_on_read,
fd_notify_on_write,
+ fd_notify_on_error,
+ fd_become_readable,
+ fd_become_writable,
+ fd_has_errors,
fd_is_shutdown,
- fd_get_read_notifier_pollset,
pollset_init,
pollset_shutdown,
@@ -1508,7 +1664,7 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux(
}
#else /* defined(GRPC_LINUX_EPOLL_CREATE1) */
-#if defined(GRPC_POSIX_SOCKET)
+#if defined(GRPC_POSIX_SOCKET_EV_EPOLLEX)
#include "src/core/lib/iomgr/ev_epollex_linux.h"
/* If GRPC_LINUX_EPOLL_CREATE1 is not defined, it means
epoll_create1 is not available. Return NULL */
@@ -1516,6 +1672,6 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux(
bool explicitly_requested) {
return nullptr;
}
-#endif /* defined(GRPC_POSIX_SOCKET) */
+#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLLEX) */
#endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */