aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/error.cc2
-rw-r--r--src/core/lib/iomgr/error.h5
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc2
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc2
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.cc2
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc28
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.cc9
-rw-r--r--src/core/lib/iomgr/udp_server.cc20
-rw-r--r--src/core/lib/iomgr/udp_server.h6
-rw-r--r--src/core/lib/iomgr/wakeup_fd_cv.cc8
-rw-r--r--src/core/lib/iomgr/wakeup_fd_cv.h24
11 files changed, 66 insertions, 42 deletions
diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc
index 42cd7c455d..67c3caf5ee 100644
--- a/src/core/lib/iomgr/error.cc
+++ b/src/core/lib/iomgr/error.cc
@@ -749,7 +749,7 @@ const char* grpc_error_string(grpc_error* err) {
if (!gpr_atm_rel_cas(&err->atomics.error_string, 0, (gpr_atm)out)) {
gpr_free(out);
- out = (char*)gpr_atm_no_barrier_load(&err->atomics.error_string);
+ out = (char*)gpr_atm_acq_load(&err->atomics.error_string);
}
GPR_TIMER_END("grpc_error_string", 0);
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 4759ee0791..8c72a439f6 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -165,6 +165,8 @@ void grpc_error_unref(grpc_error* err);
grpc_error* grpc_error_set_int(grpc_error* src, grpc_error_ints which,
intptr_t value) GRPC_MUST_USE_RESULT;
bool grpc_error_get_int(grpc_error* error, grpc_error_ints which, intptr_t* p);
+/// This call takes ownership of the slice; the error is responsible for
+/// eventually unref-ing it.
grpc_error* grpc_error_set_str(grpc_error* src, grpc_error_strs which,
grpc_slice str) GRPC_MUST_USE_RESULT;
/// Returns false if the specified string is not set.
@@ -174,7 +176,8 @@ bool grpc_error_get_str(grpc_error* error, grpc_error_strs which,
/// Add a child error: an error that is believed to have contributed to this
/// error occurring. Allows root causing high level errors from lower level
-/// errors that contributed to them.
+/// errors that contributed to them. The src error takes ownership of the
+/// child error.
grpc_error* grpc_error_add_child(grpc_error* src,
grpc_error* child) GRPC_MUST_USE_RESULT;
grpc_error* grpc_os_error(const char* file, int line, int err,
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index ae9d47ece5..1ab7e516de 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -1232,8 +1232,6 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
* NULL */
const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
- gpr_log(GPR_ERROR,
- "Skipping epoll1 becuase GRPC_LINUX_EPOLL is not defined.");
return nullptr;
}
#endif /* defined(GRPC_POSIX_SOCKET) */
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index b2817156a8..5f5f45a7a5 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -1449,8 +1449,6 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux(
* NULL */
const grpc_event_engine_vtable* grpc_init_epollex_linux(
bool explicitly_requested) {
- gpr_log(GPR_ERROR,
- "Skipping epollex becuase GRPC_LINUX_EPOLL is not defined.");
return nullptr;
}
#endif /* defined(GRPC_POSIX_SOCKET) */
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index 7a8962f4a8..8072a6cbed 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -1732,8 +1732,6 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux(
* NULL */
const grpc_event_engine_vtable* grpc_init_epollsig_linux(
bool explicit_request) {
- gpr_log(GPR_ERROR,
- "Skipping epollsig becuase GRPC_LINUX_EPOLL is not defined.");
return nullptr;
}
#endif /* defined(GRPC_POSIX_SOCKET) */
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index 53de94fb6e..7ea1dfaa80 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -71,6 +71,7 @@ struct grpc_fd {
int shutdown;
int closed;
int released;
+ gpr_atm pollhup;
grpc_error* shutdown_error;
/* The watcher list.
@@ -242,7 +243,7 @@ struct grpc_pollset_set {
typedef struct poll_result {
gpr_refcount refcount;
- cv_node* watchers;
+ grpc_cv_node* watchers;
int watchcount;
struct pollfd* fds;
nfds_t nfds;
@@ -273,7 +274,7 @@ typedef struct poll_hash_table {
} poll_hash_table;
poll_hash_table poll_cache;
-cv_fd_table g_cvfds;
+grpc_cv_fd_table g_cvfds;
/*******************************************************************************
* fd_posix.c
@@ -335,6 +336,7 @@ static grpc_fd* fd_create(int fd, const char* name) {
r->on_done_closure = nullptr;
r->closed = 0;
r->released = 0;
+ gpr_atm_no_barrier_store(&r->pollhup, 0);
r->read_notifier_pollset = nullptr;
char* name2;
@@ -950,7 +952,8 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
pfds[0].events = POLLIN;
pfds[0].revents = 0;
for (i = 0; i < pollset->fd_count; i++) {
- if (fd_is_orphaned(pollset->fds[i])) {
+ if (fd_is_orphaned(pollset->fds[i]) ||
+ gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) {
GRPC_FD_UNREF(pollset->fds[i], "multipoller");
} else {
pollset->fds[fd_count++] = pollset->fds[i];
@@ -1017,6 +1020,12 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0,
(pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents);
}
+ /* This is a mitigation to prevent poll() from spinning on a
+ ** POLLHUP https://github.com/grpc/grpc/pull/13665
+ */
+ if (pfds[i].revents & POLLHUP) {
+ gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1);
+ }
fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
pfds[i].revents & POLLOUT_CHECK, pollset);
}
@@ -1435,7 +1444,7 @@ static void decref_poll_result(poll_result* res) {
}
}
-void remove_cvn(cv_node** head, cv_node* target) {
+void remove_cvn(grpc_cv_node** head, grpc_cv_node* target) {
if (target->next) {
target->next->prev = target->prev;
}
@@ -1460,7 +1469,7 @@ static void run_poll(void* args) {
result->completed = 1;
result->retval = retval;
result->err = errno;
- cv_node* watcher = result->watchers;
+ grpc_cv_node* watcher = result->watchers;
while (watcher) {
gpr_cv_signal(watcher->cv);
watcher = watcher->next;
@@ -1494,17 +1503,17 @@ static void run_poll(void* args) {
static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) {
unsigned int i;
int res, idx;
- cv_node* pollcv;
+ grpc_cv_node* pollcv;
int skip_poll = 0;
nfds_t nsockfds = 0;
poll_result* result = nullptr;
gpr_mu_lock(&g_cvfds.mu);
- pollcv = (cv_node*)gpr_malloc(sizeof(cv_node));
+ pollcv = (grpc_cv_node*)gpr_malloc(sizeof(grpc_cv_node));
pollcv->next = nullptr;
gpr_cv pollcv_cv;
gpr_cv_init(&pollcv_cv);
pollcv->cv = &pollcv_cv;
- cv_node* fd_cvs = (cv_node*)gpr_malloc(nfds * sizeof(cv_node));
+ grpc_cv_node* fd_cvs = (grpc_cv_node*)gpr_malloc(nfds * sizeof(grpc_cv_node));
for (i = 0; i < nfds; i++) {
fds[i].revents = 0;
@@ -1600,7 +1609,8 @@ static void global_cv_fd_table_init() {
gpr_cv_init(&g_cvfds.shutdown_cv);
gpr_ref_init(&g_cvfds.pollcount, 1);
g_cvfds.size = CV_DEFAULT_TABLE_SIZE;
- g_cvfds.cvfds = (fd_node*)gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE);
+ g_cvfds.cvfds =
+ (grpc_fd_node*)gpr_malloc(sizeof(grpc_fd_node) * CV_DEFAULT_TABLE_SIZE);
g_cvfds.free_fds = nullptr;
thread_grace = gpr_time_from_millis(POLLCV_THREAD_GRACE_MS, GPR_TIMESPAN);
for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) {
diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc
index 24ccab14b2..8cd5f8d618 100644
--- a/src/core/lib/iomgr/tcp_client_posix.cc
+++ b/src/core/lib/iomgr/tcp_client_posix.cc
@@ -212,6 +212,9 @@ finish:
fd = nullptr;
}
done = (--ac->refs == 0);
+ // Create a copy of the data from "ac" to be accessed after the unlock, as
+ // "ac" and its contents may be deallocated by the time they are read.
+ const grpc_slice addr_str_slice = grpc_slice_from_copied_string(ac->addr_str);
gpr_mu_unlock(&ac->mu);
if (error != GRPC_ERROR_NONE) {
char* error_descr;
@@ -225,9 +228,13 @@ finish:
gpr_free(error_descr);
gpr_free(desc);
error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS,
- grpc_slice_from_copied_string(ac->addr_str));
+ addr_str_slice /* takes ownership */);
+ } else {
+ grpc_slice_unref(addr_str_slice);
}
if (done) {
+ // This is safe even outside the lock, because "done", the sentinel, is
+ // populated *inside* the lock.
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_str);
grpc_channel_args_destroy(ac->channel_args);
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index 55e0b165ec..4a97f3353d 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -72,6 +72,7 @@ struct grpc_udp_listener {
grpc_udp_server_read_cb read_cb;
grpc_udp_server_write_cb write_cb;
grpc_udp_server_orphan_cb orphan_cb;
+ grpc_udp_server_start_cb start_cb;
// To be scheduled on another thread to actually read/write.
grpc_closure do_read_closure;
grpc_closure do_write_closure;
@@ -353,7 +354,7 @@ static void do_read(void* arg, grpc_error* error) {
* read lock if available. */
gpr_mu_lock(&sp->server->mu);
/* Tell the registered callback that data is available to read. */
- if (!sp->already_shutdown && sp->read_cb(sp->emfd, sp->server->user_data)) {
+ if (!sp->already_shutdown && sp->read_cb(sp->emfd)) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */
GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE);
@@ -383,7 +384,7 @@ static void on_read(void* arg, grpc_error* error) {
/* Read once. If there is more data to read, off load the work to another
* thread to finish. */
GPR_ASSERT(sp->read_cb);
- if (sp->read_cb(sp->emfd, sp->server->user_data)) {
+ if (sp->read_cb(sp->emfd)) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */
GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg,
@@ -411,7 +412,7 @@ void fd_notify_on_write_wrapper(void* arg, grpc_error* error) {
static void do_write(void* arg, grpc_error* error) {
grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
- gpr_mu_lock(&(sp->server->mu));
+ gpr_mu_lock(&sp->server->mu);
if (sp->already_shutdown) {
// If fd has been shutdown, don't write any more and re-arm notification.
grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
@@ -429,7 +430,7 @@ static void do_write(void* arg, grpc_error* error) {
static void on_write(void* arg, grpc_error* error) {
grpc_udp_listener* sp = (grpc_udp_listener*)arg;
- gpr_mu_lock(&(sp->server->mu));
+ gpr_mu_lock(&sp->server->mu);
if (error != GRPC_ERROR_NONE) {
if (0 == --sp->server->active_ports && sp->server->shutdown) {
gpr_mu_unlock(&sp->server->mu);
@@ -450,6 +451,7 @@ static void on_write(void* arg, grpc_error* error) {
static int add_socket_to_server(grpc_udp_server* s, int fd,
const grpc_resolved_address* addr,
+ grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb) {
@@ -480,6 +482,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd,
sp->read_cb = read_cb;
sp->write_cb = write_cb;
sp->orphan_cb = orphan_cb;
+ sp->start_cb = start_cb;
sp->orphan_notified = false;
sp->already_shutdown = false;
GPR_ASSERT(sp->emfd);
@@ -492,6 +495,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd,
int grpc_udp_server_add_port(grpc_udp_server* s,
const grpc_resolved_address* addr,
+ grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb) {
@@ -541,8 +545,8 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
// TODO(rjshade): Test and propagate the returned grpc_error*:
GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory(
s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd));
- allocated_port1 =
- add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb);
+ allocated_port1 = add_socket_to_server(s, fd, addr, start_cb, read_cb,
+ write_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
@@ -565,7 +569,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
addr = &addr4_copy;
}
allocated_port2 =
- add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb);
+ add_socket_to_server(s, fd, addr, start_cb, read_cb, write_cb, orphan_cb);
done:
gpr_free(allocated_addr);
@@ -587,6 +591,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) {
void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets,
size_t pollset_count, void* user_data) {
+ gpr_log(GPR_DEBUG, "grpc_udp_server_start");
size_t i;
gpr_mu_lock(&s->mu);
grpc_udp_listener* sp;
@@ -596,6 +601,7 @@ void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets,
sp = s->head;
while (sp != nullptr) {
+ sp->start_cb(sp->emfd, sp->server->user_data);
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(pollsets[i], sp->emfd);
}
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index 02e3acb7f5..a469ab9be5 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -30,9 +30,12 @@ struct grpc_server;
/* Forward decl of grpc_udp_server */
typedef struct grpc_udp_server grpc_udp_server;
+/* Called when grpc server starts to listening on the grpc_fd. */
+typedef void (*grpc_udp_server_start_cb)(grpc_fd* emfd, void* user_data);
+
/* Called when data is available to read from the socket.
* Return true if there is more data to read from fd. */
-typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd, void* user_data);
+typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd);
/* Called when the socket is writeable. The given closure should be scheduled
* when the socket becomes blocked next time. */
@@ -65,6 +68,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index);
all of the multiple socket port matching logic in one place */
int grpc_udp_server_add_port(grpc_udp_server* s,
const grpc_resolved_address* addr,
+ grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb);
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.cc b/src/core/lib/iomgr/wakeup_fd_cv.cc
index 5c1f16d3fc..c785114212 100644
--- a/src/core/lib/iomgr/wakeup_fd_cv.cc
+++ b/src/core/lib/iomgr/wakeup_fd_cv.cc
@@ -34,7 +34,7 @@
#define MAX_TABLE_RESIZE 256
-extern cv_fd_table g_cvfds;
+extern grpc_cv_fd_table g_cvfds;
static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) {
unsigned int i, newsize;
@@ -42,8 +42,8 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) {
gpr_mu_lock(&g_cvfds.mu);
if (!g_cvfds.free_fds) {
newsize = GPR_MIN(g_cvfds.size * 2, g_cvfds.size + MAX_TABLE_RESIZE);
- g_cvfds.cvfds =
- (fd_node*)gpr_realloc(g_cvfds.cvfds, sizeof(fd_node) * newsize);
+ g_cvfds.cvfds = (grpc_fd_node*)gpr_realloc(g_cvfds.cvfds,
+ sizeof(grpc_fd_node) * newsize);
for (i = g_cvfds.size; i < newsize; i++) {
g_cvfds.cvfds[i].is_set = 0;
g_cvfds.cvfds[i].cvs = nullptr;
@@ -64,7 +64,7 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) {
}
static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) {
- cv_node* cvn;
+ grpc_cv_node* cvn;
gpr_mu_lock(&g_cvfds.mu);
g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].is_set = 1;
cvn = g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].cvs;
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h
index 017e41bfa8..399620af76 100644
--- a/src/core/lib/iomgr/wakeup_fd_cv.h
+++ b/src/core/lib/iomgr/wakeup_fd_cv.h
@@ -40,27 +40,27 @@
#define GRPC_FD_TO_IDX(fd) (-(fd)-1)
#define GRPC_IDX_TO_FD(idx) (-(idx)-1)
-typedef struct cv_node {
+typedef struct grpc_cv_node {
gpr_cv* cv;
- struct cv_node* next;
- struct cv_node* prev;
-} cv_node;
+ struct grpc_cv_node* next;
+ struct grpc_cv_node* prev;
+} grpc_cv_node;
-typedef struct fd_node {
+typedef struct grpc_fd_node {
int is_set;
- cv_node* cvs;
- struct fd_node* next_free;
-} fd_node;
+ grpc_cv_node* cvs;
+ struct grpc_fd_node* next_free;
+} grpc_fd_node;
-typedef struct cv_fd_table {
+typedef struct grpc_cv_fd_table {
gpr_mu mu;
gpr_refcount pollcount;
gpr_cv shutdown_cv;
- fd_node* cvfds;
- fd_node* free_fds;
+ grpc_fd_node* cvfds;
+ grpc_fd_node* free_fds;
unsigned int size;
grpc_poll_function_type poll;
-} cv_fd_table;
+} grpc_cv_fd_table;
extern const grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable;