diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc | 74 |
1 files changed, 61 insertions, 13 deletions
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc index fdbd07ebf5..d99c2e3004 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc @@ -31,8 +31,10 @@ #include <grpc/support/time.h> #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/timer.h" typedef struct fd_node { /** the owner of this fd node */ @@ -76,21 +78,30 @@ struct grpc_ares_ev_driver { grpc_ares_request* request; /** Owned by the ev_driver. Creates new GrpcPolledFd's */ grpc_core::UniquePtr<grpc_core::GrpcPolledFdFactory> polled_fd_factory; + /** query timeout in milliseconds */ + int query_timeout_ms; + /** alarm to cancel active queries */ + grpc_timer query_timeout; + /** cancels queries on a timeout */ + grpc_closure on_timeout_locked; }; static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver); static grpc_ares_ev_driver* grpc_ares_ev_driver_ref( grpc_ares_ev_driver* ev_driver) { - gpr_log(GPR_DEBUG, "Ref ev_driver %" PRIuPTR, (uintptr_t)ev_driver); + GRPC_CARES_TRACE_LOG("request:%p Ref ev_driver %p", ev_driver->request, + ev_driver); gpr_ref(&ev_driver->refs); return ev_driver; } static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) { - gpr_log(GPR_DEBUG, "Unref ev_driver %" PRIuPTR, (uintptr_t)ev_driver); + GRPC_CARES_TRACE_LOG("request:%p Unref ev_driver %p", ev_driver->request, + ev_driver); if (gpr_unref(&ev_driver->refs)) { - gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver); + GRPC_CARES_TRACE_LOG("request:%p destroy ev_driver %p", ev_driver->request, + ev_driver); GPR_ASSERT(ev_driver->fds == nullptr); GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver"); ares_destroy(ev_driver->channel); @@ -100,7 +111,8 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) { } static void fd_node_destroy_locked(fd_node* fdn) { - gpr_log(GPR_DEBUG, "delete fd: %s", fdn->grpc_polled_fd->GetName()); + GRPC_CARES_TRACE_LOG("request:%p delete fd: %s", fdn->ev_driver->request, + fdn->grpc_polled_fd->GetName()); GPR_ASSERT(!fdn->readable_registered); GPR_ASSERT(!fdn->writable_registered); GPR_ASSERT(fdn->already_shutdown); @@ -116,8 +128,11 @@ static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) { } } +static void on_timeout_locked(void* arg, grpc_error* error); + grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set, + int query_timeout_ms, grpc_combiner* combiner, grpc_ares_request* request) { *ev_driver = grpc_core::New<grpc_ares_ev_driver>(); @@ -125,7 +140,7 @@ grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, memset(&opts, 0, sizeof(opts)); opts.flags |= ARES_FLAG_STAYOPEN; int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS); - gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create_locked"); + GRPC_CARES_TRACE_LOG("request:%p grpc_ares_ev_driver_create_locked", request); if (status != ARES_SUCCESS) { char* err_msg; gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s", @@ -146,6 +161,9 @@ grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, grpc_core::NewGrpcPolledFdFactory((*ev_driver)->combiner); (*ev_driver) ->polled_fd_factory->ConfigureAresChannelLocked((*ev_driver)->channel); + GRPC_CLOSURE_INIT(&(*ev_driver)->on_timeout_locked, on_timeout_locked, + *ev_driver, grpc_combiner_scheduler(combiner)); + (*ev_driver)->query_timeout_ms = query_timeout_ms; return GRPC_ERROR_NONE; } @@ -155,6 +173,7 @@ void grpc_ares_ev_driver_on_queries_complete_locked( // is working, grpc_ares_notify_on_event_locked will shut down the // fds; if it's not working, there are no fds to shut down. ev_driver->shutting_down = true; + grpc_timer_cancel(&ev_driver->query_timeout); grpc_ares_ev_driver_unref(ev_driver); } @@ -185,12 +204,25 @@ static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) { return nullptr; } +static void on_timeout_locked(void* arg, grpc_error* error) { + grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg); + GRPC_CARES_TRACE_LOG( + "request:%p ev_driver=%p on_timeout_locked. driver->shutting_down=%d. " + "err=%s", + driver->request, driver, driver->shutting_down, grpc_error_string(error)); + if (!driver->shutting_down && error == GRPC_ERROR_NONE) { + grpc_ares_ev_driver_shutdown_locked(driver); + } + grpc_ares_ev_driver_unref(driver); +} + static void on_readable_locked(void* arg, grpc_error* error) { fd_node* fdn = static_cast<fd_node*>(arg); grpc_ares_ev_driver* ev_driver = fdn->ev_driver; const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); fdn->readable_registered = false; - gpr_log(GPR_DEBUG, "readable on %s", fdn->grpc_polled_fd->GetName()); + GRPC_CARES_TRACE_LOG("request:%p readable on %s", fdn->ev_driver->request, + fdn->grpc_polled_fd->GetName()); if (error == GRPC_ERROR_NONE) { do { ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD); @@ -213,7 +245,8 @@ static void on_writable_locked(void* arg, grpc_error* error) { grpc_ares_ev_driver* ev_driver = fdn->ev_driver; const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); fdn->writable_registered = false; - gpr_log(GPR_DEBUG, "writable on %s", fdn->grpc_polled_fd->GetName()); + GRPC_CARES_TRACE_LOG("request:%p writable on %s", ev_driver->request, + fdn->grpc_polled_fd->GetName()); if (error == GRPC_ERROR_NONE) { ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, as); } else { @@ -252,7 +285,8 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { fdn->grpc_polled_fd = ev_driver->polled_fd_factory->NewGrpcPolledFdLocked( socks[i], ev_driver->pollset_set, ev_driver->combiner); - gpr_log(GPR_DEBUG, "new fd: %s", fdn->grpc_polled_fd->GetName()); + GRPC_CARES_TRACE_LOG("request:%p new fd: %s", ev_driver->request, + fdn->grpc_polled_fd->GetName()); fdn->ev_driver = ev_driver; fdn->readable_registered = false; fdn->writable_registered = false; @@ -269,8 +303,9 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { if (ARES_GETSOCK_READABLE(socks_bitmask, i) && !fdn->readable_registered) { grpc_ares_ev_driver_ref(ev_driver); - gpr_log(GPR_DEBUG, "notify read on: %s", - fdn->grpc_polled_fd->GetName()); + GRPC_CARES_TRACE_LOG("request:%p notify read on: %s", + ev_driver->request, + fdn->grpc_polled_fd->GetName()); fdn->grpc_polled_fd->RegisterForOnReadableLocked(&fdn->read_closure); fdn->readable_registered = true; } @@ -278,8 +313,9 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { // has not been registered with this socket. if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && !fdn->writable_registered) { - gpr_log(GPR_DEBUG, "notify write on: %s", - fdn->grpc_polled_fd->GetName()); + GRPC_CARES_TRACE_LOG("request:%p notify write on: %s", + ev_driver->request, + fdn->grpc_polled_fd->GetName()); grpc_ares_ev_driver_ref(ev_driver); fdn->grpc_polled_fd->RegisterForOnWriteableLocked( &fdn->write_closure); @@ -306,7 +342,8 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { // If the ev driver has no working fd, all the tasks are done. if (new_list == nullptr) { ev_driver->working = false; - gpr_log(GPR_DEBUG, "ev driver stop working"); + GRPC_CARES_TRACE_LOG("request:%p ev driver stop working", + ev_driver->request); } } @@ -314,6 +351,17 @@ void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) { if (!ev_driver->working) { ev_driver->working = true; grpc_ares_notify_on_event_locked(ev_driver); + grpc_millis timeout = + ev_driver->query_timeout_ms == 0 + ? GRPC_MILLIS_INF_FUTURE + : ev_driver->query_timeout_ms + grpc_core::ExecCtx::Get()->Now(); + GRPC_CARES_TRACE_LOG( + "request:%p ev_driver=%p grpc_ares_ev_driver_start_locked. timeout in " + "%" PRId64 " ms", + ev_driver->request, ev_driver, timeout); + grpc_ares_ev_driver_ref(ev_driver); + grpc_timer_init(&ev_driver->query_timeout, timeout, + &ev_driver->on_timeout_locked); } } |