diff options
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r-- | src/core/lib/iomgr/error.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/error.h | 5 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epoll1_linux.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epollsig_linux.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_poll_posix.cc | 28 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_posix.cc | 9 | ||||
-rw-r--r-- | src/core/lib/iomgr/udp_server.cc | 20 | ||||
-rw-r--r-- | src/core/lib/iomgr/udp_server.h | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/wakeup_fd_cv.cc | 8 | ||||
-rw-r--r-- | src/core/lib/iomgr/wakeup_fd_cv.h | 24 |
11 files changed, 66 insertions, 42 deletions
diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc index 42cd7c455d..67c3caf5ee 100644 --- a/src/core/lib/iomgr/error.cc +++ b/src/core/lib/iomgr/error.cc @@ -749,7 +749,7 @@ const char* grpc_error_string(grpc_error* err) { if (!gpr_atm_rel_cas(&err->atomics.error_string, 0, (gpr_atm)out)) { gpr_free(out); - out = (char*)gpr_atm_no_barrier_load(&err->atomics.error_string); + out = (char*)gpr_atm_acq_load(&err->atomics.error_string); } GPR_TIMER_END("grpc_error_string", 0); diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 4759ee0791..8c72a439f6 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -165,6 +165,8 @@ void grpc_error_unref(grpc_error* err); grpc_error* grpc_error_set_int(grpc_error* src, grpc_error_ints which, intptr_t value) GRPC_MUST_USE_RESULT; bool grpc_error_get_int(grpc_error* error, grpc_error_ints which, intptr_t* p); +/// This call takes ownership of the slice; the error is responsible for +/// eventually unref-ing it. grpc_error* grpc_error_set_str(grpc_error* src, grpc_error_strs which, grpc_slice str) GRPC_MUST_USE_RESULT; /// Returns false if the specified string is not set. @@ -174,7 +176,8 @@ bool grpc_error_get_str(grpc_error* error, grpc_error_strs which, /// Add a child error: an error that is believed to have contributed to this /// error occurring. Allows root causing high level errors from lower level -/// errors that contributed to them. +/// errors that contributed to them. The src error takes ownership of the +/// child error. grpc_error* grpc_error_add_child(grpc_error* src, grpc_error* child) GRPC_MUST_USE_RESULT; grpc_error* grpc_os_error(const char* file, int line, int err, diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index ae9d47ece5..1ab7e516de 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -1232,8 +1232,6 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { /* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return * NULL */ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { - gpr_log(GPR_ERROR, - "Skipping epoll1 becuase GRPC_LINUX_EPOLL is not defined."); return nullptr; } #endif /* defined(GRPC_POSIX_SOCKET) */ diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index b2817156a8..5f5f45a7a5 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -1449,8 +1449,6 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux( * NULL */ const grpc_event_engine_vtable* grpc_init_epollex_linux( bool explicitly_requested) { - gpr_log(GPR_ERROR, - "Skipping epollex becuase GRPC_LINUX_EPOLL is not defined."); return nullptr; } #endif /* defined(GRPC_POSIX_SOCKET) */ diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index 7a8962f4a8..8072a6cbed 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -1732,8 +1732,6 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux( * NULL */ const grpc_event_engine_vtable* grpc_init_epollsig_linux( bool explicit_request) { - gpr_log(GPR_ERROR, - "Skipping epollsig becuase GRPC_LINUX_EPOLL is not defined."); return nullptr; } #endif /* defined(GRPC_POSIX_SOCKET) */ diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 53de94fb6e..7ea1dfaa80 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -71,6 +71,7 @@ struct grpc_fd { int shutdown; int closed; int released; + gpr_atm pollhup; grpc_error* shutdown_error; /* The watcher list. @@ -242,7 +243,7 @@ struct grpc_pollset_set { typedef struct poll_result { gpr_refcount refcount; - cv_node* watchers; + grpc_cv_node* watchers; int watchcount; struct pollfd* fds; nfds_t nfds; @@ -273,7 +274,7 @@ typedef struct poll_hash_table { } poll_hash_table; poll_hash_table poll_cache; -cv_fd_table g_cvfds; +grpc_cv_fd_table g_cvfds; /******************************************************************************* * fd_posix.c @@ -335,6 +336,7 @@ static grpc_fd* fd_create(int fd, const char* name) { r->on_done_closure = nullptr; r->closed = 0; r->released = 0; + gpr_atm_no_barrier_store(&r->pollhup, 0); r->read_notifier_pollset = nullptr; char* name2; @@ -950,7 +952,8 @@ static grpc_error* pollset_work(grpc_pollset* pollset, pfds[0].events = POLLIN; pfds[0].revents = 0; for (i = 0; i < pollset->fd_count; i++) { - if (fd_is_orphaned(pollset->fds[i])) { + if (fd_is_orphaned(pollset->fds[i]) || + gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) { GRPC_FD_UNREF(pollset->fds[i], "multipoller"); } else { pollset->fds[fd_count++] = pollset->fds[i]; @@ -1017,6 +1020,12 @@ static grpc_error* pollset_work(grpc_pollset* pollset, pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0, (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents); } + /* This is a mitigation to prevent poll() from spinning on a + ** POLLHUP https://github.com/grpc/grpc/pull/13665 + */ + if (pfds[i].revents & POLLHUP) { + gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1); + } fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK, pfds[i].revents & POLLOUT_CHECK, pollset); } @@ -1435,7 +1444,7 @@ static void decref_poll_result(poll_result* res) { } } -void remove_cvn(cv_node** head, cv_node* target) { +void remove_cvn(grpc_cv_node** head, grpc_cv_node* target) { if (target->next) { target->next->prev = target->prev; } @@ -1460,7 +1469,7 @@ static void run_poll(void* args) { result->completed = 1; result->retval = retval; result->err = errno; - cv_node* watcher = result->watchers; + grpc_cv_node* watcher = result->watchers; while (watcher) { gpr_cv_signal(watcher->cv); watcher = watcher->next; @@ -1494,17 +1503,17 @@ static void run_poll(void* args) { static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) { unsigned int i; int res, idx; - cv_node* pollcv; + grpc_cv_node* pollcv; int skip_poll = 0; nfds_t nsockfds = 0; poll_result* result = nullptr; gpr_mu_lock(&g_cvfds.mu); - pollcv = (cv_node*)gpr_malloc(sizeof(cv_node)); + pollcv = (grpc_cv_node*)gpr_malloc(sizeof(grpc_cv_node)); pollcv->next = nullptr; gpr_cv pollcv_cv; gpr_cv_init(&pollcv_cv); pollcv->cv = &pollcv_cv; - cv_node* fd_cvs = (cv_node*)gpr_malloc(nfds * sizeof(cv_node)); + grpc_cv_node* fd_cvs = (grpc_cv_node*)gpr_malloc(nfds * sizeof(grpc_cv_node)); for (i = 0; i < nfds; i++) { fds[i].revents = 0; @@ -1600,7 +1609,8 @@ static void global_cv_fd_table_init() { gpr_cv_init(&g_cvfds.shutdown_cv); gpr_ref_init(&g_cvfds.pollcount, 1); g_cvfds.size = CV_DEFAULT_TABLE_SIZE; - g_cvfds.cvfds = (fd_node*)gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE); + g_cvfds.cvfds = + (grpc_fd_node*)gpr_malloc(sizeof(grpc_fd_node) * CV_DEFAULT_TABLE_SIZE); g_cvfds.free_fds = nullptr; thread_grace = gpr_time_from_millis(POLLCV_THREAD_GRACE_MS, GPR_TIMESPAN); for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) { diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 24ccab14b2..8cd5f8d618 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -212,6 +212,9 @@ finish: fd = nullptr; } done = (--ac->refs == 0); + // Create a copy of the data from "ac" to be accessed after the unlock, as + // "ac" and its contents may be deallocated by the time they are read. + const grpc_slice addr_str_slice = grpc_slice_from_copied_string(ac->addr_str); gpr_mu_unlock(&ac->mu); if (error != GRPC_ERROR_NONE) { char* error_descr; @@ -225,9 +228,13 @@ finish: gpr_free(error_descr); gpr_free(desc); error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, - grpc_slice_from_copied_string(ac->addr_str)); + addr_str_slice /* takes ownership */); + } else { + grpc_slice_unref(addr_str_slice); } if (done) { + // This is safe even outside the lock, because "done", the sentinel, is + // populated *inside* the lock. gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_str); grpc_channel_args_destroy(ac->channel_args); diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 55e0b165ec..4a97f3353d 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -72,6 +72,7 @@ struct grpc_udp_listener { grpc_udp_server_read_cb read_cb; grpc_udp_server_write_cb write_cb; grpc_udp_server_orphan_cb orphan_cb; + grpc_udp_server_start_cb start_cb; // To be scheduled on another thread to actually read/write. grpc_closure do_read_closure; grpc_closure do_write_closure; @@ -353,7 +354,7 @@ static void do_read(void* arg, grpc_error* error) { * read lock if available. */ gpr_mu_lock(&sp->server->mu); /* Tell the registered callback that data is available to read. */ - if (!sp->already_shutdown && sp->read_cb(sp->emfd, sp->server->user_data)) { + if (!sp->already_shutdown && sp->read_cb(sp->emfd)) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE); @@ -383,7 +384,7 @@ static void on_read(void* arg, grpc_error* error) { /* Read once. If there is more data to read, off load the work to another * thread to finish. */ GPR_ASSERT(sp->read_cb); - if (sp->read_cb(sp->emfd, sp->server->user_data)) { + if (sp->read_cb(sp->emfd)) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg, @@ -411,7 +412,7 @@ void fd_notify_on_write_wrapper(void* arg, grpc_error* error) { static void do_write(void* arg, grpc_error* error) { grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); - gpr_mu_lock(&(sp->server->mu)); + gpr_mu_lock(&sp->server->mu); if (sp->already_shutdown) { // If fd has been shutdown, don't write any more and re-arm notification. grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); @@ -429,7 +430,7 @@ static void do_write(void* arg, grpc_error* error) { static void on_write(void* arg, grpc_error* error) { grpc_udp_listener* sp = (grpc_udp_listener*)arg; - gpr_mu_lock(&(sp->server->mu)); + gpr_mu_lock(&sp->server->mu); if (error != GRPC_ERROR_NONE) { if (0 == --sp->server->active_ports && sp->server->shutdown) { gpr_mu_unlock(&sp->server->mu); @@ -450,6 +451,7 @@ static void on_write(void* arg, grpc_error* error) { static int add_socket_to_server(grpc_udp_server* s, int fd, const grpc_resolved_address* addr, + grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { @@ -480,6 +482,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, sp->read_cb = read_cb; sp->write_cb = write_cb; sp->orphan_cb = orphan_cb; + sp->start_cb = start_cb; sp->orphan_notified = false; sp->already_shutdown = false; GPR_ASSERT(sp->emfd); @@ -492,6 +495,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, + grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { @@ -541,8 +545,8 @@ int grpc_udp_server_add_port(grpc_udp_server* s, // TODO(rjshade): Test and propagate the returned grpc_error*: GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); - allocated_port1 = - add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); + allocated_port1 = add_socket_to_server(s, fd, addr, start_cb, read_cb, + write_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } @@ -565,7 +569,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s, addr = &addr4_copy; } allocated_port2 = - add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); + add_socket_to_server(s, fd, addr, start_cb, read_cb, write_cb, orphan_cb); done: gpr_free(allocated_addr); @@ -587,6 +591,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) { void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets, size_t pollset_count, void* user_data) { + gpr_log(GPR_DEBUG, "grpc_udp_server_start"); size_t i; gpr_mu_lock(&s->mu); grpc_udp_listener* sp; @@ -596,6 +601,7 @@ void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets, sp = s->head; while (sp != nullptr) { + sp->start_cb(sp->emfd, sp->server->user_data); for (i = 0; i < pollset_count; i++) { grpc_pollset_add_fd(pollsets[i], sp->emfd); } diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index 02e3acb7f5..a469ab9be5 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -30,9 +30,12 @@ struct grpc_server; /* Forward decl of grpc_udp_server */ typedef struct grpc_udp_server grpc_udp_server; +/* Called when grpc server starts to listening on the grpc_fd. */ +typedef void (*grpc_udp_server_start_cb)(grpc_fd* emfd, void* user_data); + /* Called when data is available to read from the socket. * Return true if there is more data to read from fd. */ -typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd, void* user_data); +typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd); /* Called when the socket is writeable. The given closure should be scheduled * when the socket becomes blocked next time. */ @@ -65,6 +68,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index); all of the multiple socket port matching logic in one place */ int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, + grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb); diff --git a/src/core/lib/iomgr/wakeup_fd_cv.cc b/src/core/lib/iomgr/wakeup_fd_cv.cc index 5c1f16d3fc..c785114212 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.cc +++ b/src/core/lib/iomgr/wakeup_fd_cv.cc @@ -34,7 +34,7 @@ #define MAX_TABLE_RESIZE 256 -extern cv_fd_table g_cvfds; +extern grpc_cv_fd_table g_cvfds; static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { unsigned int i, newsize; @@ -42,8 +42,8 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { gpr_mu_lock(&g_cvfds.mu); if (!g_cvfds.free_fds) { newsize = GPR_MIN(g_cvfds.size * 2, g_cvfds.size + MAX_TABLE_RESIZE); - g_cvfds.cvfds = - (fd_node*)gpr_realloc(g_cvfds.cvfds, sizeof(fd_node) * newsize); + g_cvfds.cvfds = (grpc_fd_node*)gpr_realloc(g_cvfds.cvfds, + sizeof(grpc_fd_node) * newsize); for (i = g_cvfds.size; i < newsize; i++) { g_cvfds.cvfds[i].is_set = 0; g_cvfds.cvfds[i].cvs = nullptr; @@ -64,7 +64,7 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { } static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) { - cv_node* cvn; + grpc_cv_node* cvn; gpr_mu_lock(&g_cvfds.mu); g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].is_set = 1; cvn = g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].cvs; diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h index 017e41bfa8..399620af76 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.h +++ b/src/core/lib/iomgr/wakeup_fd_cv.h @@ -40,27 +40,27 @@ #define GRPC_FD_TO_IDX(fd) (-(fd)-1) #define GRPC_IDX_TO_FD(idx) (-(idx)-1) -typedef struct cv_node { +typedef struct grpc_cv_node { gpr_cv* cv; - struct cv_node* next; - struct cv_node* prev; -} cv_node; + struct grpc_cv_node* next; + struct grpc_cv_node* prev; +} grpc_cv_node; -typedef struct fd_node { +typedef struct grpc_fd_node { int is_set; - cv_node* cvs; - struct fd_node* next_free; -} fd_node; + grpc_cv_node* cvs; + struct grpc_fd_node* next_free; +} grpc_fd_node; -typedef struct cv_fd_table { +typedef struct grpc_cv_fd_table { gpr_mu mu; gpr_refcount pollcount; gpr_cv shutdown_cv; - fd_node* cvfds; - fd_node* free_fds; + grpc_fd_node* cvfds; + grpc_fd_node* free_fds; unsigned int size; grpc_poll_function_type poll; -} cv_fd_table; +} grpc_cv_fd_table; extern const grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable; |