aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/http_proxy.cc13
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc63
-rw-r--r--src/core/ext/filters/deadline/deadline_filter.cc5
-rw-r--r--src/core/ext/filters/load_reporting/server_load_reporting_filter.cc4
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc2
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc5
-rw-r--r--src/core/lib/iomgr/cfstream_handle.cc4
-rw-r--r--src/core/lib/iomgr/endpoint_pair_posix.cc4
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc45
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc55
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.cc68
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc14
-rw-r--r--src/core/lib/iomgr/ev_posix.cc26
-rw-r--r--src/core/lib/iomgr/ev_posix.h24
-rw-r--r--src/core/lib/iomgr/tcp_client_cfstream.cc14
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.cc8
-rw-r--r--src/core/lib/iomgr/tcp_posix.cc2
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.cc6
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_common.cc2
-rw-r--r--src/core/lib/iomgr/udp_server.cc5
20 files changed, 246 insertions, 123 deletions
diff --git a/src/core/ext/filters/client_channel/http_proxy.cc b/src/core/ext/filters/client_channel/http_proxy.cc
index 29a6c0e367..9baccd8628 100644
--- a/src/core/ext/filters/client_channel/http_proxy.cc
+++ b/src/core/ext/filters/client_channel/http_proxy.cc
@@ -83,11 +83,24 @@ done:
return proxy_name;
}
+/**
+ * Checks the value of GRPC_ARG_ENABLE_HTTP_PROXY to determine if http_proxy
+ * should be used.
+ */
+bool http_proxy_enabled(const grpc_channel_args* args) {
+ const grpc_arg* arg =
+ grpc_channel_args_find(args, GRPC_ARG_ENABLE_HTTP_PROXY);
+ return grpc_channel_arg_get_bool(arg, true);
+}
+
static bool proxy_mapper_map_name(grpc_proxy_mapper* mapper,
const char* server_uri,
const grpc_channel_args* args,
char** name_to_resolve,
grpc_channel_args** new_args) {
+ if (!http_proxy_enabled(args)) {
+ return false;
+ }
char* user_cred = nullptr;
*name_to_resolve = get_http_proxy_server(&user_cred);
if (*name_to_resolve == nullptr) return false;
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
index ebe2c4c41c..f496e9694d 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
@@ -21,6 +21,7 @@
#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)
#include <ares.h>
+#include <string.h>
#include <sys/ioctl.h>
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
@@ -55,8 +56,8 @@ typedef struct fd_node {
bool readable_registered;
/** if the writable closure has been registered */
bool writable_registered;
- /** if the fd is being shut down */
- bool shutting_down;
+ /** if the fd has been shutdown yet from grpc iomgr perspective */
+ bool already_shutdown;
} fd_node;
struct grpc_ares_ev_driver {
@@ -101,25 +102,20 @@ static void fd_node_destroy(fd_node* fdn) {
gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd));
GPR_ASSERT(!fdn->readable_registered);
GPR_ASSERT(!fdn->writable_registered);
+ GPR_ASSERT(fdn->already_shutdown);
gpr_mu_destroy(&fdn->mu);
/* c-ares library has closed the fd inside grpc_fd. This fd may be picked up
immediately by another thread, and should not be closed by the following
grpc_fd_orphan. */
- grpc_fd_orphan(fdn->fd, nullptr, nullptr, true /* already_closed */,
- "c-ares query finished");
+ int dummy_release_fd;
+ grpc_fd_orphan(fdn->fd, nullptr, &dummy_release_fd, "c-ares query finished");
gpr_free(fdn);
}
-static void fd_node_shutdown(fd_node* fdn) {
- gpr_mu_lock(&fdn->mu);
- fdn->shutting_down = true;
- if (!fdn->readable_registered && !fdn->writable_registered) {
- gpr_mu_unlock(&fdn->mu);
- fd_node_destroy(fdn);
- } else {
- grpc_fd_shutdown(
- fdn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("c-ares fd shutdown"));
- gpr_mu_unlock(&fdn->mu);
+static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) {
+ if (!fdn->already_shutdown) {
+ fdn->already_shutdown = true;
+ grpc_fd_shutdown(fdn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason));
}
}
@@ -127,7 +123,10 @@ grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver,
grpc_pollset_set* pollset_set) {
*ev_driver = static_cast<grpc_ares_ev_driver*>(
gpr_malloc(sizeof(grpc_ares_ev_driver)));
- int status = ares_init(&(*ev_driver)->channel);
+ ares_options opts;
+ memset(&opts, 0, sizeof(opts));
+ opts.flags |= ARES_FLAG_STAYOPEN;
+ int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS);
gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create");
if (status != ARES_SUCCESS) {
char* err_msg;
@@ -164,8 +163,9 @@ void grpc_ares_ev_driver_shutdown(grpc_ares_ev_driver* ev_driver) {
ev_driver->shutting_down = true;
fd_node* fn = ev_driver->fds;
while (fn != nullptr) {
- grpc_fd_shutdown(fn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "grpc_ares_ev_driver_shutdown"));
+ gpr_mu_lock(&fn->mu);
+ fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown");
+ gpr_mu_unlock(&fn->mu);
fn = fn->next;
}
gpr_mu_unlock(&ev_driver->mu);
@@ -202,14 +202,7 @@ static void on_readable_cb(void* arg, grpc_error* error) {
gpr_mu_lock(&fdn->mu);
const int fd = grpc_fd_wrapped_fd(fdn->fd);
fdn->readable_registered = false;
- if (fdn->shutting_down && !fdn->writable_registered) {
- gpr_mu_unlock(&fdn->mu);
- fd_node_destroy(fdn);
- grpc_ares_ev_driver_unref(ev_driver);
- return;
- }
gpr_mu_unlock(&fdn->mu);
-
gpr_log(GPR_DEBUG, "readable on %d", fd);
if (error == GRPC_ERROR_NONE) {
do {
@@ -236,14 +229,7 @@ static void on_writable_cb(void* arg, grpc_error* error) {
gpr_mu_lock(&fdn->mu);
const int fd = grpc_fd_wrapped_fd(fdn->fd);
fdn->writable_registered = false;
- if (fdn->shutting_down && !fdn->readable_registered) {
- gpr_mu_unlock(&fdn->mu);
- fd_node_destroy(fdn);
- grpc_ares_ev_driver_unref(ev_driver);
- return;
- }
gpr_mu_unlock(&fdn->mu);
-
gpr_log(GPR_DEBUG, "writable on %d", fd);
if (error == GRPC_ERROR_NONE) {
ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, fd);
@@ -284,11 +270,11 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i);
fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node)));
gpr_log(GPR_DEBUG, "new fd: %d", socks[i]);
- fdn->fd = grpc_fd_create(socks[i], fd_name);
+ fdn->fd = grpc_fd_create(socks[i], fd_name, false);
fdn->ev_driver = ev_driver;
fdn->readable_registered = false;
fdn->writable_registered = false;
- fdn->shutting_down = false;
+ fdn->already_shutdown = false;
gpr_mu_init(&fdn->mu);
GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_cb, fdn,
grpc_schedule_on_exec_ctx);
@@ -329,7 +315,16 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
while (ev_driver->fds != nullptr) {
fd_node* cur = ev_driver->fds;
ev_driver->fds = ev_driver->fds->next;
- fd_node_shutdown(cur);
+ gpr_mu_lock(&cur->mu);
+ fd_node_shutdown_locked(cur, "c-ares fd shutdown");
+ if (!cur->readable_registered && !cur->writable_registered) {
+ gpr_mu_unlock(&cur->mu);
+ fd_node_destroy(cur);
+ } else {
+ cur->next = new_list;
+ new_list = cur;
+ gpr_mu_unlock(&cur->mu);
+ }
}
ev_driver->fds = new_list;
// If the ev driver has no working fd, all the tasks are done.
diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc
index d575d2d983..d23ad67ad5 100644
--- a/src/core/ext/filters/deadline/deadline_filter.cc
+++ b/src/core/ext/filters/deadline/deadline_filter.cc
@@ -293,11 +293,10 @@ static void client_start_transport_stream_op_batch(
static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
- // Get deadline from metadata and start the timer if needed.
start_timer_if_needed(elem, calld->recv_initial_metadata->deadline);
// Invoke the next callback.
- calld->next_recv_initial_metadata_ready->cb(
- calld->next_recv_initial_metadata_ready->cb_arg, error);
+ GRPC_CLOSURE_RUN(calld->next_recv_initial_metadata_ready,
+ GRPC_ERROR_REF(error));
}
// Method for starting a call op for server filter.
diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
index 0d349e2a89..a8f70333b2 100644
--- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
+++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
@@ -82,9 +82,7 @@ static void on_initial_md_ready(void* user_data, grpc_error* err) {
} else {
GRPC_ERROR_REF(err);
}
- calld->ops_recv_initial_metadata_ready->cb(
- calld->ops_recv_initial_metadata_ready->cb_arg, err);
- GRPC_ERROR_UNREF(err);
+ GRPC_CLOSURE_RUN(calld->ops_recv_initial_metadata_ready, err);
}
/* Constructor for call_data */
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
index b95c9dae53..dfed824cd5 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
@@ -50,7 +50,7 @@ grpc_channel* grpc_insecure_channel_create_from_fd(
GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
grpc_endpoint* client = grpc_tcp_client_create_from_fd(
- grpc_fd_create(fd, "client"), args, "fd-client");
+ grpc_fd_create(fd, "client", false), args, "fd-client");
grpc_transport* transport =
grpc_create_chttp2_transport(final_args, client, true);
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
index 371e463814..a0228785ee 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
@@ -43,8 +43,9 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server,
char* name;
gpr_asprintf(&name, "fd:%d", fd);
- grpc_endpoint* server_endpoint = grpc_tcp_create(
- grpc_fd_create(fd, name), grpc_server_get_channel_args(server), name);
+ grpc_endpoint* server_endpoint =
+ grpc_tcp_create(grpc_fd_create(fd, name, false),
+ grpc_server_get_channel_args(server), name);
gpr_free(name);
diff --git a/src/core/lib/iomgr/cfstream_handle.cc b/src/core/lib/iomgr/cfstream_handle.cc
index 30f4e65632..827fd24831 100644
--- a/src/core/lib/iomgr/cfstream_handle.cc
+++ b/src/core/lib/iomgr/cfstream_handle.cc
@@ -116,7 +116,9 @@ CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream,
open_event_.InitEvent();
read_event_.InitEvent();
write_event_.InitEvent();
- CFStreamClientContext ctx = {0, static_cast<void*>(this), nil, nil, nil};
+ CFStreamClientContext ctx = {0, static_cast<void*>(this),
+ CFStreamHandle::Retain, CFStreamHandle::Release,
+ nil};
CFReadStreamSetClient(
read_stream,
kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
diff --git a/src/core/lib/iomgr/endpoint_pair_posix.cc b/src/core/lib/iomgr/endpoint_pair_posix.cc
index 49850ab3a1..5c5c246f99 100644
--- a/src/core/lib/iomgr/endpoint_pair_posix.cc
+++ b/src/core/lib/iomgr/endpoint_pair_posix.cc
@@ -59,11 +59,11 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name,
grpc_core::ExecCtx exec_ctx;
gpr_asprintf(&final_name, "%s:client", name);
- p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), args,
+ p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, false), args,
"socketpair-server");
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
- p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), args,
+ p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, false), args,
"socketpair-client");
gpr_free(final_name);
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index cf839619cd..86a0243d2e 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -136,6 +136,7 @@ struct grpc_fd {
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
+ grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
struct grpc_fd* freelist_next;
@@ -272,7 +273,7 @@ static void fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
-static grpc_fd* fd_create(int fd, const char* name) {
+static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
grpc_fd* new_fd = nullptr;
gpr_mu_lock(&fd_freelist_mu);
@@ -286,11 +287,12 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
new_fd->read_closure.Init();
new_fd->write_closure.Init();
+ new_fd->error_closure.Init();
}
-
new_fd->fd = fd;
new_fd->read_closure->InitEvent();
new_fd->write_closure->InitEvent();
+ new_fd->error_closure->InitEvent();
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
@@ -307,7 +309,13 @@ static grpc_fd* fd_create(int fd, const char* name) {
struct epoll_event ev;
ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
- ev.data.ptr = new_fd;
+ /* Use the least significant bit of ev.data.ptr to store track_err. We expect
+ * the addresses to be word aligned. We need to store track_err to avoid
+ * synchronization issues when accessing it after receiving an event.
+ * Accessing fd would be a data race there because the fd might have been
+ * returned to the free list at that point. */
+ ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) |
+ (track_err ? 1 : 0));
if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
}
@@ -327,6 +335,7 @@ static void fd_shutdown_internal(grpc_fd* fd, grpc_error* why,
shutdown(fd->fd, SHUT_RDWR);
}
fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
+ fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
}
GRPC_ERROR_UNREF(why);
}
@@ -337,7 +346,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
}
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
- bool already_closed, const char* reason) {
+ const char* reason) {
grpc_error* error = GRPC_ERROR_NONE;
bool is_release_fd = (release_fd != nullptr);
@@ -350,7 +359,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
descriptor fd->fd (but we still own the grpc_fd structure). */
if (is_release_fd) {
*release_fd = fd->fd;
- } else if (!already_closed) {
+ } else {
close(fd->fd);
}
@@ -359,6 +368,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
grpc_iomgr_unregister_object(&fd->iomgr_object);
fd->read_closure->DestroyEvent();
fd->write_closure->DestroyEvent();
+ fd->error_closure->DestroyEvent();
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
@@ -383,6 +393,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
fd->write_closure->NotifyOn(closure);
}
+static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
+ fd->error_closure->NotifyOn(closure);
+}
+
static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
fd->read_closure->SetReady();
/* Use release store to match with acquire load in fd_get_read_notifier */
@@ -391,6 +405,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
+static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
+
/*******************************************************************************
* Pollset Definitions
*/
@@ -611,16 +627,25 @@ static grpc_error* process_epoll_events(grpc_pollset* pollset) {
append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
} else {
- grpc_fd* fd = static_cast<grpc_fd*>(data_ptr);
- bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
+ grpc_fd* fd = reinterpret_cast<grpc_fd*>(
+ reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
+ bool track_err =
+ reinterpret_cast<intptr_t>(data_ptr) & static_cast<intptr_t>(1);
+ bool cancel = (ev->events & EPOLLHUP) != 0;
+ bool error = (ev->events & EPOLLERR) != 0;
bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (ev->events & EPOLLOUT) != 0;
+ bool err_fallback = error && !track_err;
+
+ if (error && !err_fallback) {
+ fd_has_errors(fd);
+ }
- if (read_ev || cancel) {
+ if (read_ev || cancel || err_fallback) {
fd_become_readable(fd, pollset);
}
- if (write_ev || cancel) {
+ if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
}
}
@@ -1183,6 +1208,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
+ true,
fd_create,
fd_wrapped_fd,
@@ -1190,6 +1216,7 @@ static const grpc_event_engine_vtable vtable = {
fd_shutdown,
fd_notify_on_read,
fd_notify_on_write,
+ fd_notify_on_error,
fd_is_shutdown,
fd_get_read_notifier_pollset,
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 7903297fc6..111b62171b 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -175,6 +175,7 @@ struct grpc_fd {
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
+ grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
struct grpc_fd* freelist_next;
grpc_closure* on_done_closure;
@@ -184,6 +185,9 @@ struct grpc_fd {
gpr_atm read_notifier_pollset;
grpc_iomgr_object iomgr_object;
+
+ /* Do we need to track EPOLLERR events separately? */
+ bool track_err;
};
static void fd_global_init(void);
@@ -309,6 +313,7 @@ static void fd_destroy(void* arg, grpc_error* error) {
fd->read_closure->DestroyEvent();
fd->write_closure->DestroyEvent();
+ fd->error_closure->DestroyEvent();
gpr_mu_unlock(&fd_freelist_mu);
}
@@ -348,7 +353,7 @@ static void fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
-static grpc_fd* fd_create(int fd, const char* name) {
+static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
grpc_fd* new_fd = nullptr;
gpr_mu_lock(&fd_freelist_mu);
@@ -362,6 +367,7 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
new_fd->read_closure.Init();
new_fd->write_closure.Init();
+ new_fd->error_closure.Init();
}
gpr_mu_init(&new_fd->pollable_mu);
@@ -369,9 +375,11 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd->pollable_obj = nullptr;
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
+ new_fd->track_err = track_err;
new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1);
new_fd->read_closure->InitEvent();
new_fd->write_closure->InitEvent();
+ new_fd->error_closure->InitEvent();
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
@@ -395,8 +403,8 @@ static int fd_wrapped_fd(grpc_fd* fd) {
}
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
- bool already_closed, const char* reason) {
- bool is_fd_closed = already_closed;
+ const char* reason) {
+ bool is_fd_closed = false;
gpr_mu_lock(&fd->orphan_mu);
@@ -406,7 +414,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
descriptor fd->fd (but we still own the grpc_fd structure). */
if (release_fd != nullptr) {
*release_fd = fd->fd;
- } else if (!is_fd_closed) {
+ } else {
close(fd->fd);
is_fd_closed = true;
}
@@ -438,8 +446,14 @@ static bool fd_is_shutdown(grpc_fd* fd) {
/* Might be called multiple times */
static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
- shutdown(fd->fd, SHUT_RDWR);
+ if (shutdown(fd->fd, SHUT_RDWR)) {
+ if (errno != ENOTCONN) {
+ gpr_log(GPR_ERROR, "Error shutting down fd %d. errno: %d",
+ grpc_fd_wrapped_fd(fd), errno);
+ }
+ }
fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
+ fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
}
GRPC_ERROR_UNREF(why);
}
@@ -452,6 +466,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
fd->write_closure->NotifyOn(closure);
}
+static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
+ fd->error_closure->NotifyOn(closure);
+}
+
/*******************************************************************************
* Pollable Definitions
*/
@@ -579,7 +597,12 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
struct epoll_event ev_fd;
ev_fd.events =
static_cast<uint32_t>(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE);
- ev_fd.data.ptr = fd;
+ /* Use the second least significant bit of ev_fd.data.ptr to store track_err
+ * to avoid synchronization issues when accessing it after receiving an event.
+ * Accessing fd would be a data race there because the fd might have been
+ * returned to the free list at that point. */
+ ev_fd.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fd) |
+ (fd->track_err ? 2 : 0));
GRPC_STATS_INC_SYSCALL_EPOLL_CTL();
if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) {
switch (errno) {
@@ -780,6 +803,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
+static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
+
static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) {
gpr_mu_lock(&fd->pollable_mu);
grpc_error* error = GRPC_ERROR_NONE;
@@ -848,20 +873,28 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset,
(intptr_t)data_ptr)),
err_desc);
} else {
- grpc_fd* fd = static_cast<grpc_fd*>(data_ptr);
- bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
+ grpc_fd* fd =
+ reinterpret_cast<grpc_fd*>(reinterpret_cast<intptr_t>(data_ptr) & ~2);
+ bool track_err = reinterpret_cast<intptr_t>(data_ptr) & 2;
+ bool cancel = (ev->events & EPOLLHUP) != 0;
+ bool error = (ev->events & EPOLLERR) != 0;
bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (ev->events & EPOLLOUT) != 0;
+ bool err_fallback = error && !track_err;
+
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_INFO,
"PS:%p got fd %p: cancel=%d read=%d "
"write=%d",
pollset, fd, cancel, read_ev, write_ev);
}
- if (read_ev || cancel) {
+ if (error && !err_fallback) {
+ fd_has_errors(fd);
+ }
+ if (read_ev || cancel || err_fallback) {
fd_become_readable(fd, pollset);
}
- if (write_ev || cancel) {
+ if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
}
}
@@ -1503,6 +1536,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
+ true,
fd_create,
fd_wrapped_fd,
@@ -1510,6 +1544,7 @@ static const grpc_event_engine_vtable vtable = {
fd_shutdown,
fd_notify_on_read,
fd_notify_on_write,
+ fd_notify_on_error,
fd_is_shutdown,
fd_get_read_notifier_pollset,
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index a144817a83..2189801c18 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -132,6 +132,7 @@ struct grpc_fd {
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
+ grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
struct grpc_fd* freelist_next;
grpc_closure* on_done_closure;
@@ -141,6 +142,9 @@ struct grpc_fd {
gpr_atm read_notifier_pollset;
grpc_iomgr_object iomgr_object;
+
+ /* Do we need to track EPOLLERR events separately? */
+ bool track_err;
};
/* Reference counting for fds */
@@ -352,7 +356,10 @@ static void polling_island_add_fds_locked(polling_island* pi, grpc_fd** fds,
for (i = 0; i < fd_count; i++) {
ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
- ev.data.ptr = fds[i];
+ /* Use the least significant bit of ev.data.ptr to store track_err to avoid
+ * synchronization issues when accessing it after receiving an event */
+ ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fds[i]) |
+ (fds[i]->track_err ? 1 : 0));
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);
if (err < 0) {
@@ -435,7 +442,6 @@ static void polling_island_remove_all_fds_locked(polling_island* pi,
/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd,
- bool is_fd_closed,
grpc_error** error) {
int err;
size_t i;
@@ -444,16 +450,14 @@ static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd,
/* If fd is already closed, then it would have been automatically been removed
from the epoll set */
- if (!is_fd_closed) {
- err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr);
- if (err < 0 && errno != ENOENT) {
- gpr_asprintf(
- &err_msg,
- "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
- pi->epoll_fd, fd->fd, errno, strerror(errno));
- append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
- gpr_free(err_msg);
- }
+ err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr);
+ if (err < 0 && errno != ENOENT) {
+ gpr_asprintf(
+ &err_msg,
+ "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
+ pi->epoll_fd, fd->fd, errno, strerror(errno));
+ append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
+ gpr_free(err_msg);
}
for (i = 0; i < pi->fd_cnt; i++) {
@@ -769,6 +773,7 @@ static void unref_by(grpc_fd* fd, int n) {
fd->read_closure->DestroyEvent();
fd->write_closure->DestroyEvent();
+ fd->error_closure->DestroyEvent();
gpr_mu_unlock(&fd_freelist_mu);
} else {
@@ -806,7 +811,7 @@ static void fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
-static grpc_fd* fd_create(int fd, const char* name) {
+static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
grpc_fd* new_fd = nullptr;
gpr_mu_lock(&fd_freelist_mu);
@@ -821,6 +826,7 @@ static grpc_fd* fd_create(int fd, const char* name) {
gpr_mu_init(&new_fd->po.mu);
new_fd->read_closure.Init();
new_fd->write_closure.Init();
+ new_fd->error_closure.Init();
}
/* Note: It is not really needed to get the new_fd->po.mu lock here. If this
@@ -837,6 +843,8 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd->orphaned = false;
new_fd->read_closure->InitEvent();
new_fd->write_closure->InitEvent();
+ new_fd->error_closure->InitEvent();
+ new_fd->track_err = track_err;
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
@@ -863,7 +871,7 @@ static int fd_wrapped_fd(grpc_fd* fd) {
}
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
- bool already_closed, const char* reason) {
+ const char* reason) {
grpc_error* error = GRPC_ERROR_NONE;
polling_island* unref_pi = nullptr;
@@ -884,7 +892,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
before doing this.) */
if (fd->po.pi != nullptr) {
polling_island* pi_latest = polling_island_lock(fd->po.pi);
- polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error);
+ polling_island_remove_fd_locked(pi_latest, fd, &error);
gpr_mu_unlock(&pi_latest->mu);
unref_pi = fd->po.pi;
@@ -933,6 +941,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
shutdown(fd->fd, SHUT_RDWR);
fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
+ fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
}
GRPC_ERROR_UNREF(why);
}
@@ -945,6 +954,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
fd->write_closure->NotifyOn(closure);
}
+static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
+ fd->error_closure->NotifyOn(closure);
+}
+
/*******************************************************************************
* Pollset Definitions
*/
@@ -1116,6 +1129,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
+static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
+
static void pollset_release_polling_island(grpc_pollset* ps,
const char* reason) {
if (ps->po.pi != nullptr) {
@@ -1254,14 +1269,23 @@ static void pollset_work_and_unlock(grpc_pollset* pollset,
to the function pollset_work_and_unlock() will pick up the correct
epoll_fd */
} else {
- grpc_fd* fd = static_cast<grpc_fd*>(data_ptr);
- int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
- int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
- int write_ev = ep_ev[i].events & EPOLLOUT;
- if (read_ev || cancel) {
+ grpc_fd* fd = reinterpret_cast<grpc_fd*>(
+ reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
+ bool track_err =
+ reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1);
+ bool cancel = (ep_ev[i].events & EPOLLHUP) != 0;
+ bool error = (ep_ev[i].events & EPOLLERR) != 0;
+ bool read_ev = (ep_ev[i].events & (EPOLLIN | EPOLLPRI)) != 0;
+ bool write_ev = (ep_ev[i].events & EPOLLOUT) != 0;
+ bool err_fallback = error && !track_err;
+
+ if (error && !err_fallback) {
+ fd_has_errors(fd);
+ }
+ if (read_ev || cancel || err_fallback) {
fd_become_readable(fd, pollset);
}
- if (write_ev || cancel) {
+ if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
}
}
@@ -1634,6 +1658,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
+ true,
fd_create,
fd_wrapped_fd,
@@ -1641,6 +1666,7 @@ static const grpc_event_engine_vtable vtable = {
fd_shutdown,
fd_notify_on_read,
fd_notify_on_write,
+ fd_notify_on_error,
fd_is_shutdown,
fd_get_read_notifier_pollset,
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index 70958ed562..c9c09881a2 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -330,7 +330,8 @@ static void unref_by(grpc_fd* fd, int n) {
}
}
-static grpc_fd* fd_create(int fd, const char* name) {
+static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
+ GPR_DEBUG_ASSERT(track_err == false);
grpc_fd* r = static_cast<grpc_fd*>(gpr_malloc(sizeof(*r)));
gpr_mu_init(&r->mu);
gpr_atm_rel_store(&r->refst, 1);
@@ -424,14 +425,12 @@ static int fd_wrapped_fd(grpc_fd* fd) {
}
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
- bool already_closed, const char* reason) {
+ const char* reason) {
fd->on_done_closure = on_done;
fd->released = release_fd != nullptr;
if (release_fd != nullptr) {
*release_fd = fd->fd;
fd->released = true;
- } else if (already_closed) {
- fd->released = true;
}
gpr_mu_lock(&fd->mu);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
@@ -553,6 +552,11 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
gpr_mu_unlock(&fd->mu);
}
+static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
+ gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
+ abort();
+}
+
static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
grpc_pollset_worker* worker, uint32_t read_mask,
uint32_t write_mask, grpc_fd_watcher* watcher) {
@@ -1710,6 +1714,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
+ false,
fd_create,
fd_wrapped_fd,
@@ -1717,6 +1722,7 @@ static const grpc_event_engine_vtable vtable = {
fd_shutdown,
fd_notify_on_read,
fd_notify_on_write,
+ fd_notify_on_error,
fd_is_shutdown,
fd_get_read_notifier_pollset,
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index 6b7eca0afa..1139b3273a 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -193,10 +193,15 @@ void grpc_event_engine_shutdown(void) {
g_event_engine = nullptr;
}
-grpc_fd* grpc_fd_create(int fd, const char* name) {
- GRPC_POLLING_API_TRACE("fd_create(%d, %s)", fd, name);
- GRPC_FD_TRACE("fd_create(%d, %s)", fd, name);
- return g_event_engine->fd_create(fd, name);
+bool grpc_event_engine_can_track_errors(void) {
+ return g_event_engine->can_track_err;
+}
+
+grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) {
+ GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
+ GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
+ GPR_DEBUG_ASSERT(!track_err || g_event_engine->can_track_err);
+ return g_event_engine->fd_create(fd, name, track_err);
}
int grpc_fd_wrapped_fd(grpc_fd* fd) {
@@ -204,13 +209,12 @@ int grpc_fd_wrapped_fd(grpc_fd* fd) {
}
void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
- bool already_closed, const char* reason) {
- GRPC_POLLING_API_TRACE("fd_orphan(%d, %p, %p, %d, %s)",
- grpc_fd_wrapped_fd(fd), on_done, release_fd,
- already_closed, reason);
+ const char* reason) {
+ GRPC_POLLING_API_TRACE("fd_orphan(%d, %p, %p, %s)", grpc_fd_wrapped_fd(fd),
+ on_done, release_fd, reason);
GRPC_FD_TRACE("grpc_fd_orphan, fd:%d closed", grpc_fd_wrapped_fd(fd));
- g_event_engine->fd_orphan(fd, on_done, release_fd, already_closed, reason);
+ g_event_engine->fd_orphan(fd, on_done, release_fd, reason);
}
void grpc_fd_shutdown(grpc_fd* fd, grpc_error* why) {
@@ -231,6 +235,10 @@ void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
g_event_engine->fd_notify_on_write(fd, closure);
}
+void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
+ g_event_engine->fd_notify_on_error(fd, closure);
+}
+
static size_t pollset_size(void) { return g_event_engine->pollset_size; }
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 82cbce9a7b..b4c17fc80d 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -41,14 +41,16 @@ typedef struct grpc_fd grpc_fd;
typedef struct grpc_event_engine_vtable {
size_t pollset_size;
+ bool can_track_err;
- grpc_fd* (*fd_create)(int fd, const char* name);
+ grpc_fd* (*fd_create)(int fd, const char* name, bool track_err);
int (*fd_wrapped_fd)(grpc_fd* fd);
void (*fd_orphan)(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
- bool already_closed, const char* reason);
+ const char* reason);
void (*fd_shutdown)(grpc_fd* fd, grpc_error* why);
void (*fd_notify_on_read)(grpc_fd* fd, grpc_closure* closure);
void (*fd_notify_on_write)(grpc_fd* fd, grpc_closure* closure);
+ void (*fd_notify_on_error)(grpc_fd* fd, grpc_closure* closure);
bool (*fd_is_shutdown)(grpc_fd* fd);
grpc_pollset* (*fd_get_read_notifier_pollset)(grpc_fd* fd);
@@ -84,10 +86,20 @@ void grpc_event_engine_shutdown(void);
/* Return the name of the poll strategy */
const char* grpc_get_poll_strategy_name();
+/* Returns true if polling engine can track errors separately, false otherwise.
+ * If this is true, fd can be created with track_err set. After this, error
+ * events will be reported using fd_notify_on_error. If it is not set, errors
+ * will continue to be reported through fd_notify_on_read and
+ * fd_notify_on_write.
+ */
+bool grpc_event_engine_can_track_errors();
+
/* Create a wrapped file descriptor.
Requires fd is a non-blocking file descriptor.
+ \a track_err if true means that error events would be tracked separately
+ using grpc_fd_notify_on_error. Currently, valid only for linux systems.
This takes ownership of closing fd. */
-grpc_fd* grpc_fd_create(int fd, const char* name);
+grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err);
/* Return the wrapped fd, or -1 if it has been released or closed. */
int grpc_fd_wrapped_fd(grpc_fd* fd);
@@ -100,7 +112,7 @@ int grpc_fd_wrapped_fd(grpc_fd* fd);
notify_on_write.
MUST NOT be called with a pollset lock taken */
void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
- bool already_closed, const char* reason);
+ const char* reason);
/* Has grpc_fd_shutdown been called on an fd? */
bool grpc_fd_is_shutdown(grpc_fd* fd);
@@ -126,6 +138,10 @@ void grpc_fd_notify_on_read(grpc_fd* fd, grpc_closure* closure);
/* Exactly the same semantics as above, except based on writable events. */
void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure);
+/* Exactly the same semantics as above, except based on error events. track_err
+ * needs to have been set on grpc_fd_create */
+void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure);
+
/* Return the read notifier pollset from the fd */
grpc_pollset* grpc_fd_get_read_notifier_pollset(grpc_fd* fd);
diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc
index ffed3bbef6..5acea91792 100644
--- a/src/core/lib/iomgr/tcp_client_cfstream.cc
+++ b/src/core/lib/iomgr/tcp_client_cfstream.cc
@@ -52,7 +52,7 @@ typedef struct CFStreamConnect {
CFReadStreamRef read_stream;
CFWriteStreamRef write_stream;
- CFStreamHandle* stream_sync;
+ CFStreamHandle* stream_handle;
grpc_timer alarm;
grpc_closure on_alarm;
@@ -71,7 +71,7 @@ typedef struct CFStreamConnect {
static void CFStreamConnectCleanup(CFStreamConnect* connect) {
grpc_resource_quota_unref_internal(connect->resource_quota);
- CFSTREAM_HANDLE_UNREF(connect->stream_sync, "async connect clean up");
+ CFSTREAM_HANDLE_UNREF(connect->stream_handle, "async connect clean up");
CFRelease(connect->read_stream);
CFRelease(connect->write_stream);
gpr_mu_destroy(&connect->mu);
@@ -131,7 +131,7 @@ static void OnOpen(void* arg, grpc_error* error) {
if (error == GRPC_ERROR_NONE) {
*endpoint = grpc_cfstream_endpoint_create(
connect->read_stream, connect->write_stream, connect->addr_name,
- connect->resource_quota, connect->stream_sync);
+ connect->resource_quota, connect->stream_handle);
}
} else {
GRPC_ERROR_REF(error);
@@ -170,8 +170,8 @@ static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep,
gpr_mu_init(&connect->mu);
if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting",
- connect->addr_name);
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %p, %s: asynchronously connecting",
+ connect, connect->addr_name);
}
grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL);
@@ -197,11 +197,11 @@ static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep,
CFRelease(host);
connect->read_stream = read_stream;
connect->write_stream = write_stream;
- connect->stream_sync =
+ connect->stream_handle =
CFStreamHandle::CreateStreamHandle(read_stream, write_stream);
GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast<void*>(connect),
grpc_schedule_on_exec_ctx);
- connect->stream_sync->NotifyOnOpen(&connect->on_open);
+ connect->stream_handle->NotifyOnOpen(&connect->on_open);
GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect,
grpc_schedule_on_exec_ctx);
gpr_mu_lock(&connect->mu);
diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc
index 39da7f1637..296ee74311 100644
--- a/src/core/lib/iomgr/tcp_client_posix.cc
+++ b/src/core/lib/iomgr/tcp_client_posix.cc
@@ -211,8 +211,7 @@ static void on_writable(void* acp, grpc_error* error) {
finish:
if (fd != nullptr) {
grpc_pollset_set_del_fd(ac->interested_parties, fd);
- grpc_fd_orphan(fd, nullptr, nullptr, false /* already_closed */,
- "tcp_client_orphan");
+ grpc_fd_orphan(fd, nullptr, nullptr, "tcp_client_orphan");
fd = nullptr;
}
done = (--ac->refs == 0);
@@ -280,7 +279,7 @@ grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
}
addr_str = grpc_sockaddr_to_uri(mapped_addr);
gpr_asprintf(&name, "tcp-client:%s", addr_str);
- *fdobj = grpc_fd_create(fd, name);
+ *fdobj = grpc_fd_create(fd, name, false);
gpr_free(name);
gpr_free(addr_str);
return GRPC_ERROR_NONE;
@@ -305,8 +304,7 @@ void grpc_tcp_client_create_from_prepared_fd(
return;
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
- grpc_fd_orphan(fdobj, nullptr, nullptr, false /* already_closed */,
- "tcp_client_connect_error");
+ grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error");
GRPC_CLOSURE_SCHED(closure, GRPC_OS_ERROR(errno, "connect"));
return;
}
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index 43d545846d..9df2e206b2 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -297,7 +297,7 @@ static void tcp_shutdown(grpc_endpoint* ep, grpc_error* why) {
static void tcp_free(grpc_tcp* tcp) {
grpc_fd_orphan(tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
- false /* already_closed */, "tcp_unref_orphan");
+ "tcp_unref_orphan");
grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
grpc_resource_user_unref(tcp->resource_user);
gpr_free(tcp->peer_string);
diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc
index 0a5caca906..8ddf684fea 100644
--- a/src/core/lib/iomgr/tcp_server_posix.cc
+++ b/src/core/lib/iomgr/tcp_server_posix.cc
@@ -150,7 +150,7 @@ static void deactivated_all_ports(grpc_tcp_server* s) {
GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx);
grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
- false /* already_closed */, "tcp_listener_shutdown");
+ "tcp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
} else {
@@ -226,7 +226,7 @@ static void on_read(void* arg, grpc_error* err) {
gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s", addr_str);
}
- grpc_fd* fdobj = grpc_fd_create(fd, name);
+ grpc_fd* fdobj = grpc_fd_create(fd, name, false);
read_notifier_pollset =
sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
@@ -362,7 +362,7 @@ static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) {
listener->sibling = sp;
sp->server = listener->server;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, name);
+ sp->emfd = grpc_fd_create(fd, name, false);
memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = listener->port_index;
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
index 73afa15e65..b9f8145572 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
@@ -105,7 +105,7 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, int fd,
s->tail = sp;
sp->server = s;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, name);
+ sp->emfd = grpc_fd_create(fd, name, false);
memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = port_index;
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index 51d17eb174..bdb2d0e764 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -152,7 +152,7 @@ GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd,
grpc_sockaddr_to_string(&addr_str, addr, 1);
gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
gpr_free(addr_str);
- emfd_ = grpc_fd_create(fd, name);
+ emfd_ = grpc_fd_create(fd, name, false);
memcpy(&addr_, addr, sizeof(grpc_resolved_address));
GPR_ASSERT(emfd_);
gpr_free(name);
@@ -300,8 +300,7 @@ void GrpcUdpListener::OrphanFd() {
grpc_schedule_on_exec_ctx);
/* Because at this point, all listening sockets have been shutdown already, no
* need to call OnFdAboutToOrphan() to notify the handler again. */
- grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr,
- false /* already_closed */, "udp_listener_shutdown");
+ grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr, "udp_listener_shutdown");
}
void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) {