From 41e4cedb7012a55376322b142d74eae5e86b95e3 Mon Sep 17 00:00:00 2001 From: Vizerai Date: Fri, 13 Apr 2018 18:19:21 -0700 Subject: Adding opencensus grpc plugin. Rebasing to merge commits. --- config.w32 | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'config.w32') diff --git a/config.w32 b/config.w32 index 1a09accd8a..3e6d166b0e 100644 --- a/config.w32 +++ b/config.w32 @@ -355,7 +355,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\client_channel\\resolver\\sockaddr\\sockaddr_resolver.cc " + "src\\core\\ext\\filters\\load_reporting\\server_load_reporting_filter.cc " + "src\\core\\ext\\filters\\load_reporting\\server_load_reporting_plugin.cc " + - "src\\core\\ext\\census\\grpc_context.cc " + + "src\\cpp\\ext\\filters\\census\\grpc_context.cc " + "src\\core\\ext\\filters\\max_age\\max_age_filter.cc " + "src\\core\\ext\\filters\\message_size\\message_size_filter.cc " + "src\\core\\ext\\filters\\http\\client_authority_filter.cc " + @@ -652,7 +652,6 @@ if (PHP_GRPC != "no") { FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\boringssl"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext"); - FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\census"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\lb_policy"); @@ -728,6 +727,10 @@ if (PHP_GRPC != "no") { FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\tsi\\alts\\zero_copy_frame_protector"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\tsi\\ssl"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\tsi\\ssl\\session_cache"); + FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\cpp"); + FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\cpp\\ext"); + FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\cpp\\ext\\filters"); + FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\cpp\\ext\\filters\\census"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\php"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\php\\ext"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\php\\ext\\grpc"); -- cgit v1.2.3 From 7eda61937e5b042882f70f9f8078c94edd7fc01c Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Thu, 14 Jun 2018 23:07:33 -0700 Subject: Separate the posix part of the c-ares driver --- BUILD | 1 + CMakeLists.txt | 2 + Makefile | 2 + build.yaml | 1 + config.m4 | 1 + config.w32 | 1 + gRPC-Core.podspec | 1 + grpc.gemspec | 1 + grpc.gyp | 2 + package.xml | 1 + .../resolver/dns/c_ares/grpc_ares_ev_driver.cc | 311 +++++++++++++++++++++ .../resolver/dns/c_ares/grpc_ares_ev_driver.h | 36 +++ .../dns/c_ares/grpc_ares_ev_driver_posix.cc | 302 +++----------------- src/python/grpcio/grpc_core_dependencies.py | 1 + tools/doxygen/Doxyfile.core.internal | 1 + tools/run_tests/generated/sources_and_headers.json | 1 + 16 files changed, 401 insertions(+), 264 deletions(-) create mode 100644 src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc (limited to 'config.w32') diff --git a/BUILD b/BUILD index 3df4be8354..44b877705e 100644 --- a/BUILD +++ b/BUILD @@ -1333,6 +1333,7 @@ grpc_cc_library( name = "grpc_resolver_dns_ares", srcs = [ "src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc", + "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc", "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc", "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc", "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc", diff --git a/CMakeLists.txt b/CMakeLists.txt index 1bcac91020..4dc8d98b87 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1213,6 +1213,7 @@ add_library(grpc src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc + src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc @@ -2503,6 +2504,7 @@ add_library(grpc_unsecure src/core/ext/transport/inproc/inproc_plugin.cc src/core/ext/transport/inproc/inproc_transport.cc src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc + src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc diff --git a/Makefile b/Makefile index 1b4fc6a49a..ab6045191d 100644 --- a/Makefile +++ b/Makefile @@ -3588,6 +3588,7 @@ LIBGRPC_SRC = \ src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc \ + src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc \ @@ -4844,6 +4845,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/transport/inproc/inproc_plugin.cc \ src/core/ext/transport/inproc/inproc_transport.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc \ + src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc \ diff --git a/build.yaml b/build.yaml index ffe2febcc5..11060b6ddd 100644 --- a/build.yaml +++ b/build.yaml @@ -725,6 +725,7 @@ filegroups: - src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h src: - src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc + - src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc - src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc - src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc - src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc diff --git a/config.m4 b/config.m4 index 8190485249..a5b041f035 100644 --- a/config.m4 +++ b/config.m4 @@ -370,6 +370,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc \ + src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc \ diff --git a/config.w32 b/config.w32 index db7679ce53..cd51b4d97c 100644 --- a/config.w32 +++ b/config.w32 @@ -346,6 +346,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\client_channel\\lb_policy\\pick_first\\pick_first.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\round_robin\\round_robin.cc " + "src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\dns_resolver_ares.cc " + + "src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\grpc_ares_ev_driver.cc " + "src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\grpc_ares_ev_driver_posix.cc " + "src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\grpc_ares_wrapper.cc " + "src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\grpc_ares_wrapper_fallback.cc " + diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 399368bab9..ca6df16a06 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -785,6 +785,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc', + 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc', diff --git a/grpc.gemspec b/grpc.gemspec index 5ab3713683..1a3b5bd889 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -725,6 +725,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc ) s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc ) + s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc ) s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc ) s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc ) s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc ) diff --git a/grpc.gyp b/grpc.gyp index f893862062..20e463ed9e 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -536,6 +536,7 @@ 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc', + 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc', @@ -1246,6 +1247,7 @@ 'src/core/ext/transport/inproc/inproc_plugin.cc', 'src/core/ext/transport/inproc/inproc_transport.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc', + 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc', diff --git a/package.xml b/package.xml index 220256c11f..5be18bd4d0 100644 --- a/package.xml +++ b/package.xml @@ -730,6 +730,7 @@ + diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc new file mode 100644 index 0000000000..06a6e853f5 --- /dev/null +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc @@ -0,0 +1,311 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include + +#include "src/core/lib/iomgr/port.h" +#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) + +#include +#include +#include + +#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" + +#include +#include +#include +#include +#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" + +typedef struct fd_node { + /** the owner of this fd node */ + grpc_ares_ev_driver* ev_driver; + /** a closure wrapping on_readable_locked, which should be + invoked when the grpc_fd in this node becomes readable. */ + grpc_closure read_closure; + /** a closure wrapping on_writable_locked, which should be + invoked when the grpc_fd in this node becomes writable. */ + grpc_closure write_closure; + /** next fd node in the list */ + struct fd_node* next; + + /** wrapped fd that's polled by grpc's poller for the current platform */ + grpc_core::GrpcPolledFd* grpc_polled_fd; + /** if the readable closure has been registered */ + bool readable_registered; + /** if the writable closure has been registered */ + bool writable_registered; + /** if the fd has been shutdown yet from grpc iomgr perspective */ + bool already_shutdown; +} fd_node; + +struct grpc_ares_ev_driver { + /** the ares_channel owned by this event driver */ + ares_channel channel; + /** pollset set for driving the IO events of the channel */ + grpc_pollset_set* pollset_set; + /** refcount of the event driver */ + gpr_refcount refs; + + /** combiner to synchronize c-ares and I/O callbacks on */ + grpc_combiner* combiner; + /** a list of grpc_fd that this event driver is currently using. */ + fd_node* fds; + /** is this event driver currently working? */ + bool working; + /** is this event driver being shut down */ + bool shutting_down; +}; + +static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver); + +static grpc_ares_ev_driver* grpc_ares_ev_driver_ref( + grpc_ares_ev_driver* ev_driver) { + gpr_log(GPR_DEBUG, "Ref ev_driver %" PRIuPTR, (uintptr_t)ev_driver); + gpr_ref(&ev_driver->refs); + return ev_driver; +} + +static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) { + gpr_log(GPR_DEBUG, "Unref ev_driver %" PRIuPTR, (uintptr_t)ev_driver); + if (gpr_unref(&ev_driver->refs)) { + gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver); + GPR_ASSERT(ev_driver->fds == nullptr); + GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver"); + ares_destroy(ev_driver->channel); + gpr_free(ev_driver); + } +} + +static void fd_node_destroy_locked(fd_node* fdn) { + gpr_log(GPR_DEBUG, "delete fd: %s", fdn->grpc_polled_fd->GetName()); + GPR_ASSERT(!fdn->readable_registered); + GPR_ASSERT(!fdn->writable_registered); + GPR_ASSERT(fdn->already_shutdown); + grpc_core::Delete(fdn->grpc_polled_fd); + gpr_free(fdn); +} + +static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) { + if (!fdn->already_shutdown) { + fdn->already_shutdown = true; + fdn->grpc_polled_fd->ShutdownLocked( + GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason)); + } +} + +grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, + grpc_pollset_set* pollset_set, + grpc_combiner* combiner) { + *ev_driver = static_cast( + gpr_malloc(sizeof(grpc_ares_ev_driver))); + ares_options opts; + memset(&opts, 0, sizeof(opts)); + opts.flags |= ARES_FLAG_STAYOPEN; + int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS); + grpc_core::ConfigureAresChannelLocked(&(*ev_driver)->channel); + gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create_locked"); + if (status != ARES_SUCCESS) { + char* err_msg; + gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s", + ares_strerror(status)); + grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(err_msg); + gpr_free(err_msg); + gpr_free(*ev_driver); + return err; + } + (*ev_driver)->combiner = GRPC_COMBINER_REF(combiner, "ares event driver"); + gpr_ref_init(&(*ev_driver)->refs, 1); + (*ev_driver)->pollset_set = pollset_set; + (*ev_driver)->fds = nullptr; + (*ev_driver)->working = false; + (*ev_driver)->shutting_down = false; + return GRPC_ERROR_NONE; +} + +void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver) { + // We mark the event driver as being shut down. If the event driver + // is working, grpc_ares_notify_on_event_locked will shut down the + // fds; if it's not working, there are no fds to shut down. + ev_driver->shutting_down = true; + grpc_ares_ev_driver_unref(ev_driver); +} + +void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) { + ev_driver->shutting_down = true; + fd_node* fn = ev_driver->fds; + while (fn != nullptr) { + fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown"); + fn = fn->next; + } +} + +// Search fd in the fd_node list head. This is an O(n) search, the max possible +// value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests. +static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) { + fd_node dummy_head; + dummy_head.next = *head; + fd_node* node = &dummy_head; + while (node->next != nullptr) { + if (node->next->grpc_polled_fd->GetWrappedAresSocketLocked() == as) { + fd_node* ret = node->next; + node->next = node->next->next; + *head = dummy_head.next; + return ret; + } + node = node->next; + } + return nullptr; +} + +static void on_readable_locked(void* arg, grpc_error* error) { + fd_node* fdn = static_cast(arg); + grpc_ares_ev_driver* ev_driver = fdn->ev_driver; + const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); + fdn->readable_registered = false; + gpr_log(GPR_DEBUG, "readable on %s", fdn->grpc_polled_fd->GetName()); + if (error == GRPC_ERROR_NONE) { + do { + ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD); + } while (fdn->grpc_polled_fd->IsFdStillReadableLocked()); + } else { + // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or + // timed out. The pending lookups made on this ev_driver will be cancelled + // by the following ares_cancel() and the on_done callbacks will be invoked + // with a status of ARES_ECANCELLED. The remaining file descriptors in this + // ev_driver will be cleaned up in the follwing + // grpc_ares_notify_on_event_locked(). + ares_cancel(ev_driver->channel); + } + grpc_ares_notify_on_event_locked(ev_driver); + grpc_ares_ev_driver_unref(ev_driver); +} + +static void on_writable_locked(void* arg, grpc_error* error) { + fd_node* fdn = static_cast(arg); + grpc_ares_ev_driver* ev_driver = fdn->ev_driver; + const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); + fdn->writable_registered = false; + gpr_log(GPR_DEBUG, "writable on %s", fdn->grpc_polled_fd->GetName()); + if (error == GRPC_ERROR_NONE) { + ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, as); + } else { + // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or + // timed out. The pending lookups made on this ev_driver will be cancelled + // by the following ares_cancel() and the on_done callbacks will be invoked + // with a status of ARES_ECANCELLED. The remaining file descriptors in this + // ev_driver will be cleaned up in the follwing + // grpc_ares_notify_on_event_locked(). + ares_cancel(ev_driver->channel); + } + grpc_ares_notify_on_event_locked(ev_driver); + grpc_ares_ev_driver_unref(ev_driver); +} + +ares_channel* grpc_ares_ev_driver_get_channel_locked( + grpc_ares_ev_driver* ev_driver) { + return &ev_driver->channel; +} + +// Get the file descriptors used by the ev_driver's ares channel, register +// driver_closure with these filedescriptors. +static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { + fd_node* new_list = nullptr; + if (!ev_driver->shutting_down) { + ares_socket_t socks[ARES_GETSOCK_MAXNUM]; + int socks_bitmask = + ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM); + for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) { + if (ARES_GETSOCK_READABLE(socks_bitmask, i) || + ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { + fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]); + // Create a new fd_node if sock[i] is not in the fd_node list. + if (fdn == nullptr) { + fdn = static_cast(gpr_malloc(sizeof(fd_node))); + fdn->grpc_polled_fd = grpc_core::NewGrpcPolledFdLocked( + socks[i], ev_driver->pollset_set); + gpr_log(GPR_DEBUG, "new fd: %s", fdn->grpc_polled_fd->GetName()); + fdn->ev_driver = ev_driver; + fdn->readable_registered = false; + fdn->writable_registered = false; + fdn->already_shutdown = false; + GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn, + grpc_combiner_scheduler(ev_driver->combiner)); + GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn, + grpc_combiner_scheduler(ev_driver->combiner)); + } + fdn->next = new_list; + new_list = fdn; + // Register read_closure if the socket is readable and read_closure has + // not been registered with this socket. + if (ARES_GETSOCK_READABLE(socks_bitmask, i) && + !fdn->readable_registered) { + grpc_ares_ev_driver_ref(ev_driver); + gpr_log(GPR_DEBUG, "notify read on: %s", + fdn->grpc_polled_fd->GetName()); + fdn->grpc_polled_fd->RegisterForOnReadableLocked(&fdn->read_closure); + fdn->readable_registered = true; + } + // Register write_closure if the socket is writable and write_closure + // has not been registered with this socket. + if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && + !fdn->writable_registered) { + gpr_log(GPR_DEBUG, "notify write on: %s", + fdn->grpc_polled_fd->GetName()); + grpc_ares_ev_driver_ref(ev_driver); + fdn->grpc_polled_fd->RegisterForOnWriteableLocked( + &fdn->write_closure); + fdn->writable_registered = true; + } + } + } + } + // Any remaining fds in ev_driver->fds were not returned by ares_getsock() and + // are therefore no longer in use, so they can be shut down and removed from + // the list. + while (ev_driver->fds != nullptr) { + fd_node* cur = ev_driver->fds; + ev_driver->fds = ev_driver->fds->next; + fd_node_shutdown_locked(cur, "c-ares fd shutdown"); + if (!cur->readable_registered && !cur->writable_registered) { + fd_node_destroy_locked(cur); + } else { + cur->next = new_list; + new_list = cur; + } + } + ev_driver->fds = new_list; + // If the ev driver has no working fd, all the tasks are done. + if (new_list == nullptr) { + ev_driver->working = false; + gpr_log(GPR_DEBUG, "ev driver stop working"); + } +} + +void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) { + if (!ev_driver->working) { + ev_driver->working = true; + grpc_ares_notify_on_event_locked(ev_driver); + } +} + +#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h index 27d1511d94..7002c8f95f 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h @@ -22,6 +22,7 @@ #include #include +#include "src/core/lib/gprpp/abstract.h" #include "src/core/lib/iomgr/pollset_set.h" typedef struct grpc_ares_ev_driver grpc_ares_ev_driver; @@ -51,5 +52,40 @@ void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver); /* Shutdown all the grpc_fds used by \a ev_driver */ void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver); +namespace grpc_core { + +/* A wrapped fd that integrates with the grpc iomgr of the current platform. + * A GrpcPolledFd knows how to create grpc platform-specific iomgr endpoints + * from "ares_socket_t" sockets, and then sign up for readability/writeability + * with that poller, and do shutdown and destruction. */ +class GrpcPolledFd { + public: + virtual ~GrpcPolledFd() {} + /* Called when c-ares library is interested and there's no pending callback */ + virtual void RegisterForOnReadableLocked(grpc_closure* read_closure) + GRPC_ABSTRACT; + /* Called when c-ares library is interested and there's no pending callback */ + virtual void RegisterForOnWriteableLocked(grpc_closure* write_closure) + GRPC_ABSTRACT; + /* Indicates if there is data left even after just being read from */ + virtual bool IsFdStillReadableLocked() GRPC_ABSTRACT; + /* Called once and only once. Must cause cancellation of any pending + * read/write callbacks. */ + virtual void ShutdownLocked(grpc_error* error) GRPC_ABSTRACT; + /* Get the underlying ares_socket_t that this was created from */ + virtual ares_socket_t GetWrappedAresSocketLocked() GRPC_ABSTRACT; + /* A unique name, for logging */ + virtual const char* GetName() GRPC_ABSTRACT; + + GRPC_ABSTRACT_BASE_CLASS +}; + +/* Creates a new wrapped fd for the current platform */ +GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as, + grpc_pollset_set* driver_pollset_set); +void ConfigureAresChannelLocked(ares_channel* channel); + +} // namespace grpc_core + #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H \ */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index b73e979e9f..5db832baf8 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -36,286 +36,60 @@ #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/sockaddr_utils.h" -typedef struct fd_node { - /** the owner of this fd node */ - grpc_ares_ev_driver* ev_driver; - /** a closure wrapping on_readable_locked, which should be - invoked when the grpc_fd in this node becomes readable. */ - grpc_closure read_closure; - /** a closure wrapping on_writable_locked, which should be - invoked when the grpc_fd in this node becomes writable. */ - grpc_closure write_closure; - /** next fd node in the list */ - struct fd_node* next; - - /** the grpc_fd owned by this fd node */ - grpc_fd* fd; - /** if the readable closure has been registered */ - bool readable_registered; - /** if the writable closure has been registered */ - bool writable_registered; - /** if the fd has been shutdown yet from grpc iomgr perspective */ - bool already_shutdown; -} fd_node; - -struct grpc_ares_ev_driver { - /** the ares_channel owned by this event driver */ - ares_channel channel; - /** pollset set for driving the IO events of the channel */ - grpc_pollset_set* pollset_set; - /** refcount of the event driver */ - gpr_refcount refs; - - /** combiner to synchronize c-ares and I/O callbacks on */ - grpc_combiner* combiner; - /** a list of grpc_fd that this event driver is currently using. */ - fd_node* fds; - /** is this event driver currently working? */ - bool working; - /** is this event driver being shut down */ - bool shutting_down; -}; - -static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver); - -static grpc_ares_ev_driver* grpc_ares_ev_driver_ref( - grpc_ares_ev_driver* ev_driver) { - gpr_log(GPR_DEBUG, "Ref ev_driver %" PRIuPTR, (uintptr_t)ev_driver); - gpr_ref(&ev_driver->refs); - return ev_driver; -} - -static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) { - gpr_log(GPR_DEBUG, "Unref ev_driver %" PRIuPTR, (uintptr_t)ev_driver); - if (gpr_unref(&ev_driver->refs)) { - gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver); - GPR_ASSERT(ev_driver->fds == nullptr); - GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver"); - ares_destroy(ev_driver->channel); - gpr_free(ev_driver); +namespace grpc_core { + +class GrpcPolledFdPosix : public GrpcPolledFd { + public: + GrpcPolledFdPosix(ares_socket_t as, grpc_pollset_set* driver_pollset_set) + : as_(as) { + gpr_asprintf(&name_, "c-ares fd: %d", (int)as); + fd_ = grpc_fd_create((int)as, name_, false); + grpc_pollset_set_add_fd(driver_pollset_set, fd_); } -} - -static void fd_node_destroy_locked(fd_node* fdn) { - gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd)); - GPR_ASSERT(!fdn->readable_registered); - GPR_ASSERT(!fdn->writable_registered); - GPR_ASSERT(fdn->already_shutdown); - /* c-ares library will close the fd inside grpc_fd. This fd may be picked up - immediately by another thread, and should not be closed by the following - grpc_fd_orphan. */ - int dummy_release_fd; - grpc_fd_orphan(fdn->fd, nullptr, &dummy_release_fd, "c-ares query finished"); - gpr_free(fdn); -} -static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) { - if (!fdn->already_shutdown) { - fdn->already_shutdown = true; - grpc_fd_shutdown(fdn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason)); + ~GrpcPolledFdPosix() { + gpr_free(name_); + /* c-ares library will close the fd inside grpc_fd. This fd may be picked up + immediately by another thread, and should not be closed by the following + grpc_fd_orphan. */ + int dummy_release_fd; + grpc_fd_orphan(fd_, nullptr, &dummy_release_fd, "c-ares query finished"); } -} -grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, - grpc_pollset_set* pollset_set, - grpc_combiner* combiner) { - *ev_driver = static_cast( - gpr_malloc(sizeof(grpc_ares_ev_driver))); - ares_options opts; - memset(&opts, 0, sizeof(opts)); - opts.flags |= ARES_FLAG_STAYOPEN; - int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS); - gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create_locked"); - if (status != ARES_SUCCESS) { - char* err_msg; - gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s", - ares_strerror(status)); - grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(err_msg); - gpr_free(err_msg); - gpr_free(*ev_driver); - return err; + void RegisterForOnReadableLocked(grpc_closure* read_closure) override { + grpc_fd_notify_on_read(fd_, read_closure); } - (*ev_driver)->combiner = GRPC_COMBINER_REF(combiner, "ares event driver"); - gpr_ref_init(&(*ev_driver)->refs, 1); - (*ev_driver)->pollset_set = pollset_set; - (*ev_driver)->fds = nullptr; - (*ev_driver)->working = false; - (*ev_driver)->shutting_down = false; - return GRPC_ERROR_NONE; -} -void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver) { - // We mark the event driver as being shut down. If the event driver - // is working, grpc_ares_notify_on_event_locked will shut down the - // fds; if it's not working, there are no fds to shut down. - ev_driver->shutting_down = true; - grpc_ares_ev_driver_unref(ev_driver); -} + void RegisterForOnWriteableLocked(grpc_closure* write_closure) override { + grpc_fd_notify_on_write(fd_, write_closure); + } -void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) { - ev_driver->shutting_down = true; - fd_node* fn = ev_driver->fds; - while (fn != nullptr) { - fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown"); - fn = fn->next; + bool IsFdStillReadableLocked() override { + size_t bytes_available = 0; + return ioctl(grpc_fd_wrapped_fd(fd_), FIONREAD, &bytes_available) == 0 && + bytes_available > 0; } -} -// Search fd in the fd_node list head. This is an O(n) search, the max possible -// value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests. -static fd_node* pop_fd_node_locked(fd_node** head, int fd) { - fd_node dummy_head; - dummy_head.next = *head; - fd_node* node = &dummy_head; - while (node->next != nullptr) { - if (grpc_fd_wrapped_fd(node->next->fd) == fd) { - fd_node* ret = node->next; - node->next = node->next->next; - *head = dummy_head.next; - return ret; - } - node = node->next; + void ShutdownLocked(grpc_error* error) override { + grpc_fd_shutdown(fd_, error); } - return nullptr; -} -/* Check if \a fd is still readable */ -static bool grpc_ares_is_fd_still_readable_locked( - grpc_ares_ev_driver* ev_driver, int fd) { - size_t bytes_available = 0; - return ioctl(fd, FIONREAD, &bytes_available) == 0 && bytes_available > 0; -} + ares_socket_t GetWrappedAresSocketLocked() override { return as_; } -static void on_readable_locked(void* arg, grpc_error* error) { - fd_node* fdn = static_cast(arg); - grpc_ares_ev_driver* ev_driver = fdn->ev_driver; - const int fd = grpc_fd_wrapped_fd(fdn->fd); - fdn->readable_registered = false; - gpr_log(GPR_DEBUG, "readable on %d", fd); - if (error == GRPC_ERROR_NONE) { - do { - ares_process_fd(ev_driver->channel, fd, ARES_SOCKET_BAD); - } while (grpc_ares_is_fd_still_readable_locked(ev_driver, fd)); - } else { - // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or - // timed out. The pending lookups made on this ev_driver will be cancelled - // by the following ares_cancel() and the on_done callbacks will be invoked - // with a status of ARES_ECANCELLED. The remaining file descriptors in this - // ev_driver will be cleaned up in the follwing - // grpc_ares_notify_on_event_locked(). - ares_cancel(ev_driver->channel); - } - grpc_ares_notify_on_event_locked(ev_driver); - grpc_ares_ev_driver_unref(ev_driver); -} + const char* GetName() override { return name_; } -static void on_writable_locked(void* arg, grpc_error* error) { - fd_node* fdn = static_cast(arg); - grpc_ares_ev_driver* ev_driver = fdn->ev_driver; - const int fd = grpc_fd_wrapped_fd(fdn->fd); - fdn->writable_registered = false; - gpr_log(GPR_DEBUG, "writable on %d", fd); - if (error == GRPC_ERROR_NONE) { - ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, fd); - } else { - // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or - // timed out. The pending lookups made on this ev_driver will be cancelled - // by the following ares_cancel() and the on_done callbacks will be invoked - // with a status of ARES_ECANCELLED. The remaining file descriptors in this - // ev_driver will be cleaned up in the follwing - // grpc_ares_notify_on_event_locked(). - ares_cancel(ev_driver->channel); - } - grpc_ares_notify_on_event_locked(ev_driver); - grpc_ares_ev_driver_unref(ev_driver); -} + char* name_; + ares_socket_t as_; + grpc_fd* fd_; +}; -ares_channel* grpc_ares_ev_driver_get_channel_locked( - grpc_ares_ev_driver* ev_driver) { - return &ev_driver->channel; +GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as, + grpc_pollset_set* driver_pollset_set) { + return grpc_core::New(as, driver_pollset_set); } -// Get the file descriptors used by the ev_driver's ares channel, register -// driver_closure with these filedescriptors. -static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { - fd_node* new_list = nullptr; - if (!ev_driver->shutting_down) { - ares_socket_t socks[ARES_GETSOCK_MAXNUM]; - int socks_bitmask = - ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM); - for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) { - if (ARES_GETSOCK_READABLE(socks_bitmask, i) || - ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { - fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]); - // Create a new fd_node if sock[i] is not in the fd_node list. - if (fdn == nullptr) { - char* fd_name; - gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i); - fdn = static_cast(gpr_malloc(sizeof(fd_node))); - gpr_log(GPR_DEBUG, "new fd: %d", socks[i]); - fdn->fd = grpc_fd_create(socks[i], fd_name, false); - fdn->ev_driver = ev_driver; - fdn->readable_registered = false; - fdn->writable_registered = false; - fdn->already_shutdown = false; - GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn, - grpc_combiner_scheduler(ev_driver->combiner)); - GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn, - grpc_combiner_scheduler(ev_driver->combiner)); - grpc_pollset_set_add_fd(ev_driver->pollset_set, fdn->fd); - gpr_free(fd_name); - } - fdn->next = new_list; - new_list = fdn; - // Register read_closure if the socket is readable and read_closure has - // not been registered with this socket. - if (ARES_GETSOCK_READABLE(socks_bitmask, i) && - !fdn->readable_registered) { - grpc_ares_ev_driver_ref(ev_driver); - gpr_log(GPR_DEBUG, "notify read on: %d", grpc_fd_wrapped_fd(fdn->fd)); - grpc_fd_notify_on_read(fdn->fd, &fdn->read_closure); - fdn->readable_registered = true; - } - // Register write_closure if the socket is writable and write_closure - // has not been registered with this socket. - if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && - !fdn->writable_registered) { - gpr_log(GPR_DEBUG, "notify write on: %d", - grpc_fd_wrapped_fd(fdn->fd)); - grpc_ares_ev_driver_ref(ev_driver); - grpc_fd_notify_on_write(fdn->fd, &fdn->write_closure); - fdn->writable_registered = true; - } - } - } - } - // Any remaining fds in ev_driver->fds were not returned by ares_getsock() and - // are therefore no longer in use, so they can be shut down and removed from - // the list. - while (ev_driver->fds != nullptr) { - fd_node* cur = ev_driver->fds; - ev_driver->fds = ev_driver->fds->next; - fd_node_shutdown_locked(cur, "c-ares fd shutdown"); - if (!cur->readable_registered && !cur->writable_registered) { - fd_node_destroy_locked(cur); - } else { - cur->next = new_list; - new_list = cur; - } - } - ev_driver->fds = new_list; - // If the ev driver has no working fd, all the tasks are done. - if (new_list == nullptr) { - ev_driver->working = false; - gpr_log(GPR_DEBUG, "ev driver stop working"); - } -} +void ConfigureAresChannelLocked(ares_channel* channel) {} -void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) { - if (!ev_driver->working) { - ev_driver->working = true; - grpc_ares_notify_on_event_locked(ev_driver); - } -} +} // namespace grpc_core #endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */ diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 5511bf9a32..d7d489c5a4 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -345,6 +345,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc', + 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc', diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 589e862b9a..478c417ab3 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -913,6 +913,7 @@ src/core/ext/filters/client_channel/resolver.cc \ src/core/ext/filters/client_channel/resolver.h \ src/core/ext/filters/client_channel/resolver/README.md \ src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc \ +src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h \ src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc \ src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 4b5f7d51ea..413efa5eac 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -10136,6 +10136,7 @@ "name": "grpc_resolver_dns_ares", "src": [ "src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc", + "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc", "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h", "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc", "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc", -- cgit v1.2.3