diff options
Diffstat (limited to 'src/core')
20 files changed, 246 insertions, 123 deletions
diff --git a/src/core/ext/filters/client_channel/http_proxy.cc b/src/core/ext/filters/client_channel/http_proxy.cc index 29a6c0e367..9baccd8628 100644 --- a/src/core/ext/filters/client_channel/http_proxy.cc +++ b/src/core/ext/filters/client_channel/http_proxy.cc @@ -83,11 +83,24 @@ done: return proxy_name; } +/** + * Checks the value of GRPC_ARG_ENABLE_HTTP_PROXY to determine if http_proxy + * should be used. + */ +bool http_proxy_enabled(const grpc_channel_args* args) { + const grpc_arg* arg = + grpc_channel_args_find(args, GRPC_ARG_ENABLE_HTTP_PROXY); + return grpc_channel_arg_get_bool(arg, true); +} + static bool proxy_mapper_map_name(grpc_proxy_mapper* mapper, const char* server_uri, const grpc_channel_args* args, char** name_to_resolve, grpc_channel_args** new_args) { + if (!http_proxy_enabled(args)) { + return false; + } char* user_cred = nullptr; *name_to_resolve = get_http_proxy_server(&user_cred); if (*name_to_resolve == nullptr) return false; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index ebe2c4c41c..f496e9694d 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -21,6 +21,7 @@ #if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) #include <ares.h> +#include <string.h> #include <sys/ioctl.h> #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" @@ -55,8 +56,8 @@ typedef struct fd_node { bool readable_registered; /** if the writable closure has been registered */ bool writable_registered; - /** if the fd is being shut down */ - bool shutting_down; + /** if the fd has been shutdown yet from grpc iomgr perspective */ + bool already_shutdown; } fd_node; struct grpc_ares_ev_driver { @@ -101,25 +102,20 @@ static void fd_node_destroy(fd_node* fdn) { gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd)); GPR_ASSERT(!fdn->readable_registered); GPR_ASSERT(!fdn->writable_registered); + GPR_ASSERT(fdn->already_shutdown); gpr_mu_destroy(&fdn->mu); /* c-ares library has closed the fd inside grpc_fd. This fd may be picked up immediately by another thread, and should not be closed by the following grpc_fd_orphan. */ - grpc_fd_orphan(fdn->fd, nullptr, nullptr, true /* already_closed */, - "c-ares query finished"); + int dummy_release_fd; + grpc_fd_orphan(fdn->fd, nullptr, &dummy_release_fd, "c-ares query finished"); gpr_free(fdn); } -static void fd_node_shutdown(fd_node* fdn) { - gpr_mu_lock(&fdn->mu); - fdn->shutting_down = true; - if (!fdn->readable_registered && !fdn->writable_registered) { - gpr_mu_unlock(&fdn->mu); - fd_node_destroy(fdn); - } else { - grpc_fd_shutdown( - fdn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("c-ares fd shutdown")); - gpr_mu_unlock(&fdn->mu); +static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) { + if (!fdn->already_shutdown) { + fdn->already_shutdown = true; + grpc_fd_shutdown(fdn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason)); } } @@ -127,7 +123,10 @@ grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set) { *ev_driver = static_cast<grpc_ares_ev_driver*>( gpr_malloc(sizeof(grpc_ares_ev_driver))); - int status = ares_init(&(*ev_driver)->channel); + ares_options opts; + memset(&opts, 0, sizeof(opts)); + opts.flags |= ARES_FLAG_STAYOPEN; + int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS); gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create"); if (status != ARES_SUCCESS) { char* err_msg; @@ -164,8 +163,9 @@ void grpc_ares_ev_driver_shutdown(grpc_ares_ev_driver* ev_driver) { ev_driver->shutting_down = true; fd_node* fn = ev_driver->fds; while (fn != nullptr) { - grpc_fd_shutdown(fn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "grpc_ares_ev_driver_shutdown")); + gpr_mu_lock(&fn->mu); + fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown"); + gpr_mu_unlock(&fn->mu); fn = fn->next; } gpr_mu_unlock(&ev_driver->mu); @@ -202,14 +202,7 @@ static void on_readable_cb(void* arg, grpc_error* error) { gpr_mu_lock(&fdn->mu); const int fd = grpc_fd_wrapped_fd(fdn->fd); fdn->readable_registered = false; - if (fdn->shutting_down && !fdn->writable_registered) { - gpr_mu_unlock(&fdn->mu); - fd_node_destroy(fdn); - grpc_ares_ev_driver_unref(ev_driver); - return; - } gpr_mu_unlock(&fdn->mu); - gpr_log(GPR_DEBUG, "readable on %d", fd); if (error == GRPC_ERROR_NONE) { do { @@ -236,14 +229,7 @@ static void on_writable_cb(void* arg, grpc_error* error) { gpr_mu_lock(&fdn->mu); const int fd = grpc_fd_wrapped_fd(fdn->fd); fdn->writable_registered = false; - if (fdn->shutting_down && !fdn->readable_registered) { - gpr_mu_unlock(&fdn->mu); - fd_node_destroy(fdn); - grpc_ares_ev_driver_unref(ev_driver); - return; - } gpr_mu_unlock(&fdn->mu); - gpr_log(GPR_DEBUG, "writable on %d", fd); if (error == GRPC_ERROR_NONE) { ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, fd); @@ -284,11 +270,11 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i); fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node))); gpr_log(GPR_DEBUG, "new fd: %d", socks[i]); - fdn->fd = grpc_fd_create(socks[i], fd_name); + fdn->fd = grpc_fd_create(socks[i], fd_name, false); fdn->ev_driver = ev_driver; fdn->readable_registered = false; fdn->writable_registered = false; - fdn->shutting_down = false; + fdn->already_shutdown = false; gpr_mu_init(&fdn->mu); GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_cb, fdn, grpc_schedule_on_exec_ctx); @@ -329,7 +315,16 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { while (ev_driver->fds != nullptr) { fd_node* cur = ev_driver->fds; ev_driver->fds = ev_driver->fds->next; - fd_node_shutdown(cur); + gpr_mu_lock(&cur->mu); + fd_node_shutdown_locked(cur, "c-ares fd shutdown"); + if (!cur->readable_registered && !cur->writable_registered) { + gpr_mu_unlock(&cur->mu); + fd_node_destroy(cur); + } else { + cur->next = new_list; + new_list = cur; + gpr_mu_unlock(&cur->mu); + } } ev_driver->fds = new_list; // If the ev driver has no working fd, all the tasks are done. diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc index d575d2d983..d23ad67ad5 100644 --- a/src/core/ext/filters/deadline/deadline_filter.cc +++ b/src/core/ext/filters/deadline/deadline_filter.cc @@ -293,11 +293,10 @@ static void client_start_transport_stream_op_batch( static void recv_initial_metadata_ready(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast<grpc_call_element*>(arg); server_call_data* calld = static_cast<server_call_data*>(elem->call_data); - // Get deadline from metadata and start the timer if needed. start_timer_if_needed(elem, calld->recv_initial_metadata->deadline); // Invoke the next callback. - calld->next_recv_initial_metadata_ready->cb( - calld->next_recv_initial_metadata_ready->cb_arg, error); + GRPC_CLOSURE_RUN(calld->next_recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); } // Method for starting a call op for server filter. diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc index 0d349e2a89..a8f70333b2 100644 --- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc +++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc @@ -82,9 +82,7 @@ static void on_initial_md_ready(void* user_data, grpc_error* err) { } else { GRPC_ERROR_REF(err); } - calld->ops_recv_initial_metadata_ready->cb( - calld->ops_recv_initial_metadata_ready->cb_arg, err); - GRPC_ERROR_UNREF(err); + GRPC_CLOSURE_RUN(calld->ops_recv_initial_metadata_ready, err); } /* Constructor for call_data */ diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc index b95c9dae53..dfed824cd5 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc @@ -50,7 +50,7 @@ grpc_channel* grpc_insecure_channel_create_from_fd( GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0); grpc_endpoint* client = grpc_tcp_client_create_from_fd( - grpc_fd_create(fd, "client"), args, "fd-client"); + grpc_fd_create(fd, "client", false), args, "fd-client"); grpc_transport* transport = grpc_create_chttp2_transport(final_args, client, true); diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc index 371e463814..a0228785ee 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc @@ -43,8 +43,9 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server, char* name; gpr_asprintf(&name, "fd:%d", fd); - grpc_endpoint* server_endpoint = grpc_tcp_create( - grpc_fd_create(fd, name), grpc_server_get_channel_args(server), name); + grpc_endpoint* server_endpoint = + grpc_tcp_create(grpc_fd_create(fd, name, false), + grpc_server_get_channel_args(server), name); gpr_free(name); diff --git a/src/core/lib/iomgr/cfstream_handle.cc b/src/core/lib/iomgr/cfstream_handle.cc index 30f4e65632..827fd24831 100644 --- a/src/core/lib/iomgr/cfstream_handle.cc +++ b/src/core/lib/iomgr/cfstream_handle.cc @@ -116,7 +116,9 @@ CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream, open_event_.InitEvent(); read_event_.InitEvent(); write_event_.InitEvent(); - CFStreamClientContext ctx = {0, static_cast<void*>(this), nil, nil, nil}; + CFStreamClientContext ctx = {0, static_cast<void*>(this), + CFStreamHandle::Retain, CFStreamHandle::Release, + nil}; CFReadStreamSetClient( read_stream, kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable | diff --git a/src/core/lib/iomgr/endpoint_pair_posix.cc b/src/core/lib/iomgr/endpoint_pair_posix.cc index 49850ab3a1..5c5c246f99 100644 --- a/src/core/lib/iomgr/endpoint_pair_posix.cc +++ b/src/core/lib/iomgr/endpoint_pair_posix.cc @@ -59,11 +59,11 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name, grpc_core::ExecCtx exec_ctx; gpr_asprintf(&final_name, "%s:client", name); - p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), args, + p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, false), args, "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), args, + p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, false), args, "socketpair-client"); gpr_free(final_name); diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index cf839619cd..86a0243d2e 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -136,6 +136,7 @@ struct grpc_fd { grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure; grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure; + grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure; struct grpc_fd* freelist_next; @@ -272,7 +273,7 @@ static void fd_global_shutdown(void) { gpr_mu_destroy(&fd_freelist_mu); } -static grpc_fd* fd_create(int fd, const char* name) { +static grpc_fd* fd_create(int fd, const char* name, bool track_err) { grpc_fd* new_fd = nullptr; gpr_mu_lock(&fd_freelist_mu); @@ -286,11 +287,12 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd))); new_fd->read_closure.Init(); new_fd->write_closure.Init(); + new_fd->error_closure.Init(); } - new_fd->fd = fd; new_fd->read_closure->InitEvent(); new_fd->write_closure->InitEvent(); + new_fd->error_closure->InitEvent(); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); new_fd->freelist_next = nullptr; @@ -307,7 +309,13 @@ static grpc_fd* fd_create(int fd, const char* name) { struct epoll_event ev; ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET); - ev.data.ptr = new_fd; + /* Use the least significant bit of ev.data.ptr to store track_err. We expect + * the addresses to be word aligned. We need to store track_err to avoid + * synchronization issues when accessing it after receiving an event. + * Accessing fd would be a data race there because the fd might have been + * returned to the free list at that point. */ + ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) | + (track_err ? 1 : 0)); if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) { gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno)); } @@ -327,6 +335,7 @@ static void fd_shutdown_internal(grpc_fd* fd, grpc_error* why, shutdown(fd->fd, SHUT_RDWR); } fd->write_closure->SetShutdown(GRPC_ERROR_REF(why)); + fd->error_closure->SetShutdown(GRPC_ERROR_REF(why)); } GRPC_ERROR_UNREF(why); } @@ -337,7 +346,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) { } static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason) { + const char* reason) { grpc_error* error = GRPC_ERROR_NONE; bool is_release_fd = (release_fd != nullptr); @@ -350,7 +359,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, descriptor fd->fd (but we still own the grpc_fd structure). */ if (is_release_fd) { *release_fd = fd->fd; - } else if (!already_closed) { + } else { close(fd->fd); } @@ -359,6 +368,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, grpc_iomgr_unregister_object(&fd->iomgr_object); fd->read_closure->DestroyEvent(); fd->write_closure->DestroyEvent(); + fd->error_closure->DestroyEvent(); gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; @@ -383,6 +393,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { fd->write_closure->NotifyOn(closure); } +static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { + fd->error_closure->NotifyOn(closure); +} + static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) { fd->read_closure->SetReady(); /* Use release store to match with acquire load in fd_get_read_notifier */ @@ -391,6 +405,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) { static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } +static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); } + /******************************************************************************* * Pollset Definitions */ @@ -611,16 +627,25 @@ static grpc_error* process_epoll_events(grpc_pollset* pollset) { append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd), err_desc); } else { - grpc_fd* fd = static_cast<grpc_fd*>(data_ptr); - bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0; + grpc_fd* fd = reinterpret_cast<grpc_fd*>( + reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1)); + bool track_err = + reinterpret_cast<intptr_t>(data_ptr) & static_cast<intptr_t>(1); + bool cancel = (ev->events & EPOLLHUP) != 0; + bool error = (ev->events & EPOLLERR) != 0; bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0; bool write_ev = (ev->events & EPOLLOUT) != 0; + bool err_fallback = error && !track_err; + + if (error && !err_fallback) { + fd_has_errors(fd); + } - if (read_ev || cancel) { + if (read_ev || cancel || err_fallback) { fd_become_readable(fd, pollset); } - if (write_ev || cancel) { + if (write_ev || cancel || err_fallback) { fd_become_writable(fd); } } @@ -1183,6 +1208,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), + true, fd_create, fd_wrapped_fd, @@ -1190,6 +1216,7 @@ static const grpc_event_engine_vtable vtable = { fd_shutdown, fd_notify_on_read, fd_notify_on_write, + fd_notify_on_error, fd_is_shutdown, fd_get_read_notifier_pollset, diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 7903297fc6..111b62171b 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -175,6 +175,7 @@ struct grpc_fd { grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure; grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure; + grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure; struct grpc_fd* freelist_next; grpc_closure* on_done_closure; @@ -184,6 +185,9 @@ struct grpc_fd { gpr_atm read_notifier_pollset; grpc_iomgr_object iomgr_object; + + /* Do we need to track EPOLLERR events separately? */ + bool track_err; }; static void fd_global_init(void); @@ -309,6 +313,7 @@ static void fd_destroy(void* arg, grpc_error* error) { fd->read_closure->DestroyEvent(); fd->write_closure->DestroyEvent(); + fd->error_closure->DestroyEvent(); gpr_mu_unlock(&fd_freelist_mu); } @@ -348,7 +353,7 @@ static void fd_global_shutdown(void) { gpr_mu_destroy(&fd_freelist_mu); } -static grpc_fd* fd_create(int fd, const char* name) { +static grpc_fd* fd_create(int fd, const char* name, bool track_err) { grpc_fd* new_fd = nullptr; gpr_mu_lock(&fd_freelist_mu); @@ -362,6 +367,7 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd))); new_fd->read_closure.Init(); new_fd->write_closure.Init(); + new_fd->error_closure.Init(); } gpr_mu_init(&new_fd->pollable_mu); @@ -369,9 +375,11 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd->pollable_obj = nullptr; gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; + new_fd->track_err = track_err; new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1); new_fd->read_closure->InitEvent(); new_fd->write_closure->InitEvent(); + new_fd->error_closure->InitEvent(); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); new_fd->freelist_next = nullptr; @@ -395,8 +403,8 @@ static int fd_wrapped_fd(grpc_fd* fd) { } static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason) { - bool is_fd_closed = already_closed; + const char* reason) { + bool is_fd_closed = false; gpr_mu_lock(&fd->orphan_mu); @@ -406,7 +414,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, descriptor fd->fd (but we still own the grpc_fd structure). */ if (release_fd != nullptr) { *release_fd = fd->fd; - } else if (!is_fd_closed) { + } else { close(fd->fd); is_fd_closed = true; } @@ -438,8 +446,14 @@ static bool fd_is_shutdown(grpc_fd* fd) { /* Might be called multiple times */ static void fd_shutdown(grpc_fd* fd, grpc_error* why) { if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) { - shutdown(fd->fd, SHUT_RDWR); + if (shutdown(fd->fd, SHUT_RDWR)) { + if (errno != ENOTCONN) { + gpr_log(GPR_ERROR, "Error shutting down fd %d. errno: %d", + grpc_fd_wrapped_fd(fd), errno); + } + } fd->write_closure->SetShutdown(GRPC_ERROR_REF(why)); + fd->error_closure->SetShutdown(GRPC_ERROR_REF(why)); } GRPC_ERROR_UNREF(why); } @@ -452,6 +466,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { fd->write_closure->NotifyOn(closure); } +static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { + fd->error_closure->NotifyOn(closure); +} + /******************************************************************************* * Pollable Definitions */ @@ -579,7 +597,12 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { struct epoll_event ev_fd; ev_fd.events = static_cast<uint32_t>(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE); - ev_fd.data.ptr = fd; + /* Use the second least significant bit of ev_fd.data.ptr to store track_err + * to avoid synchronization issues when accessing it after receiving an event. + * Accessing fd would be a data race there because the fd might have been + * returned to the free list at that point. */ + ev_fd.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fd) | + (fd->track_err ? 2 : 0)); GRPC_STATS_INC_SYSCALL_EPOLL_CTL(); if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) { switch (errno) { @@ -780,6 +803,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) { static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } +static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); } + static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) { gpr_mu_lock(&fd->pollable_mu); grpc_error* error = GRPC_ERROR_NONE; @@ -848,20 +873,28 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset, (intptr_t)data_ptr)), err_desc); } else { - grpc_fd* fd = static_cast<grpc_fd*>(data_ptr); - bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0; + grpc_fd* fd = + reinterpret_cast<grpc_fd*>(reinterpret_cast<intptr_t>(data_ptr) & ~2); + bool track_err = reinterpret_cast<intptr_t>(data_ptr) & 2; + bool cancel = (ev->events & EPOLLHUP) != 0; + bool error = (ev->events & EPOLLERR) != 0; bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0; bool write_ev = (ev->events & EPOLLOUT) != 0; + bool err_fallback = error && !track_err; + if (grpc_polling_trace.enabled()) { gpr_log(GPR_INFO, "PS:%p got fd %p: cancel=%d read=%d " "write=%d", pollset, fd, cancel, read_ev, write_ev); } - if (read_ev || cancel) { + if (error && !err_fallback) { + fd_has_errors(fd); + } + if (read_ev || cancel || err_fallback) { fd_become_readable(fd, pollset); } - if (write_ev || cancel) { + if (write_ev || cancel || err_fallback) { fd_become_writable(fd); } } @@ -1503,6 +1536,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), + true, fd_create, fd_wrapped_fd, @@ -1510,6 +1544,7 @@ static const grpc_event_engine_vtable vtable = { fd_shutdown, fd_notify_on_read, fd_notify_on_write, + fd_notify_on_error, fd_is_shutdown, fd_get_read_notifier_pollset, diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index a144817a83..2189801c18 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -132,6 +132,7 @@ struct grpc_fd { grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure; grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure; + grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure; struct grpc_fd* freelist_next; grpc_closure* on_done_closure; @@ -141,6 +142,9 @@ struct grpc_fd { gpr_atm read_notifier_pollset; grpc_iomgr_object iomgr_object; + + /* Do we need to track EPOLLERR events separately? */ + bool track_err; }; /* Reference counting for fds */ @@ -352,7 +356,10 @@ static void polling_island_add_fds_locked(polling_island* pi, grpc_fd** fds, for (i = 0; i < fd_count; i++) { ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET); - ev.data.ptr = fds[i]; + /* Use the least significant bit of ev.data.ptr to store track_err to avoid + * synchronization issues when accessing it after receiving an event */ + ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fds[i]) | + (fds[i]->track_err ? 1 : 0)); err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev); if (err < 0) { @@ -435,7 +442,6 @@ static void polling_island_remove_all_fds_locked(polling_island* pi, /* The caller is expected to hold pi->mu lock before calling this function */ static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd, - bool is_fd_closed, grpc_error** error) { int err; size_t i; @@ -444,16 +450,14 @@ static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd, /* If fd is already closed, then it would have been automatically been removed from the epoll set */ - if (!is_fd_closed) { - err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr); - if (err < 0 && errno != ENOENT) { - gpr_asprintf( - &err_msg, - "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)", - pi->epoll_fd, fd->fd, errno, strerror(errno)); - append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); - gpr_free(err_msg); - } + err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr); + if (err < 0 && errno != ENOENT) { + gpr_asprintf( + &err_msg, + "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)", + pi->epoll_fd, fd->fd, errno, strerror(errno)); + append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); + gpr_free(err_msg); } for (i = 0; i < pi->fd_cnt; i++) { @@ -769,6 +773,7 @@ static void unref_by(grpc_fd* fd, int n) { fd->read_closure->DestroyEvent(); fd->write_closure->DestroyEvent(); + fd->error_closure->DestroyEvent(); gpr_mu_unlock(&fd_freelist_mu); } else { @@ -806,7 +811,7 @@ static void fd_global_shutdown(void) { gpr_mu_destroy(&fd_freelist_mu); } -static grpc_fd* fd_create(int fd, const char* name) { +static grpc_fd* fd_create(int fd, const char* name, bool track_err) { grpc_fd* new_fd = nullptr; gpr_mu_lock(&fd_freelist_mu); @@ -821,6 +826,7 @@ static grpc_fd* fd_create(int fd, const char* name) { gpr_mu_init(&new_fd->po.mu); new_fd->read_closure.Init(); new_fd->write_closure.Init(); + new_fd->error_closure.Init(); } /* Note: It is not really needed to get the new_fd->po.mu lock here. If this @@ -837,6 +843,8 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd->orphaned = false; new_fd->read_closure->InitEvent(); new_fd->write_closure->InitEvent(); + new_fd->error_closure->InitEvent(); + new_fd->track_err = track_err; gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); new_fd->freelist_next = nullptr; @@ -863,7 +871,7 @@ static int fd_wrapped_fd(grpc_fd* fd) { } static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason) { + const char* reason) { grpc_error* error = GRPC_ERROR_NONE; polling_island* unref_pi = nullptr; @@ -884,7 +892,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, before doing this.) */ if (fd->po.pi != nullptr) { polling_island* pi_latest = polling_island_lock(fd->po.pi); - polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error); + polling_island_remove_fd_locked(pi_latest, fd, &error); gpr_mu_unlock(&pi_latest->mu); unref_pi = fd->po.pi; @@ -933,6 +941,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) { if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) { shutdown(fd->fd, SHUT_RDWR); fd->write_closure->SetShutdown(GRPC_ERROR_REF(why)); + fd->error_closure->SetShutdown(GRPC_ERROR_REF(why)); } GRPC_ERROR_UNREF(why); } @@ -945,6 +954,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { fd->write_closure->NotifyOn(closure); } +static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { + fd->error_closure->NotifyOn(closure); +} + /******************************************************************************* * Pollset Definitions */ @@ -1116,6 +1129,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) { static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } +static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); } + static void pollset_release_polling_island(grpc_pollset* ps, const char* reason) { if (ps->po.pi != nullptr) { @@ -1254,14 +1269,23 @@ static void pollset_work_and_unlock(grpc_pollset* pollset, to the function pollset_work_and_unlock() will pick up the correct epoll_fd */ } else { - grpc_fd* fd = static_cast<grpc_fd*>(data_ptr); - int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); - int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); - int write_ev = ep_ev[i].events & EPOLLOUT; - if (read_ev || cancel) { + grpc_fd* fd = reinterpret_cast<grpc_fd*>( + reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1)); + bool track_err = + reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1); + bool cancel = (ep_ev[i].events & EPOLLHUP) != 0; + bool error = (ep_ev[i].events & EPOLLERR) != 0; + bool read_ev = (ep_ev[i].events & (EPOLLIN | EPOLLPRI)) != 0; + bool write_ev = (ep_ev[i].events & EPOLLOUT) != 0; + bool err_fallback = error && !track_err; + + if (error && !err_fallback) { + fd_has_errors(fd); + } + if (read_ev || cancel || err_fallback) { fd_become_readable(fd, pollset); } - if (write_ev || cancel) { + if (write_ev || cancel || err_fallback) { fd_become_writable(fd); } } @@ -1634,6 +1658,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), + true, fd_create, fd_wrapped_fd, @@ -1641,6 +1666,7 @@ static const grpc_event_engine_vtable vtable = { fd_shutdown, fd_notify_on_read, fd_notify_on_write, + fd_notify_on_error, fd_is_shutdown, fd_get_read_notifier_pollset, diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 70958ed562..c9c09881a2 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -330,7 +330,8 @@ static void unref_by(grpc_fd* fd, int n) { } } -static grpc_fd* fd_create(int fd, const char* name) { +static grpc_fd* fd_create(int fd, const char* name, bool track_err) { + GPR_DEBUG_ASSERT(track_err == false); grpc_fd* r = static_cast<grpc_fd*>(gpr_malloc(sizeof(*r))); gpr_mu_init(&r->mu); gpr_atm_rel_store(&r->refst, 1); @@ -424,14 +425,12 @@ static int fd_wrapped_fd(grpc_fd* fd) { } static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason) { + const char* reason) { fd->on_done_closure = on_done; fd->released = release_fd != nullptr; if (release_fd != nullptr) { *release_fd = fd->fd; fd->released = true; - } else if (already_closed) { - fd->released = true; } gpr_mu_lock(&fd->mu); REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ @@ -553,6 +552,11 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { gpr_mu_unlock(&fd->mu); } +static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { + gpr_log(GPR_ERROR, "Polling engine does not support tracking errors."); + abort(); +} + static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset, grpc_pollset_worker* worker, uint32_t read_mask, uint32_t write_mask, grpc_fd_watcher* watcher) { @@ -1710,6 +1714,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), + false, fd_create, fd_wrapped_fd, @@ -1717,6 +1722,7 @@ static const grpc_event_engine_vtable vtable = { fd_shutdown, fd_notify_on_read, fd_notify_on_write, + fd_notify_on_error, fd_is_shutdown, fd_get_read_notifier_pollset, diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 6b7eca0afa..1139b3273a 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -193,10 +193,15 @@ void grpc_event_engine_shutdown(void) { g_event_engine = nullptr; } -grpc_fd* grpc_fd_create(int fd, const char* name) { - GRPC_POLLING_API_TRACE("fd_create(%d, %s)", fd, name); - GRPC_FD_TRACE("fd_create(%d, %s)", fd, name); - return g_event_engine->fd_create(fd, name); +bool grpc_event_engine_can_track_errors(void) { + return g_event_engine->can_track_err; +} + +grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) { + GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err); + GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err); + GPR_DEBUG_ASSERT(!track_err || g_event_engine->can_track_err); + return g_event_engine->fd_create(fd, name, track_err); } int grpc_fd_wrapped_fd(grpc_fd* fd) { @@ -204,13 +209,12 @@ int grpc_fd_wrapped_fd(grpc_fd* fd) { } void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason) { - GRPC_POLLING_API_TRACE("fd_orphan(%d, %p, %p, %d, %s)", - grpc_fd_wrapped_fd(fd), on_done, release_fd, - already_closed, reason); + const char* reason) { + GRPC_POLLING_API_TRACE("fd_orphan(%d, %p, %p, %s)", grpc_fd_wrapped_fd(fd), + on_done, release_fd, reason); GRPC_FD_TRACE("grpc_fd_orphan, fd:%d closed", grpc_fd_wrapped_fd(fd)); - g_event_engine->fd_orphan(fd, on_done, release_fd, already_closed, reason); + g_event_engine->fd_orphan(fd, on_done, release_fd, reason); } void grpc_fd_shutdown(grpc_fd* fd, grpc_error* why) { @@ -231,6 +235,10 @@ void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { g_event_engine->fd_notify_on_write(fd, closure); } +void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { + g_event_engine->fd_notify_on_error(fd, closure); +} + static size_t pollset_size(void) { return g_event_engine->pollset_size; } static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 82cbce9a7b..b4c17fc80d 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -41,14 +41,16 @@ typedef struct grpc_fd grpc_fd; typedef struct grpc_event_engine_vtable { size_t pollset_size; + bool can_track_err; - grpc_fd* (*fd_create)(int fd, const char* name); + grpc_fd* (*fd_create)(int fd, const char* name, bool track_err); int (*fd_wrapped_fd)(grpc_fd* fd); void (*fd_orphan)(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason); + const char* reason); void (*fd_shutdown)(grpc_fd* fd, grpc_error* why); void (*fd_notify_on_read)(grpc_fd* fd, grpc_closure* closure); void (*fd_notify_on_write)(grpc_fd* fd, grpc_closure* closure); + void (*fd_notify_on_error)(grpc_fd* fd, grpc_closure* closure); bool (*fd_is_shutdown)(grpc_fd* fd); grpc_pollset* (*fd_get_read_notifier_pollset)(grpc_fd* fd); @@ -84,10 +86,20 @@ void grpc_event_engine_shutdown(void); /* Return the name of the poll strategy */ const char* grpc_get_poll_strategy_name(); +/* Returns true if polling engine can track errors separately, false otherwise. + * If this is true, fd can be created with track_err set. After this, error + * events will be reported using fd_notify_on_error. If it is not set, errors + * will continue to be reported through fd_notify_on_read and + * fd_notify_on_write. + */ +bool grpc_event_engine_can_track_errors(); + /* Create a wrapped file descriptor. Requires fd is a non-blocking file descriptor. + \a track_err if true means that error events would be tracked separately + using grpc_fd_notify_on_error. Currently, valid only for linux systems. This takes ownership of closing fd. */ -grpc_fd* grpc_fd_create(int fd, const char* name); +grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err); /* Return the wrapped fd, or -1 if it has been released or closed. */ int grpc_fd_wrapped_fd(grpc_fd* fd); @@ -100,7 +112,7 @@ int grpc_fd_wrapped_fd(grpc_fd* fd); notify_on_write. MUST NOT be called with a pollset lock taken */ void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason); + const char* reason); /* Has grpc_fd_shutdown been called on an fd? */ bool grpc_fd_is_shutdown(grpc_fd* fd); @@ -126,6 +138,10 @@ void grpc_fd_notify_on_read(grpc_fd* fd, grpc_closure* closure); /* Exactly the same semantics as above, except based on writable events. */ void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure); +/* Exactly the same semantics as above, except based on error events. track_err + * needs to have been set on grpc_fd_create */ +void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure); + /* Return the read notifier pollset from the fd */ grpc_pollset* grpc_fd_get_read_notifier_pollset(grpc_fd* fd); diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc index ffed3bbef6..5acea91792 100644 --- a/src/core/lib/iomgr/tcp_client_cfstream.cc +++ b/src/core/lib/iomgr/tcp_client_cfstream.cc @@ -52,7 +52,7 @@ typedef struct CFStreamConnect { CFReadStreamRef read_stream; CFWriteStreamRef write_stream; - CFStreamHandle* stream_sync; + CFStreamHandle* stream_handle; grpc_timer alarm; grpc_closure on_alarm; @@ -71,7 +71,7 @@ typedef struct CFStreamConnect { static void CFStreamConnectCleanup(CFStreamConnect* connect) { grpc_resource_quota_unref_internal(connect->resource_quota); - CFSTREAM_HANDLE_UNREF(connect->stream_sync, "async connect clean up"); + CFSTREAM_HANDLE_UNREF(connect->stream_handle, "async connect clean up"); CFRelease(connect->read_stream); CFRelease(connect->write_stream); gpr_mu_destroy(&connect->mu); @@ -131,7 +131,7 @@ static void OnOpen(void* arg, grpc_error* error) { if (error == GRPC_ERROR_NONE) { *endpoint = grpc_cfstream_endpoint_create( connect->read_stream, connect->write_stream, connect->addr_name, - connect->resource_quota, connect->stream_sync); + connect->resource_quota, connect->stream_handle); } } else { GRPC_ERROR_REF(error); @@ -170,8 +170,8 @@ static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep, gpr_mu_init(&connect->mu); if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", - connect->addr_name); + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %p, %s: asynchronously connecting", + connect, connect->addr_name); } grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL); @@ -197,11 +197,11 @@ static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep, CFRelease(host); connect->read_stream = read_stream; connect->write_stream = write_stream; - connect->stream_sync = + connect->stream_handle = CFStreamHandle::CreateStreamHandle(read_stream, write_stream); GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast<void*>(connect), grpc_schedule_on_exec_ctx); - connect->stream_sync->NotifyOnOpen(&connect->on_open); + connect->stream_handle->NotifyOnOpen(&connect->on_open); GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect, grpc_schedule_on_exec_ctx); gpr_mu_lock(&connect->mu); diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 39da7f1637..296ee74311 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -211,8 +211,7 @@ static void on_writable(void* acp, grpc_error* error) { finish: if (fd != nullptr) { grpc_pollset_set_del_fd(ac->interested_parties, fd); - grpc_fd_orphan(fd, nullptr, nullptr, false /* already_closed */, - "tcp_client_orphan"); + grpc_fd_orphan(fd, nullptr, nullptr, "tcp_client_orphan"); fd = nullptr; } done = (--ac->refs == 0); @@ -280,7 +279,7 @@ grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args, } addr_str = grpc_sockaddr_to_uri(mapped_addr); gpr_asprintf(&name, "tcp-client:%s", addr_str); - *fdobj = grpc_fd_create(fd, name); + *fdobj = grpc_fd_create(fd, name, false); gpr_free(name); gpr_free(addr_str); return GRPC_ERROR_NONE; @@ -305,8 +304,7 @@ void grpc_tcp_client_create_from_prepared_fd( return; } if (errno != EWOULDBLOCK && errno != EINPROGRESS) { - grpc_fd_orphan(fdobj, nullptr, nullptr, false /* already_closed */, - "tcp_client_connect_error"); + grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error"); GRPC_CLOSURE_SCHED(closure, GRPC_OS_ERROR(errno, "connect")); return; } diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 43d545846d..9df2e206b2 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -297,7 +297,7 @@ static void tcp_shutdown(grpc_endpoint* ep, grpc_error* why) { static void tcp_free(grpc_tcp* tcp) { grpc_fd_orphan(tcp->em_fd, tcp->release_fd_cb, tcp->release_fd, - false /* already_closed */, "tcp_unref_orphan"); + "tcp_unref_orphan"); grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer); grpc_resource_user_unref(tcp->resource_user); gpr_free(tcp->peer_string); diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index 0a5caca906..8ddf684fea 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -150,7 +150,7 @@ static void deactivated_all_ports(grpc_tcp_server* s) { GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s, grpc_schedule_on_exec_ctx); grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr, - false /* already_closed */, "tcp_listener_shutdown"); + "tcp_listener_shutdown"); } gpr_mu_unlock(&s->mu); } else { @@ -226,7 +226,7 @@ static void on_read(void* arg, grpc_error* err) { gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s", addr_str); } - grpc_fd* fdobj = grpc_fd_create(fd, name); + grpc_fd* fdobj = grpc_fd_create(fd, name, false); read_notifier_pollset = sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add( @@ -362,7 +362,7 @@ static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) { listener->sibling = sp; sp->server = listener->server; sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name); + sp->emfd = grpc_fd_create(fd, name, false); memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address)); sp->port = port; sp->port_index = listener->port_index; diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index 73afa15e65..b9f8145572 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc @@ -105,7 +105,7 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, int fd, s->tail = sp; sp->server = s; sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name); + sp->emfd = grpc_fd_create(fd, name, false); memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); sp->port = port; sp->port_index = port_index; diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 51d17eb174..bdb2d0e764 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -152,7 +152,7 @@ GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd, grpc_sockaddr_to_string(&addr_str, addr, 1); gpr_asprintf(&name, "udp-server-listener:%s", addr_str); gpr_free(addr_str); - emfd_ = grpc_fd_create(fd, name); + emfd_ = grpc_fd_create(fd, name, false); memcpy(&addr_, addr, sizeof(grpc_resolved_address)); GPR_ASSERT(emfd_); gpr_free(name); @@ -300,8 +300,7 @@ void GrpcUdpListener::OrphanFd() { grpc_schedule_on_exec_ctx); /* Because at this point, all listening sockets have been shutdown already, no * need to call OnFdAboutToOrphan() to notify the handler again. */ - grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr, - false /* already_closed */, "udp_listener_shutdown"); + grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr, "udp_listener_shutdown"); } void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) { |