aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/channel/handshaker.cc29
-rw-r--r--src/core/lib/iomgr/endpoint_pair_posix.cc4
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc45
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc117
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.cc68
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc14
-rw-r--r--src/core/lib/iomgr/ev_posix.cc26
-rw-r--r--src/core/lib/iomgr/ev_posix.h24
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.cc8
-rw-r--r--src/core/lib/iomgr/tcp_posix.cc2
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.cc6
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_common.cc2
-rw-r--r--src/core/lib/iomgr/udp_server.cc5
13 files changed, 255 insertions, 95 deletions
diff --git a/src/core/lib/channel/handshaker.cc b/src/core/lib/channel/handshaker.cc
index 86f8699e04..ad3250b7e9 100644
--- a/src/core/lib/channel/handshaker.cc
+++ b/src/core/lib/channel/handshaker.cc
@@ -223,18 +223,23 @@ static bool call_next_handshaker_locked(grpc_handshake_manager* mgr,
mgr->index == mgr->count) {
if (error == GRPC_ERROR_NONE && mgr->shutdown) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("handshaker shutdown");
- // TODO(roth): It is currently necessary to shutdown endpoints
- // before destroying then, even when we know that there are no
- // pending read/write callbacks. This should be fixed, at which
- // point this can be removed.
- grpc_endpoint_shutdown(mgr->args.endpoint, GRPC_ERROR_REF(error));
- grpc_endpoint_destroy(mgr->args.endpoint);
- mgr->args.endpoint = nullptr;
- grpc_channel_args_destroy(mgr->args.args);
- mgr->args.args = nullptr;
- grpc_slice_buffer_destroy_internal(mgr->args.read_buffer);
- gpr_free(mgr->args.read_buffer);
- mgr->args.read_buffer = nullptr;
+ // It is possible that the endpoint has already been destroyed by
+ // a shutdown call while this callback was sitting on the ExecCtx
+ // with no error.
+ if (mgr->args.endpoint != nullptr) {
+ // TODO(roth): It is currently necessary to shutdown endpoints
+ // before destroying then, even when we know that there are no
+ // pending read/write callbacks. This should be fixed, at which
+ // point this can be removed.
+ grpc_endpoint_shutdown(mgr->args.endpoint, GRPC_ERROR_REF(error));
+ grpc_endpoint_destroy(mgr->args.endpoint);
+ mgr->args.endpoint = nullptr;
+ grpc_channel_args_destroy(mgr->args.args);
+ mgr->args.args = nullptr;
+ grpc_slice_buffer_destroy_internal(mgr->args.read_buffer);
+ gpr_free(mgr->args.read_buffer);
+ mgr->args.read_buffer = nullptr;
+ }
}
if (grpc_handshaker_trace.enabled()) {
gpr_log(GPR_INFO,
diff --git a/src/core/lib/iomgr/endpoint_pair_posix.cc b/src/core/lib/iomgr/endpoint_pair_posix.cc
index 49850ab3a1..5c5c246f99 100644
--- a/src/core/lib/iomgr/endpoint_pair_posix.cc
+++ b/src/core/lib/iomgr/endpoint_pair_posix.cc
@@ -59,11 +59,11 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name,
grpc_core::ExecCtx exec_ctx;
gpr_asprintf(&final_name, "%s:client", name);
- p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), args,
+ p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, false), args,
"socketpair-server");
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
- p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), args,
+ p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, false), args,
"socketpair-client");
gpr_free(final_name);
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index cf839619cd..86a0243d2e 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -136,6 +136,7 @@ struct grpc_fd {
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
+ grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
struct grpc_fd* freelist_next;
@@ -272,7 +273,7 @@ static void fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
-static grpc_fd* fd_create(int fd, const char* name) {
+static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
grpc_fd* new_fd = nullptr;
gpr_mu_lock(&fd_freelist_mu);
@@ -286,11 +287,12 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
new_fd->read_closure.Init();
new_fd->write_closure.Init();
+ new_fd->error_closure.Init();
}
-
new_fd->fd = fd;
new_fd->read_closure->InitEvent();
new_fd->write_closure->InitEvent();
+ new_fd->error_closure->InitEvent();
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
@@ -307,7 +309,13 @@ static grpc_fd* fd_create(int fd, const char* name) {
struct epoll_event ev;
ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
- ev.data.ptr = new_fd;
+ /* Use the least significant bit of ev.data.ptr to store track_err. We expect
+ * the addresses to be word aligned. We need to store track_err to avoid
+ * synchronization issues when accessing it after receiving an event.
+ * Accessing fd would be a data race there because the fd might have been
+ * returned to the free list at that point. */
+ ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) |
+ (track_err ? 1 : 0));
if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
}
@@ -327,6 +335,7 @@ static void fd_shutdown_internal(grpc_fd* fd, grpc_error* why,
shutdown(fd->fd, SHUT_RDWR);
}
fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
+ fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
}
GRPC_ERROR_UNREF(why);
}
@@ -337,7 +346,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
}
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
- bool already_closed, const char* reason) {
+ const char* reason) {
grpc_error* error = GRPC_ERROR_NONE;
bool is_release_fd = (release_fd != nullptr);
@@ -350,7 +359,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
descriptor fd->fd (but we still own the grpc_fd structure). */
if (is_release_fd) {
*release_fd = fd->fd;
- } else if (!already_closed) {
+ } else {
close(fd->fd);
}
@@ -359,6 +368,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
grpc_iomgr_unregister_object(&fd->iomgr_object);
fd->read_closure->DestroyEvent();
fd->write_closure->DestroyEvent();
+ fd->error_closure->DestroyEvent();
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
@@ -383,6 +393,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
fd->write_closure->NotifyOn(closure);
}
+static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
+ fd->error_closure->NotifyOn(closure);
+}
+
static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
fd->read_closure->SetReady();
/* Use release store to match with acquire load in fd_get_read_notifier */
@@ -391,6 +405,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
+static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
+
/*******************************************************************************
* Pollset Definitions
*/
@@ -611,16 +627,25 @@ static grpc_error* process_epoll_events(grpc_pollset* pollset) {
append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
} else {
- grpc_fd* fd = static_cast<grpc_fd*>(data_ptr);
- bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
+ grpc_fd* fd = reinterpret_cast<grpc_fd*>(
+ reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
+ bool track_err =
+ reinterpret_cast<intptr_t>(data_ptr) & static_cast<intptr_t>(1);
+ bool cancel = (ev->events & EPOLLHUP) != 0;
+ bool error = (ev->events & EPOLLERR) != 0;
bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (ev->events & EPOLLOUT) != 0;
+ bool err_fallback = error && !track_err;
+
+ if (error && !err_fallback) {
+ fd_has_errors(fd);
+ }
- if (read_ev || cancel) {
+ if (read_ev || cancel || err_fallback) {
fd_become_readable(fd, pollset);
}
- if (write_ev || cancel) {
+ if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
}
}
@@ -1183,6 +1208,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
+ true,
fd_create,
fd_wrapped_fd,
@@ -1190,6 +1216,7 @@ static const grpc_event_engine_vtable vtable = {
fd_shutdown,
fd_notify_on_read,
fd_notify_on_write,
+ fd_notify_on_error,
fd_is_shutdown,
fd_get_read_notifier_pollset,
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 7903297fc6..55a2b98372 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -63,7 +63,7 @@
// a keepalive ping timeout issue. We may want to revert https://github
// .com/grpc/grpc/pull/14943 once we figure out the root cause.
#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 16
-#define MAX_PROBE_EPOLL_FDS 32
+#define MAX_FDS_IN_CACHE 32
grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false,
"pollable_refcount");
@@ -77,8 +77,14 @@ typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type;
typedef struct pollable pollable;
typedef struct cached_fd {
+ // Set to the grpc_fd's salt value. See 'salt' variable' in grpc_fd for more
+ // details
intptr_t salt;
+
+ // The underlying fd
int fd;
+
+ // A recency time counter that helps to determine the LRU fd in the cache
uint64_t last_used;
} cached_fd;
@@ -111,10 +117,32 @@ struct pollable {
int event_count;
struct epoll_event events[MAX_EPOLL_EVENTS];
- // Maintain a LRU-eviction cache of fds in this pollable
- cached_fd fd_cache[MAX_PROBE_EPOLL_FDS];
+ // We may be calling pollable_add_fd() on the same (pollable, fd) multiple
+ // times. To prevent pollable_add_fd() from making multiple sys calls to
+ // epoll_ctl() to add the fd, we maintain a cache of what fds are already
+ // present in the underlying epoll-set.
+ //
+ // Since this is not a correctness issue, we do not need to maintain all the
+ // fds in the cache. Hence we just use an LRU cache of size 'MAX_FDS_IN_CACHE'
+ //
+ // NOTE: An ideal implementation of this should do the following:
+ // 1) Add fds to the cache in pollable_add_fd() function (i.e whenever the fd
+ // is added to the pollable's epoll set)
+ // 2) Remove the fd from the cache whenever the fd is removed from the
+ // underlying epoll set (i.e whenever fd_orphan() is called).
+ //
+ // Implementing (2) above (i.e removing fds from cache on fd_orphan) adds a
+ // lot of complexity since an fd can be present in multiple pollalbles. So our
+ // implementation ONLY DOES (1) and NOT (2).
+ //
+ // The cache_fd.salt variable helps here to maintain correctness (it serves as
+ // an epoch that differentiates one grpc_fd from the other even though both of
+ // them may have the same fd number)
+ //
+ // The following implements LRU-eviction cache of fds in this pollable
+ cached_fd fd_cache[MAX_FDS_IN_CACHE];
int fd_cache_size;
- uint64_t fd_cache_counter;
+ uint64_t fd_cache_counter; // Recency timer tick counter
};
static const char* pollable_type_string(pollable_type t) {
@@ -157,15 +185,24 @@ static void pollable_unref(pollable* p, int line, const char* reason);
* Fd Declarations
*/
+// Monotonically increasing Epoch counter that is assinged to each grpc_fd. See
+// the description of 'salt' variable in 'grpc_fd' for more details
+// TODO: (sreek/kpayson) gpr_atm is intptr_t which may not be wide-enough on
+// 32-bit systems. Change this to int_64 - atleast on 32-bit systems
static gpr_atm g_fd_salt;
struct grpc_fd {
int fd;
+
+ // Since fd numbers can be reused (after old fds are closed), this serves as
+ // an epoch that uniquely identifies this fd (i.e the pair (salt, fd) is
+ // unique (until the salt counter (i.e g_fd_salt) overflows)
intptr_t salt;
- /* refst format:
- bit 0 : 1=Active / 0=Orphaned
- bits 1-n : refcount
- Ref/Unref by two to avoid altering the orphaned bit */
+
+ // refst format:
+ // bit 0 : 1=Active / 0=Orphaned
+ // bits 1-n : refcount
+ // Ref/Unref by two to avoid altering the orphaned bit
gpr_atm refst;
gpr_mu orphan_mu;
@@ -175,15 +212,19 @@ struct grpc_fd {
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
+ grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
struct grpc_fd* freelist_next;
grpc_closure* on_done_closure;
- /* The pollset that last noticed that the fd is readable. The actual type
- * stored in this is (grpc_pollset *) */
+ // The pollset that last noticed that the fd is readable. The actual type
+ // stored in this is (grpc_pollset *)
gpr_atm read_notifier_pollset;
grpc_iomgr_object iomgr_object;
+
+ // Do we need to track EPOLLERR events separately?
+ bool track_err;
};
static void fd_global_init(void);
@@ -309,6 +350,7 @@ static void fd_destroy(void* arg, grpc_error* error) {
fd->read_closure->DestroyEvent();
fd->write_closure->DestroyEvent();
+ fd->error_closure->DestroyEvent();
gpr_mu_unlock(&fd_freelist_mu);
}
@@ -348,7 +390,7 @@ static void fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
-static grpc_fd* fd_create(int fd, const char* name) {
+static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
grpc_fd* new_fd = nullptr;
gpr_mu_lock(&fd_freelist_mu);
@@ -362,6 +404,7 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
new_fd->read_closure.Init();
new_fd->write_closure.Init();
+ new_fd->error_closure.Init();
}
gpr_mu_init(&new_fd->pollable_mu);
@@ -369,9 +412,11 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd->pollable_obj = nullptr;
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
+ new_fd->track_err = track_err;
new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1);
new_fd->read_closure->InitEvent();
new_fd->write_closure->InitEvent();
+ new_fd->error_closure->InitEvent();
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
@@ -395,8 +440,8 @@ static int fd_wrapped_fd(grpc_fd* fd) {
}
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
- bool already_closed, const char* reason) {
- bool is_fd_closed = already_closed;
+ const char* reason) {
+ bool is_fd_closed = false;
gpr_mu_lock(&fd->orphan_mu);
@@ -406,7 +451,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
descriptor fd->fd (but we still own the grpc_fd structure). */
if (release_fd != nullptr) {
*release_fd = fd->fd;
- } else if (!is_fd_closed) {
+ } else {
close(fd->fd);
is_fd_closed = true;
}
@@ -438,8 +483,14 @@ static bool fd_is_shutdown(grpc_fd* fd) {
/* Might be called multiple times */
static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
- shutdown(fd->fd, SHUT_RDWR);
+ if (shutdown(fd->fd, SHUT_RDWR)) {
+ if (errno != ENOTCONN) {
+ gpr_log(GPR_ERROR, "Error shutting down fd %d. errno: %d",
+ grpc_fd_wrapped_fd(fd), errno);
+ }
+ }
fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
+ fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
}
GRPC_ERROR_UNREF(why);
}
@@ -452,6 +503,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
fd->write_closure->NotifyOn(closure);
}
+static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
+ fd->error_closure->NotifyOn(closure);
+}
+
/*******************************************************************************
* Pollable Definitions
*/
@@ -544,6 +599,7 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
const int epfd = p->epfd;
gpr_mu_lock(&p->mu);
p->fd_cache_counter++;
+
// Handle the case of overflow for our cache counter by
// reseting the recency-counter on all cache objects
if (p->fd_cache_counter == 0) {
@@ -563,8 +619,9 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
lru_idx = i;
}
}
+
// Add to cache
- if (p->fd_cache_size < MAX_PROBE_EPOLL_FDS) {
+ if (p->fd_cache_size < MAX_FDS_IN_CACHE) {
lru_idx = p->fd_cache_size;
p->fd_cache_size++;
}
@@ -572,6 +629,7 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
p->fd_cache[lru_idx].salt = fd->salt;
p->fd_cache[lru_idx].last_used = p->fd_cache_counter;
gpr_mu_unlock(&p->mu);
+
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_INFO, "add fd %p (%d) to pollable %p", fd, fd->fd, p);
}
@@ -579,7 +637,12 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
struct epoll_event ev_fd;
ev_fd.events =
static_cast<uint32_t>(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE);
- ev_fd.data.ptr = fd;
+ /* Use the second least significant bit of ev_fd.data.ptr to store track_err
+ * to avoid synchronization issues when accessing it after receiving an event.
+ * Accessing fd would be a data race there because the fd might have been
+ * returned to the free list at that point. */
+ ev_fd.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fd) |
+ (fd->track_err ? 2 : 0));
GRPC_STATS_INC_SYSCALL_EPOLL_CTL();
if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) {
switch (errno) {
@@ -780,6 +843,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
+static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
+
static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) {
gpr_mu_lock(&fd->pollable_mu);
grpc_error* error = GRPC_ERROR_NONE;
@@ -848,20 +913,28 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset,
(intptr_t)data_ptr)),
err_desc);
} else {
- grpc_fd* fd = static_cast<grpc_fd*>(data_ptr);
- bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
+ grpc_fd* fd =
+ reinterpret_cast<grpc_fd*>(reinterpret_cast<intptr_t>(data_ptr) & ~2);
+ bool track_err = reinterpret_cast<intptr_t>(data_ptr) & 2;
+ bool cancel = (ev->events & EPOLLHUP) != 0;
+ bool error = (ev->events & EPOLLERR) != 0;
bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (ev->events & EPOLLOUT) != 0;
+ bool err_fallback = error && !track_err;
+
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_INFO,
"PS:%p got fd %p: cancel=%d read=%d "
"write=%d",
pollset, fd, cancel, read_ev, write_ev);
}
- if (read_ev || cancel) {
+ if (error && !err_fallback) {
+ fd_has_errors(fd);
+ }
+ if (read_ev || cancel || err_fallback) {
fd_become_readable(fd, pollset);
}
- if (write_ev || cancel) {
+ if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
}
}
@@ -1503,6 +1576,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
+ true,
fd_create,
fd_wrapped_fd,
@@ -1510,6 +1584,7 @@ static const grpc_event_engine_vtable vtable = {
fd_shutdown,
fd_notify_on_read,
fd_notify_on_write,
+ fd_notify_on_error,
fd_is_shutdown,
fd_get_read_notifier_pollset,
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index a144817a83..2189801c18 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -132,6 +132,7 @@ struct grpc_fd {
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
+ grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
struct grpc_fd* freelist_next;
grpc_closure* on_done_closure;
@@ -141,6 +142,9 @@ struct grpc_fd {
gpr_atm read_notifier_pollset;
grpc_iomgr_object iomgr_object;
+
+ /* Do we need to track EPOLLERR events separately? */
+ bool track_err;
};
/* Reference counting for fds */
@@ -352,7 +356,10 @@ static void polling_island_add_fds_locked(polling_island* pi, grpc_fd** fds,
for (i = 0; i < fd_count; i++) {
ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
- ev.data.ptr = fds[i];
+ /* Use the least significant bit of ev.data.ptr to store track_err to avoid
+ * synchronization issues when accessing it after receiving an event */
+ ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fds[i]) |
+ (fds[i]->track_err ? 1 : 0));
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);
if (err < 0) {
@@ -435,7 +442,6 @@ static void polling_island_remove_all_fds_locked(polling_island* pi,
/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd,
- bool is_fd_closed,
grpc_error** error) {
int err;
size_t i;
@@ -444,16 +450,14 @@ static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd,
/* If fd is already closed, then it would have been automatically been removed
from the epoll set */
- if (!is_fd_closed) {
- err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr);
- if (err < 0 && errno != ENOENT) {
- gpr_asprintf(
- &err_msg,
- "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
- pi->epoll_fd, fd->fd, errno, strerror(errno));
- append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
- gpr_free(err_msg);
- }
+ err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr);
+ if (err < 0 && errno != ENOENT) {
+ gpr_asprintf(
+ &err_msg,
+ "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
+ pi->epoll_fd, fd->fd, errno, strerror(errno));
+ append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
+ gpr_free(err_msg);
}
for (i = 0; i < pi->fd_cnt; i++) {
@@ -769,6 +773,7 @@ static void unref_by(grpc_fd* fd, int n) {
fd->read_closure->DestroyEvent();
fd->write_closure->DestroyEvent();
+ fd->error_closure->DestroyEvent();
gpr_mu_unlock(&fd_freelist_mu);
} else {
@@ -806,7 +811,7 @@ static void fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
-static grpc_fd* fd_create(int fd, const char* name) {
+static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
grpc_fd* new_fd = nullptr;
gpr_mu_lock(&fd_freelist_mu);
@@ -821,6 +826,7 @@ static grpc_fd* fd_create(int fd, const char* name) {
gpr_mu_init(&new_fd->po.mu);
new_fd->read_closure.Init();
new_fd->write_closure.Init();
+ new_fd->error_closure.Init();
}
/* Note: It is not really needed to get the new_fd->po.mu lock here. If this
@@ -837,6 +843,8 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd->orphaned = false;
new_fd->read_closure->InitEvent();
new_fd->write_closure->InitEvent();
+ new_fd->error_closure->InitEvent();
+ new_fd->track_err = track_err;
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
@@ -863,7 +871,7 @@ static int fd_wrapped_fd(grpc_fd* fd) {
}
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
- bool already_closed, const char* reason) {
+ const char* reason) {
grpc_error* error = GRPC_ERROR_NONE;
polling_island* unref_pi = nullptr;
@@ -884,7 +892,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
before doing this.) */
if (fd->po.pi != nullptr) {
polling_island* pi_latest = polling_island_lock(fd->po.pi);
- polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error);
+ polling_island_remove_fd_locked(pi_latest, fd, &error);
gpr_mu_unlock(&pi_latest->mu);
unref_pi = fd->po.pi;
@@ -933,6 +941,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
shutdown(fd->fd, SHUT_RDWR);
fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
+ fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
}
GRPC_ERROR_UNREF(why);
}
@@ -945,6 +954,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
fd->write_closure->NotifyOn(closure);
}
+static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
+ fd->error_closure->NotifyOn(closure);
+}
+
/*******************************************************************************
* Pollset Definitions
*/
@@ -1116,6 +1129,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
+static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
+
static void pollset_release_polling_island(grpc_pollset* ps,
const char* reason) {
if (ps->po.pi != nullptr) {
@@ -1254,14 +1269,23 @@ static void pollset_work_and_unlock(grpc_pollset* pollset,
to the function pollset_work_and_unlock() will pick up the correct
epoll_fd */
} else {
- grpc_fd* fd = static_cast<grpc_fd*>(data_ptr);
- int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
- int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
- int write_ev = ep_ev[i].events & EPOLLOUT;
- if (read_ev || cancel) {
+ grpc_fd* fd = reinterpret_cast<grpc_fd*>(
+ reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
+ bool track_err =
+ reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1);
+ bool cancel = (ep_ev[i].events & EPOLLHUP) != 0;
+ bool error = (ep_ev[i].events & EPOLLERR) != 0;
+ bool read_ev = (ep_ev[i].events & (EPOLLIN | EPOLLPRI)) != 0;
+ bool write_ev = (ep_ev[i].events & EPOLLOUT) != 0;
+ bool err_fallback = error && !track_err;
+
+ if (error && !err_fallback) {
+ fd_has_errors(fd);
+ }
+ if (read_ev || cancel || err_fallback) {
fd_become_readable(fd, pollset);
}
- if (write_ev || cancel) {
+ if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
}
}
@@ -1634,6 +1658,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
+ true,
fd_create,
fd_wrapped_fd,
@@ -1641,6 +1666,7 @@ static const grpc_event_engine_vtable vtable = {
fd_shutdown,
fd_notify_on_read,
fd_notify_on_write,
+ fd_notify_on_error,
fd_is_shutdown,
fd_get_read_notifier_pollset,
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index 70958ed562..c9c09881a2 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -330,7 +330,8 @@ static void unref_by(grpc_fd* fd, int n) {
}
}
-static grpc_fd* fd_create(int fd, const char* name) {
+static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
+ GPR_DEBUG_ASSERT(track_err == false);
grpc_fd* r = static_cast<grpc_fd*>(gpr_malloc(sizeof(*r)));
gpr_mu_init(&r->mu);
gpr_atm_rel_store(&r->refst, 1);
@@ -424,14 +425,12 @@ static int fd_wrapped_fd(grpc_fd* fd) {
}
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
- bool already_closed, const char* reason) {
+ const char* reason) {
fd->on_done_closure = on_done;
fd->released = release_fd != nullptr;
if (release_fd != nullptr) {
*release_fd = fd->fd;
fd->released = true;
- } else if (already_closed) {
- fd->released = true;
}
gpr_mu_lock(&fd->mu);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
@@ -553,6 +552,11 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
gpr_mu_unlock(&fd->mu);
}
+static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
+ gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
+ abort();
+}
+
static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
grpc_pollset_worker* worker, uint32_t read_mask,
uint32_t write_mask, grpc_fd_watcher* watcher) {
@@ -1710,6 +1714,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
+ false,
fd_create,
fd_wrapped_fd,
@@ -1717,6 +1722,7 @@ static const grpc_event_engine_vtable vtable = {
fd_shutdown,
fd_notify_on_read,
fd_notify_on_write,
+ fd_notify_on_error,
fd_is_shutdown,
fd_get_read_notifier_pollset,
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index 6b7eca0afa..1139b3273a 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -193,10 +193,15 @@ void grpc_event_engine_shutdown(void) {
g_event_engine = nullptr;
}
-grpc_fd* grpc_fd_create(int fd, const char* name) {
- GRPC_POLLING_API_TRACE("fd_create(%d, %s)", fd, name);
- GRPC_FD_TRACE("fd_create(%d, %s)", fd, name);
- return g_event_engine->fd_create(fd, name);
+bool grpc_event_engine_can_track_errors(void) {
+ return g_event_engine->can_track_err;
+}
+
+grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) {
+ GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
+ GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
+ GPR_DEBUG_ASSERT(!track_err || g_event_engine->can_track_err);
+ return g_event_engine->fd_create(fd, name, track_err);
}
int grpc_fd_wrapped_fd(grpc_fd* fd) {
@@ -204,13 +209,12 @@ int grpc_fd_wrapped_fd(grpc_fd* fd) {
}
void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
- bool already_closed, const char* reason) {
- GRPC_POLLING_API_TRACE("fd_orphan(%d, %p, %p, %d, %s)",
- grpc_fd_wrapped_fd(fd), on_done, release_fd,
- already_closed, reason);
+ const char* reason) {
+ GRPC_POLLING_API_TRACE("fd_orphan(%d, %p, %p, %s)", grpc_fd_wrapped_fd(fd),
+ on_done, release_fd, reason);
GRPC_FD_TRACE("grpc_fd_orphan, fd:%d closed", grpc_fd_wrapped_fd(fd));
- g_event_engine->fd_orphan(fd, on_done, release_fd, already_closed, reason);
+ g_event_engine->fd_orphan(fd, on_done, release_fd, reason);
}
void grpc_fd_shutdown(grpc_fd* fd, grpc_error* why) {
@@ -231,6 +235,10 @@ void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
g_event_engine->fd_notify_on_write(fd, closure);
}
+void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
+ g_event_engine->fd_notify_on_error(fd, closure);
+}
+
static size_t pollset_size(void) { return g_event_engine->pollset_size; }
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 82cbce9a7b..b4c17fc80d 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -41,14 +41,16 @@ typedef struct grpc_fd grpc_fd;
typedef struct grpc_event_engine_vtable {
size_t pollset_size;
+ bool can_track_err;
- grpc_fd* (*fd_create)(int fd, const char* name);
+ grpc_fd* (*fd_create)(int fd, const char* name, bool track_err);
int (*fd_wrapped_fd)(grpc_fd* fd);
void (*fd_orphan)(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
- bool already_closed, const char* reason);
+ const char* reason);
void (*fd_shutdown)(grpc_fd* fd, grpc_error* why);
void (*fd_notify_on_read)(grpc_fd* fd, grpc_closure* closure);
void (*fd_notify_on_write)(grpc_fd* fd, grpc_closure* closure);
+ void (*fd_notify_on_error)(grpc_fd* fd, grpc_closure* closure);
bool (*fd_is_shutdown)(grpc_fd* fd);
grpc_pollset* (*fd_get_read_notifier_pollset)(grpc_fd* fd);
@@ -84,10 +86,20 @@ void grpc_event_engine_shutdown(void);
/* Return the name of the poll strategy */
const char* grpc_get_poll_strategy_name();
+/* Returns true if polling engine can track errors separately, false otherwise.
+ * If this is true, fd can be created with track_err set. After this, error
+ * events will be reported using fd_notify_on_error. If it is not set, errors
+ * will continue to be reported through fd_notify_on_read and
+ * fd_notify_on_write.
+ */
+bool grpc_event_engine_can_track_errors();
+
/* Create a wrapped file descriptor.
Requires fd is a non-blocking file descriptor.
+ \a track_err if true means that error events would be tracked separately
+ using grpc_fd_notify_on_error. Currently, valid only for linux systems.
This takes ownership of closing fd. */
-grpc_fd* grpc_fd_create(int fd, const char* name);
+grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err);
/* Return the wrapped fd, or -1 if it has been released or closed. */
int grpc_fd_wrapped_fd(grpc_fd* fd);
@@ -100,7 +112,7 @@ int grpc_fd_wrapped_fd(grpc_fd* fd);
notify_on_write.
MUST NOT be called with a pollset lock taken */
void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
- bool already_closed, const char* reason);
+ const char* reason);
/* Has grpc_fd_shutdown been called on an fd? */
bool grpc_fd_is_shutdown(grpc_fd* fd);
@@ -126,6 +138,10 @@ void grpc_fd_notify_on_read(grpc_fd* fd, grpc_closure* closure);
/* Exactly the same semantics as above, except based on writable events. */
void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure);
+/* Exactly the same semantics as above, except based on error events. track_err
+ * needs to have been set on grpc_fd_create */
+void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure);
+
/* Return the read notifier pollset from the fd */
grpc_pollset* grpc_fd_get_read_notifier_pollset(grpc_fd* fd);
diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc
index 39da7f1637..296ee74311 100644
--- a/src/core/lib/iomgr/tcp_client_posix.cc
+++ b/src/core/lib/iomgr/tcp_client_posix.cc
@@ -211,8 +211,7 @@ static void on_writable(void* acp, grpc_error* error) {
finish:
if (fd != nullptr) {
grpc_pollset_set_del_fd(ac->interested_parties, fd);
- grpc_fd_orphan(fd, nullptr, nullptr, false /* already_closed */,
- "tcp_client_orphan");
+ grpc_fd_orphan(fd, nullptr, nullptr, "tcp_client_orphan");
fd = nullptr;
}
done = (--ac->refs == 0);
@@ -280,7 +279,7 @@ grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
}
addr_str = grpc_sockaddr_to_uri(mapped_addr);
gpr_asprintf(&name, "tcp-client:%s", addr_str);
- *fdobj = grpc_fd_create(fd, name);
+ *fdobj = grpc_fd_create(fd, name, false);
gpr_free(name);
gpr_free(addr_str);
return GRPC_ERROR_NONE;
@@ -305,8 +304,7 @@ void grpc_tcp_client_create_from_prepared_fd(
return;
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
- grpc_fd_orphan(fdobj, nullptr, nullptr, false /* already_closed */,
- "tcp_client_connect_error");
+ grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error");
GRPC_CLOSURE_SCHED(closure, GRPC_OS_ERROR(errno, "connect"));
return;
}
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index 43d545846d..9df2e206b2 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -297,7 +297,7 @@ static void tcp_shutdown(grpc_endpoint* ep, grpc_error* why) {
static void tcp_free(grpc_tcp* tcp) {
grpc_fd_orphan(tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
- false /* already_closed */, "tcp_unref_orphan");
+ "tcp_unref_orphan");
grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
grpc_resource_user_unref(tcp->resource_user);
gpr_free(tcp->peer_string);
diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc
index 0a5caca906..8ddf684fea 100644
--- a/src/core/lib/iomgr/tcp_server_posix.cc
+++ b/src/core/lib/iomgr/tcp_server_posix.cc
@@ -150,7 +150,7 @@ static void deactivated_all_ports(grpc_tcp_server* s) {
GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx);
grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
- false /* already_closed */, "tcp_listener_shutdown");
+ "tcp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
} else {
@@ -226,7 +226,7 @@ static void on_read(void* arg, grpc_error* err) {
gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s", addr_str);
}
- grpc_fd* fdobj = grpc_fd_create(fd, name);
+ grpc_fd* fdobj = grpc_fd_create(fd, name, false);
read_notifier_pollset =
sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
@@ -362,7 +362,7 @@ static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) {
listener->sibling = sp;
sp->server = listener->server;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, name);
+ sp->emfd = grpc_fd_create(fd, name, false);
memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = listener->port_index;
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
index 73afa15e65..b9f8145572 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
@@ -105,7 +105,7 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, int fd,
s->tail = sp;
sp->server = s;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, name);
+ sp->emfd = grpc_fd_create(fd, name, false);
memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = port_index;
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index 51d17eb174..bdb2d0e764 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -152,7 +152,7 @@ GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd,
grpc_sockaddr_to_string(&addr_str, addr, 1);
gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
gpr_free(addr_str);
- emfd_ = grpc_fd_create(fd, name);
+ emfd_ = grpc_fd_create(fd, name, false);
memcpy(&addr_, addr, sizeof(grpc_resolved_address));
GPR_ASSERT(emfd_);
gpr_free(name);
@@ -300,8 +300,7 @@ void GrpcUdpListener::OrphanFd() {
grpc_schedule_on_exec_ctx);
/* Because at this point, all listening sockets have been shutdown already, no
* need to call OnFdAboutToOrphan() to notify the handler again. */
- grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr,
- false /* already_closed */, "udp_listener_shutdown");
+ grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr, "udp_listener_shutdown");
}
void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) {