aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc')
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc93
1 files changed, 34 insertions, 59 deletions
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
index f496e9694d..b73e979e9f 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
@@ -39,17 +39,15 @@
typedef struct fd_node {
/** the owner of this fd node */
grpc_ares_ev_driver* ev_driver;
- /** a closure wrapping on_readable_cb, which should be invoked when the
- grpc_fd in this node becomes readable. */
+ /** a closure wrapping on_readable_locked, which should be
+ invoked when the grpc_fd in this node becomes readable. */
grpc_closure read_closure;
- /** a closure wrapping on_writable_cb, which should be invoked when the
- grpc_fd in this node becomes writable. */
+ /** a closure wrapping on_writable_locked, which should be
+ invoked when the grpc_fd in this node becomes writable. */
grpc_closure write_closure;
/** next fd node in the list */
struct fd_node* next;
- /** mutex guarding the rest of the state */
- gpr_mu mu;
/** the grpc_fd owned by this fd node */
grpc_fd* fd;
/** if the readable closure has been registered */
@@ -68,8 +66,8 @@ struct grpc_ares_ev_driver {
/** refcount of the event driver */
gpr_refcount refs;
- /** mutex guarding the rest of the state */
- gpr_mu mu;
+ /** combiner to synchronize c-ares and I/O callbacks on */
+ grpc_combiner* combiner;
/** a list of grpc_fd that this event driver is currently using. */
fd_node* fds;
/** is this event driver currently working? */
@@ -92,19 +90,18 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) {
if (gpr_unref(&ev_driver->refs)) {
gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
GPR_ASSERT(ev_driver->fds == nullptr);
- gpr_mu_destroy(&ev_driver->mu);
+ GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver");
ares_destroy(ev_driver->channel);
gpr_free(ev_driver);
}
}
-static void fd_node_destroy(fd_node* fdn) {
+static void fd_node_destroy_locked(fd_node* fdn) {
gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd));
GPR_ASSERT(!fdn->readable_registered);
GPR_ASSERT(!fdn->writable_registered);
GPR_ASSERT(fdn->already_shutdown);
- gpr_mu_destroy(&fdn->mu);
- /* c-ares library has closed the fd inside grpc_fd. This fd may be picked up
+ /* c-ares library will close the fd inside grpc_fd. This fd may be picked up
immediately by another thread, and should not be closed by the following
grpc_fd_orphan. */
int dummy_release_fd;
@@ -119,15 +116,16 @@ static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) {
}
}
-grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver,
- grpc_pollset_set* pollset_set) {
+grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
+ grpc_pollset_set* pollset_set,
+ grpc_combiner* combiner) {
*ev_driver = static_cast<grpc_ares_ev_driver*>(
gpr_malloc(sizeof(grpc_ares_ev_driver)));
ares_options opts;
memset(&opts, 0, sizeof(opts));
opts.flags |= ARES_FLAG_STAYOPEN;
int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS);
- gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create");
+ gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create_locked");
if (status != ARES_SUCCESS) {
char* err_msg;
gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s",
@@ -137,7 +135,7 @@ grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver,
gpr_free(*ev_driver);
return err;
}
- gpr_mu_init(&(*ev_driver)->mu);
+ (*ev_driver)->combiner = GRPC_COMBINER_REF(combiner, "ares event driver");
gpr_ref_init(&(*ev_driver)->refs, 1);
(*ev_driver)->pollset_set = pollset_set;
(*ev_driver)->fds = nullptr;
@@ -146,34 +144,26 @@ grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver,
return GRPC_ERROR_NONE;
}
-void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver* ev_driver) {
- // It's not safe to shut down remaining fds here directly, becauses
- // ares_host_callback does not provide an exec_ctx. We mark the event driver
- // as being shut down. If the event driver is working,
- // grpc_ares_notify_on_event_locked will shut down the fds; if it's not
- // working, there are no fds to shut down.
- gpr_mu_lock(&ev_driver->mu);
+void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver) {
+ // We mark the event driver as being shut down. If the event driver
+ // is working, grpc_ares_notify_on_event_locked will shut down the
+ // fds; if it's not working, there are no fds to shut down.
ev_driver->shutting_down = true;
- gpr_mu_unlock(&ev_driver->mu);
grpc_ares_ev_driver_unref(ev_driver);
}
-void grpc_ares_ev_driver_shutdown(grpc_ares_ev_driver* ev_driver) {
- gpr_mu_lock(&ev_driver->mu);
+void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) {
ev_driver->shutting_down = true;
fd_node* fn = ev_driver->fds;
while (fn != nullptr) {
- gpr_mu_lock(&fn->mu);
fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown");
- gpr_mu_unlock(&fn->mu);
fn = fn->next;
}
- gpr_mu_unlock(&ev_driver->mu);
}
// Search fd in the fd_node list head. This is an O(n) search, the max possible
// value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests.
-static fd_node* pop_fd_node(fd_node** head, int fd) {
+static fd_node* pop_fd_node_locked(fd_node** head, int fd) {
fd_node dummy_head;
dummy_head.next = *head;
fd_node* node = &dummy_head;
@@ -190,24 +180,22 @@ static fd_node* pop_fd_node(fd_node** head, int fd) {
}
/* Check if \a fd is still readable */
-static bool grpc_ares_is_fd_still_readable(grpc_ares_ev_driver* ev_driver,
- int fd) {
+static bool grpc_ares_is_fd_still_readable_locked(
+ grpc_ares_ev_driver* ev_driver, int fd) {
size_t bytes_available = 0;
return ioctl(fd, FIONREAD, &bytes_available) == 0 && bytes_available > 0;
}
-static void on_readable_cb(void* arg, grpc_error* error) {
+static void on_readable_locked(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg);
grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
- gpr_mu_lock(&fdn->mu);
const int fd = grpc_fd_wrapped_fd(fdn->fd);
fdn->readable_registered = false;
- gpr_mu_unlock(&fdn->mu);
gpr_log(GPR_DEBUG, "readable on %d", fd);
if (error == GRPC_ERROR_NONE) {
do {
ares_process_fd(ev_driver->channel, fd, ARES_SOCKET_BAD);
- } while (grpc_ares_is_fd_still_readable(ev_driver, fd));
+ } while (grpc_ares_is_fd_still_readable_locked(ev_driver, fd));
} else {
// If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
// timed out. The pending lookups made on this ev_driver will be cancelled
@@ -217,19 +205,15 @@ static void on_readable_cb(void* arg, grpc_error* error) {
// grpc_ares_notify_on_event_locked().
ares_cancel(ev_driver->channel);
}
- gpr_mu_lock(&ev_driver->mu);
grpc_ares_notify_on_event_locked(ev_driver);
- gpr_mu_unlock(&ev_driver->mu);
grpc_ares_ev_driver_unref(ev_driver);
}
-static void on_writable_cb(void* arg, grpc_error* error) {
+static void on_writable_locked(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg);
grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
- gpr_mu_lock(&fdn->mu);
const int fd = grpc_fd_wrapped_fd(fdn->fd);
fdn->writable_registered = false;
- gpr_mu_unlock(&fdn->mu);
gpr_log(GPR_DEBUG, "writable on %d", fd);
if (error == GRPC_ERROR_NONE) {
ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, fd);
@@ -242,13 +226,12 @@ static void on_writable_cb(void* arg, grpc_error* error) {
// grpc_ares_notify_on_event_locked().
ares_cancel(ev_driver->channel);
}
- gpr_mu_lock(&ev_driver->mu);
grpc_ares_notify_on_event_locked(ev_driver);
- gpr_mu_unlock(&ev_driver->mu);
grpc_ares_ev_driver_unref(ev_driver);
}
-ares_channel* grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver* ev_driver) {
+ares_channel* grpc_ares_ev_driver_get_channel_locked(
+ grpc_ares_ev_driver* ev_driver) {
return &ev_driver->channel;
}
@@ -263,7 +246,7 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
ARES_GETSOCK_WRITABLE(socks_bitmask, i)) {
- fd_node* fdn = pop_fd_node(&ev_driver->fds, socks[i]);
+ fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]);
// Create a new fd_node if sock[i] is not in the fd_node list.
if (fdn == nullptr) {
char* fd_name;
@@ -275,17 +258,15 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
fdn->readable_registered = false;
fdn->writable_registered = false;
fdn->already_shutdown = false;
- gpr_mu_init(&fdn->mu);
- GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_cb, fdn,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_cb, fdn,
- grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn,
+ grpc_combiner_scheduler(ev_driver->combiner));
+ GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn,
+ grpc_combiner_scheduler(ev_driver->combiner));
grpc_pollset_set_add_fd(ev_driver->pollset_set, fdn->fd);
gpr_free(fd_name);
}
fdn->next = new_list;
new_list = fdn;
- gpr_mu_lock(&fdn->mu);
// Register read_closure if the socket is readable and read_closure has
// not been registered with this socket.
if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
@@ -305,7 +286,6 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
grpc_fd_notify_on_write(fdn->fd, &fdn->write_closure);
fdn->writable_registered = true;
}
- gpr_mu_unlock(&fdn->mu);
}
}
}
@@ -315,15 +295,12 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
while (ev_driver->fds != nullptr) {
fd_node* cur = ev_driver->fds;
ev_driver->fds = ev_driver->fds->next;
- gpr_mu_lock(&cur->mu);
fd_node_shutdown_locked(cur, "c-ares fd shutdown");
if (!cur->readable_registered && !cur->writable_registered) {
- gpr_mu_unlock(&cur->mu);
- fd_node_destroy(cur);
+ fd_node_destroy_locked(cur);
} else {
cur->next = new_list;
new_list = cur;
- gpr_mu_unlock(&cur->mu);
}
}
ev_driver->fds = new_list;
@@ -334,13 +311,11 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
}
}
-void grpc_ares_ev_driver_start(grpc_ares_ev_driver* ev_driver) {
- gpr_mu_lock(&ev_driver->mu);
+void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) {
if (!ev_driver->working) {
ev_driver->working = true;
grpc_ares_notify_on_event_locked(ev_driver);
}
- gpr_mu_unlock(&ev_driver->mu);
}
#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */