diff options
Diffstat (limited to 'src/core/ext')
31 files changed, 320 insertions, 396 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index ea6775a8d8..f141aabe70 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -327,16 +327,14 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { if (chand->resolver_result != nullptr) { if (chand->resolver != nullptr) { // Find LB policy name. - const grpc_arg* channel_arg = grpc_channel_args_find( + const char* lb_policy_name = grpc_channel_args_get_string( chand->resolver_result, GRPC_ARG_LB_POLICY_NAME); - const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg); // Special case: If at least one balancer address is present, we use // the grpclb policy, regardless of what the resolver actually specified. - channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); - if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) { - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p); + grpc_lb_addresses* addresses = + grpc_channel_args_get_pointer<grpc_lb_addresses>( + chand->resolver_result, GRPC_ARG_LB_ADDRESSES); + if (addresses != nullptr) { bool found_balancer_address = false; for (size_t i = 0; i < addresses->num_addresses; ++i) { if (addresses->addresses[i].is_balancer) { @@ -400,18 +398,15 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { // The copy will be saved in chand->lb_policy_name below. lb_policy_name_dup = gpr_strdup(lb_policy_name); // Find service config. - channel_arg = grpc_channel_args_find(chand->resolver_result, - GRPC_ARG_SERVICE_CONFIG); - service_config_json = - gpr_strdup(grpc_channel_arg_get_string(channel_arg)); + service_config_json = gpr_strdup(grpc_channel_args_get_string( + chand->resolver_result, GRPC_ARG_SERVICE_CONFIG)); if (service_config_json != nullptr) { grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config = grpc_core::ServiceConfig::Create(service_config_json); if (service_config != nullptr) { if (chand->enable_retries) { - channel_arg = grpc_channel_args_find(chand->resolver_result, - GRPC_ARG_SERVER_URI); - const char* server_uri = grpc_channel_arg_get_string(channel_arg); + const char* server_uri = grpc_channel_args_get_string( + chand->resolver_result, GRPC_ARG_SERVER_URI); GPR_ASSERT(server_uri != nullptr); grpc_uri* uri = grpc_uri_parse(server_uri, true); GPR_ASSERT(uri->path[0] != '\0'); @@ -648,45 +643,37 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, "client_channel"); grpc_client_channel_start_backup_polling(chand->interested_parties); // Record max per-RPC retry buffer size. - const grpc_arg* arg = grpc_channel_args_find( - args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE); - chand->per_rpc_retry_buffer_size = (size_t)grpc_channel_arg_get_integer( - arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}); + chand->per_rpc_retry_buffer_size = (size_t)grpc_channel_args_get_integer( + args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE, + {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}); // Record enable_retries. - arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES); - chand->enable_retries = grpc_channel_arg_get_bool(arg, true); + chand->enable_retries = grpc_channel_args_get_bool( + args->channel_args, GRPC_ARG_ENABLE_RETRIES, true); // Record client channel factory. - arg = grpc_channel_args_find(args->channel_args, - GRPC_ARG_CLIENT_CHANNEL_FACTORY); - if (arg == nullptr) { + grpc_client_channel_factory* client_channel_factory = + grpc_channel_args_get_pointer<grpc_client_channel_factory>( + args->channel_args, GRPC_ARG_CLIENT_CHANNEL_FACTORY); + if (client_channel_factory == nullptr) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Missing client channel factory in args for client channel filter"); + "Missing or malformed client channel factory in args for client " + "channel filter"); } - if (arg->type != GRPC_ARG_POINTER) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "client channel factory arg must be a pointer"); - } - grpc_client_channel_factory_ref( - static_cast<grpc_client_channel_factory*>(arg->value.pointer.p)); - chand->client_channel_factory = - static_cast<grpc_client_channel_factory*>(arg->value.pointer.p); + grpc_client_channel_factory_ref(client_channel_factory); + chand->client_channel_factory = client_channel_factory; // Get server name to resolve, using proxy mapper if needed. - arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI); - if (arg == nullptr) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Missing server uri in args for client channel filter"); - } - if (arg->type != GRPC_ARG_STRING) { + char* server_uri = + grpc_channel_args_get_string(args->channel_args, GRPC_ARG_SERVER_URI); + if (server_uri == nullptr) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "server uri arg must be a string"); + "Missing or malformed server uri in args for client channel filter"); } char* proxy_name = nullptr; grpc_channel_args* new_args = nullptr; - grpc_proxy_mappers_map_name(arg->value.string, args->channel_args, - &proxy_name, &new_args); + grpc_proxy_mappers_map_name(server_uri, args->channel_args, &proxy_name, + &new_args); // Instantiate resolver. chand->resolver = grpc_core::ResolverRegistry::CreateResolver( - proxy_name != nullptr ? proxy_name : arg->value.string, + proxy_name != nullptr ? proxy_name : server_uri, new_args != nullptr ? new_args : args->channel_args, chand->interested_parties, chand->combiner); if (proxy_name != nullptr) gpr_free(proxy_name); diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc index 4e8b8b71db..58f059829c 100644 --- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc +++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -254,9 +254,8 @@ static void http_connect_handshaker_do_handshake( reinterpret_cast<http_connect_handshaker*>(handshaker_in); // Check for HTTP CONNECT channel arg. // If not found, invoke on_handshake_done without doing anything. - const grpc_arg* arg = - grpc_channel_args_find(args->args, GRPC_ARG_HTTP_CONNECT_SERVER); - char* server_name = grpc_channel_arg_get_string(arg); + char* server_name = + grpc_channel_args_get_string(args->args, GRPC_ARG_HTTP_CONNECT_SERVER); if (server_name == nullptr) { // Set shutdown to true so that subsequent calls to // http_connect_handshaker_shutdown() do nothing. @@ -267,8 +266,8 @@ static void http_connect_handshaker_do_handshake( return; } // Get headers from channel args. - arg = grpc_channel_args_find(args->args, GRPC_ARG_HTTP_CONNECT_HEADERS); - char* arg_header_string = grpc_channel_arg_get_string(arg); + char* arg_header_string = + grpc_channel_args_get_string(args->args, GRPC_ARG_HTTP_CONNECT_HEADERS); grpc_http_header* headers = nullptr; size_t num_headers = 0; char** header_strings = nullptr; diff --git a/src/core/ext/filters/client_channel/http_proxy.cc b/src/core/ext/filters/client_channel/http_proxy.cc index 29a6c0e367..e21de35a7d 100644 --- a/src/core/ext/filters/client_channel/http_proxy.cc +++ b/src/core/ext/filters/client_channel/http_proxy.cc @@ -83,11 +83,22 @@ done: return proxy_name; } +/** + * Checks the value of GRPC_ARG_ENABLE_HTTP_PROXY to determine if http_proxy + * should be used. + */ +bool http_proxy_enabled(const grpc_channel_args* args) { + return grpc_channel_args_get_bool(args, GRPC_ARG_ENABLE_HTTP_PROXY, true); +} + static bool proxy_mapper_map_name(grpc_proxy_mapper* mapper, const char* server_uri, const grpc_channel_args* args, char** name_to_resolve, grpc_channel_args** new_args) { + if (!http_proxy_enabled(args)) { + return false; + } char* user_cred = nullptr; *name_to_resolve = get_http_proxy_server(&user_cred); if (*name_to_resolve == nullptr) return false; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 263b51ae89..0ee4958f3d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1045,8 +1045,8 @@ GrpcLb::GrpcLb(const grpc_lb_addresses* addresses, grpc_combiner_scheduler(args.combiner)); grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "grpclb"); // Record server name. - const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); - const char* server_uri = grpc_channel_arg_get_string(arg); + const char* server_uri = + grpc_channel_args_get_string(args.args, GRPC_ARG_SERVER_URI); GPR_ASSERT(server_uri != nullptr); grpc_uri* uri = grpc_uri_parse(server_uri, true); GPR_ASSERT(uri->path[0] != '\0'); @@ -1058,12 +1058,12 @@ GrpcLb::GrpcLb(const grpc_lb_addresses* addresses, } grpc_uri_destroy(uri); // Record LB call timeout. - arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS); - lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX}); + lb_call_timeout_ms_ = grpc_channel_args_get_integer( + args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS, {0, 0, INT_MAX}); // Record fallback timeout. - arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS); - lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer( - arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX}); + lb_fallback_timeout_ms_ = grpc_channel_args_get_integer( + args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS, + {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX}); // Process channel args. ProcessChannelArgsLocked(*args.args); } @@ -1284,8 +1284,10 @@ void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current, } void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { - const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); - if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) { + const grpc_lb_addresses* addresses = + grpc_channel_args_get_pointer<grpc_lb_addresses>(&args, + GRPC_ARG_LB_ADDRESSES); + if (GPR_UNLIKELY(addresses == nullptr)) { // Ignore this update. gpr_log( GPR_ERROR, @@ -1293,8 +1295,6 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { this); return; } - const grpc_lb_addresses* addresses = - static_cast<const grpc_lb_addresses*>(arg->value.pointer.p); // Update fallback address list. if (fallback_backend_addresses_ != nullptr) { grpc_lb_addresses_destroy(fallback_backend_addresses_); @@ -1860,13 +1860,12 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory { OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( const LoadBalancingPolicy::Args& args) const override { /* Count the number of gRPC-LB addresses. There must be at least one. */ - const grpc_arg* arg = - grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES); - if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { + grpc_lb_addresses* addresses = + grpc_channel_args_get_pointer<grpc_lb_addresses>(args.args, + GRPC_ARG_LB_ADDRESSES); + if (addresses == nullptr) { return nullptr; } - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(arg->value.pointer.p); size_t num_grpclb_addrs = 0; for (size_t i = 0; i < addresses->num_addresses; ++i) { if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; @@ -1893,10 +1892,9 @@ bool maybe_add_client_load_reporting_filter(grpc_channel_stack_builder* builder, void* arg) { const grpc_channel_args* args = grpc_channel_stack_builder_get_channel_arguments(builder); - const grpc_arg* channel_arg = - grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME); - if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING && - strcmp(channel_arg->value.string, "grpclb") == 0) { + const char* lb_policy = + grpc_channel_args_get_string(args, GRPC_ARG_LB_POLICY_NAME); + if (lb_policy != nullptr && strcmp(lb_policy, "grpclb") == 0) { return grpc_channel_stack_builder_append_filter( builder, (const grpc_channel_filter*)arg, nullptr, nullptr); } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc index 441efd5e23..972bdd40d5 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc @@ -72,11 +72,10 @@ grpc_channel_args* grpc_lb_policy_grpclb_modify_lb_channel_args( grpc_arg args_to_add[2]; size_t num_args_to_add = 0; // Add arg for targets info table. - const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_LB_ADDRESSES); - GPR_ASSERT(arg != nullptr); - GPR_ASSERT(arg->type == GRPC_ARG_POINTER); grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(arg->value.pointer.p); + grpc_channel_args_get_pointer<grpc_lb_addresses>(args, + GRPC_ARG_LB_ADDRESSES); + GPR_ASSERT(addresses != nullptr); grpc_core::RefCountedPtr<grpc_core::TargetAuthorityTable> target_authority_table = grpc_core::CreateTargetAuthorityTable(addresses); args_to_add[num_args_to_add++] = diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index ff2140e628..70220e28d3 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -281,8 +281,10 @@ void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { } void PickFirst::UpdateLocked(const grpc_channel_args& args) { - const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); - if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { + const grpc_lb_addresses* addresses = + grpc_channel_args_get_pointer<const grpc_lb_addresses>( + &args, GRPC_ARG_LB_ADDRESSES); + if (addresses == nullptr) { if (subchannel_list_ == nullptr) { // If we don't have a current subchannel list, go into TRANSIENT FAILURE. grpc_connectivity_state_set( @@ -298,8 +300,6 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { } return; } - const grpc_lb_addresses* addresses = - static_cast<const grpc_lb_addresses*>(arg->value.pointer.p); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p received update with %" PRIuPTR " addresses", this, diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index b177385065..066f6ef7a6 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -607,8 +607,10 @@ void RoundRobin::PingOneLocked(grpc_closure* on_initiate, } void RoundRobin::UpdateLocked(const grpc_channel_args& args) { - const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); - if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) { + grpc_lb_addresses* addresses = + grpc_channel_args_get_pointer<grpc_lb_addresses>(&args, + GRPC_ARG_LB_ADDRESSES); + if (GPR_UNLIKELY(addresses == nullptr)) { gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this); // If we don't have a current subchannel list, go into TRANSIENT_FAILURE. // Otherwise, keep using the current subchannel list (ignore this update). @@ -620,8 +622,6 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args) { } return; } - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(arg->value.pointer.p); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses", this, addresses->num_addresses); diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.cc b/src/core/ext/filters/client_channel/lb_policy_factory.cc index 7c8cba55b7..93dae753bf 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.cc +++ b/src/core/ext/filters/client_channel/lb_policy_factory.cc @@ -147,9 +147,6 @@ grpc_arg grpc_lb_addresses_create_channel_arg( grpc_lb_addresses* grpc_lb_addresses_find_channel_arg( const grpc_channel_args* channel_args) { - const grpc_arg* lb_addresses_arg = - grpc_channel_args_find(channel_args, GRPC_ARG_LB_ADDRESSES); - if (lb_addresses_arg == nullptr || lb_addresses_arg->type != GRPC_ARG_POINTER) - return nullptr; - return static_cast<grpc_lb_addresses*>(lb_addresses_arg->value.pointer.p); + return grpc_channel_args_get_pointer<grpc_lb_addresses>( + channel_args, GRPC_ARG_LB_ADDRESSES); } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 3c40ae14b8..01156b1e40 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -140,14 +140,11 @@ AresDnsResolver::AresDnsResolver(const ResolverArgs& args) dns_server_ = gpr_strdup(args.uri->authority); } channel_args_ = grpc_channel_args_copy(args.args); - const grpc_arg* arg = grpc_channel_args_find( - channel_args_, GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION); - request_service_config_ = !grpc_channel_arg_get_integer( - arg, (grpc_integer_options){false, false, true}); - arg = grpc_channel_args_find(channel_args_, - GRPC_ARG_DNS_MIN_TIME_BETWEEN_RESOLUTIONS_MS); - min_time_between_resolutions_ = - grpc_channel_arg_get_integer(arg, {1000, 0, INT_MAX}); + request_service_config_ = !grpc_channel_args_get_bool( + channel_args_, GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION, false); + min_time_between_resolutions_ = grpc_channel_args_get_integer( + channel_args_, GRPC_ARG_DNS_MIN_TIME_BETWEEN_RESOLUTIONS_MS, + {1000, 0, INT_MAX}); interested_parties_ = grpc_pollset_set_create(); if (args.pollset_set != nullptr) { grpc_pollset_set_add_pollset_set(interested_parties_, args.pollset_set); @@ -414,10 +411,10 @@ void AresDnsResolver::StartResolvingLocked() { resolving_ = true; lb_addresses_ = nullptr; service_config_json_ = nullptr; - pending_request_ = grpc_dns_lookup_ares( + pending_request_ = grpc_dns_lookup_ares_locked( dns_server_, name_to_resolve_, kDefaultPort, interested_parties_, &on_resolved_, &lb_addresses_, true /* check_grpclb */, - request_service_config_ ? &service_config_json_ : nullptr); + request_service_config_ ? &service_config_json_ : nullptr, combiner()); last_resolution_timestamp_ = grpc_core::ExecCtx::Get()->Now(); } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h index 6239549534..27d1511d94 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h @@ -29,25 +29,27 @@ typedef struct grpc_ares_ev_driver grpc_ares_ev_driver; /* Start \a ev_driver. It will keep working until all IO on its ares_channel is done, or grpc_ares_ev_driver_destroy() is called. It may notify the callbacks bound to its ares_channel when necessary. */ -void grpc_ares_ev_driver_start(grpc_ares_ev_driver* ev_driver); +void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver); /* Returns the ares_channel owned by \a ev_driver. To bind a c-ares query to \a ev_driver, use the ares_channel owned by \a ev_driver as the arg of the query. */ -ares_channel* grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver* ev_driver); +ares_channel* grpc_ares_ev_driver_get_channel_locked( + grpc_ares_ev_driver* ev_driver); /* Creates a new grpc_ares_ev_driver. Returns GRPC_ERROR_NONE if \a ev_driver is created successfully. */ -grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver, - grpc_pollset_set* pollset_set); +grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, + grpc_pollset_set* pollset_set, + grpc_combiner* combiner); /* Destroys \a ev_driver asynchronously. Pending lookups made on \a ev_driver will be cancelled and their on_done callbacks will be invoked with a status of ARES_ECANCELLED. */ -void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver* ev_driver); +void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver); /* Shutdown all the grpc_fds used by \a ev_driver */ -void grpc_ares_ev_driver_shutdown(grpc_ares_ev_driver* ev_driver); +void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver); #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H \ */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index b604f2bf14..b73e979e9f 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -18,9 +18,10 @@ #include <grpc/support/port_platform.h> #include "src/core/lib/iomgr/port.h" -#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET) +#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) #include <ares.h> +#include <string.h> #include <sys/ioctl.h> #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" @@ -38,25 +39,23 @@ typedef struct fd_node { /** the owner of this fd node */ grpc_ares_ev_driver* ev_driver; - /** a closure wrapping on_readable_cb, which should be invoked when the - grpc_fd in this node becomes readable. */ + /** a closure wrapping on_readable_locked, which should be + invoked when the grpc_fd in this node becomes readable. */ grpc_closure read_closure; - /** a closure wrapping on_writable_cb, which should be invoked when the - grpc_fd in this node becomes writable. */ + /** a closure wrapping on_writable_locked, which should be + invoked when the grpc_fd in this node becomes writable. */ grpc_closure write_closure; /** next fd node in the list */ struct fd_node* next; - /** mutex guarding the rest of the state */ - gpr_mu mu; /** the grpc_fd owned by this fd node */ grpc_fd* fd; /** if the readable closure has been registered */ bool readable_registered; /** if the writable closure has been registered */ bool writable_registered; - /** if the fd is being shut down */ - bool shutting_down; + /** if the fd has been shutdown yet from grpc iomgr perspective */ + bool already_shutdown; } fd_node; struct grpc_ares_ev_driver { @@ -67,8 +66,8 @@ struct grpc_ares_ev_driver { /** refcount of the event driver */ gpr_refcount refs; - /** mutex guarding the rest of the state */ - gpr_mu mu; + /** combiner to synchronize c-ares and I/O callbacks on */ + grpc_combiner* combiner; /** a list of grpc_fd that this event driver is currently using. */ fd_node* fds; /** is this event driver currently working? */ @@ -91,44 +90,42 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) { if (gpr_unref(&ev_driver->refs)) { gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver); GPR_ASSERT(ev_driver->fds == nullptr); - gpr_mu_destroy(&ev_driver->mu); + GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver"); ares_destroy(ev_driver->channel); gpr_free(ev_driver); } } -static void fd_node_destroy(fd_node* fdn) { +static void fd_node_destroy_locked(fd_node* fdn) { gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd)); GPR_ASSERT(!fdn->readable_registered); GPR_ASSERT(!fdn->writable_registered); - gpr_mu_destroy(&fdn->mu); - /* c-ares library has closed the fd inside grpc_fd. This fd may be picked up + GPR_ASSERT(fdn->already_shutdown); + /* c-ares library will close the fd inside grpc_fd. This fd may be picked up immediately by another thread, and should not be closed by the following grpc_fd_orphan. */ - grpc_fd_orphan(fdn->fd, nullptr, nullptr, true /* already_closed */, - "c-ares query finished"); + int dummy_release_fd; + grpc_fd_orphan(fdn->fd, nullptr, &dummy_release_fd, "c-ares query finished"); gpr_free(fdn); } -static void fd_node_shutdown(fd_node* fdn) { - gpr_mu_lock(&fdn->mu); - fdn->shutting_down = true; - if (!fdn->readable_registered && !fdn->writable_registered) { - gpr_mu_unlock(&fdn->mu); - fd_node_destroy(fdn); - } else { - grpc_fd_shutdown( - fdn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("c-ares fd shutdown")); - gpr_mu_unlock(&fdn->mu); +static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) { + if (!fdn->already_shutdown) { + fdn->already_shutdown = true; + grpc_fd_shutdown(fdn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason)); } } -grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver, - grpc_pollset_set* pollset_set) { +grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, + grpc_pollset_set* pollset_set, + grpc_combiner* combiner) { *ev_driver = static_cast<grpc_ares_ev_driver*>( gpr_malloc(sizeof(grpc_ares_ev_driver))); - int status = ares_init(&(*ev_driver)->channel); - gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create"); + ares_options opts; + memset(&opts, 0, sizeof(opts)); + opts.flags |= ARES_FLAG_STAYOPEN; + int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS); + gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create_locked"); if (status != ARES_SUCCESS) { char* err_msg; gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s", @@ -138,7 +135,7 @@ grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver, gpr_free(*ev_driver); return err; } - gpr_mu_init(&(*ev_driver)->mu); + (*ev_driver)->combiner = GRPC_COMBINER_REF(combiner, "ares event driver"); gpr_ref_init(&(*ev_driver)->refs, 1); (*ev_driver)->pollset_set = pollset_set; (*ev_driver)->fds = nullptr; @@ -147,33 +144,26 @@ grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver, return GRPC_ERROR_NONE; } -void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver* ev_driver) { - // It's not safe to shut down remaining fds here directly, becauses - // ares_host_callback does not provide an exec_ctx. We mark the event driver - // as being shut down. If the event driver is working, - // grpc_ares_notify_on_event_locked will shut down the fds; if it's not - // working, there are no fds to shut down. - gpr_mu_lock(&ev_driver->mu); +void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver) { + // We mark the event driver as being shut down. If the event driver + // is working, grpc_ares_notify_on_event_locked will shut down the + // fds; if it's not working, there are no fds to shut down. ev_driver->shutting_down = true; - gpr_mu_unlock(&ev_driver->mu); grpc_ares_ev_driver_unref(ev_driver); } -void grpc_ares_ev_driver_shutdown(grpc_ares_ev_driver* ev_driver) { - gpr_mu_lock(&ev_driver->mu); +void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) { ev_driver->shutting_down = true; fd_node* fn = ev_driver->fds; while (fn != nullptr) { - grpc_fd_shutdown(fn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "grpc_ares_ev_driver_shutdown")); + fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown"); fn = fn->next; } - gpr_mu_unlock(&ev_driver->mu); } // Search fd in the fd_node list head. This is an O(n) search, the max possible // value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests. -static fd_node* pop_fd_node(fd_node** head, int fd) { +static fd_node* pop_fd_node_locked(fd_node** head, int fd) { fd_node dummy_head; dummy_head.next = *head; fd_node* node = &dummy_head; @@ -190,31 +180,22 @@ static fd_node* pop_fd_node(fd_node** head, int fd) { } /* Check if \a fd is still readable */ -static bool grpc_ares_is_fd_still_readable(grpc_ares_ev_driver* ev_driver, - int fd) { +static bool grpc_ares_is_fd_still_readable_locked( + grpc_ares_ev_driver* ev_driver, int fd) { size_t bytes_available = 0; return ioctl(fd, FIONREAD, &bytes_available) == 0 && bytes_available > 0; } -static void on_readable_cb(void* arg, grpc_error* error) { +static void on_readable_locked(void* arg, grpc_error* error) { fd_node* fdn = static_cast<fd_node*>(arg); grpc_ares_ev_driver* ev_driver = fdn->ev_driver; - gpr_mu_lock(&fdn->mu); const int fd = grpc_fd_wrapped_fd(fdn->fd); fdn->readable_registered = false; - if (fdn->shutting_down && !fdn->writable_registered) { - gpr_mu_unlock(&fdn->mu); - fd_node_destroy(fdn); - grpc_ares_ev_driver_unref(ev_driver); - return; - } - gpr_mu_unlock(&fdn->mu); - gpr_log(GPR_DEBUG, "readable on %d", fd); if (error == GRPC_ERROR_NONE) { do { ares_process_fd(ev_driver->channel, fd, ARES_SOCKET_BAD); - } while (grpc_ares_is_fd_still_readable(ev_driver, fd)); + } while (grpc_ares_is_fd_still_readable_locked(ev_driver, fd)); } else { // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or // timed out. The pending lookups made on this ev_driver will be cancelled @@ -224,26 +205,15 @@ static void on_readable_cb(void* arg, grpc_error* error) { // grpc_ares_notify_on_event_locked(). ares_cancel(ev_driver->channel); } - gpr_mu_lock(&ev_driver->mu); grpc_ares_notify_on_event_locked(ev_driver); - gpr_mu_unlock(&ev_driver->mu); grpc_ares_ev_driver_unref(ev_driver); } -static void on_writable_cb(void* arg, grpc_error* error) { +static void on_writable_locked(void* arg, grpc_error* error) { fd_node* fdn = static_cast<fd_node*>(arg); grpc_ares_ev_driver* ev_driver = fdn->ev_driver; - gpr_mu_lock(&fdn->mu); const int fd = grpc_fd_wrapped_fd(fdn->fd); fdn->writable_registered = false; - if (fdn->shutting_down && !fdn->readable_registered) { - gpr_mu_unlock(&fdn->mu); - fd_node_destroy(fdn); - grpc_ares_ev_driver_unref(ev_driver); - return; - } - gpr_mu_unlock(&fdn->mu); - gpr_log(GPR_DEBUG, "writable on %d", fd); if (error == GRPC_ERROR_NONE) { ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, fd); @@ -256,13 +226,12 @@ static void on_writable_cb(void* arg, grpc_error* error) { // grpc_ares_notify_on_event_locked(). ares_cancel(ev_driver->channel); } - gpr_mu_lock(&ev_driver->mu); grpc_ares_notify_on_event_locked(ev_driver); - gpr_mu_unlock(&ev_driver->mu); grpc_ares_ev_driver_unref(ev_driver); } -ares_channel* grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver* ev_driver) { +ares_channel* grpc_ares_ev_driver_get_channel_locked( + grpc_ares_ev_driver* ev_driver) { return &ev_driver->channel; } @@ -277,29 +246,27 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) { if (ARES_GETSOCK_READABLE(socks_bitmask, i) || ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { - fd_node* fdn = pop_fd_node(&ev_driver->fds, socks[i]); + fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]); // Create a new fd_node if sock[i] is not in the fd_node list. if (fdn == nullptr) { char* fd_name; gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i); fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node))); gpr_log(GPR_DEBUG, "new fd: %d", socks[i]); - fdn->fd = grpc_fd_create(socks[i], fd_name); + fdn->fd = grpc_fd_create(socks[i], fd_name, false); fdn->ev_driver = ev_driver; fdn->readable_registered = false; fdn->writable_registered = false; - fdn->shutting_down = false; - gpr_mu_init(&fdn->mu); - GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_cb, fdn, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_cb, fdn, - grpc_schedule_on_exec_ctx); + fdn->already_shutdown = false; + GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn, + grpc_combiner_scheduler(ev_driver->combiner)); + GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn, + grpc_combiner_scheduler(ev_driver->combiner)); grpc_pollset_set_add_fd(ev_driver->pollset_set, fdn->fd); gpr_free(fd_name); } fdn->next = new_list; new_list = fdn; - gpr_mu_lock(&fdn->mu); // Register read_closure if the socket is readable and read_closure has // not been registered with this socket. if (ARES_GETSOCK_READABLE(socks_bitmask, i) && @@ -319,7 +286,6 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { grpc_fd_notify_on_write(fdn->fd, &fdn->write_closure); fdn->writable_registered = true; } - gpr_mu_unlock(&fdn->mu); } } } @@ -329,7 +295,13 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { while (ev_driver->fds != nullptr) { fd_node* cur = ev_driver->fds; ev_driver->fds = ev_driver->fds->next; - fd_node_shutdown(cur); + fd_node_shutdown_locked(cur, "c-ares fd shutdown"); + if (!cur->readable_registered && !cur->writable_registered) { + fd_node_destroy_locked(cur); + } else { + cur->next = new_list; + new_list = cur; + } } ev_driver->fds = new_list; // If the ev driver has no working fd, all the tasks are done. @@ -339,13 +311,11 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { } } -void grpc_ares_ev_driver_start(grpc_ares_ev_driver* ev_driver) { - gpr_mu_lock(&ev_driver->mu); +void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) { if (!ev_driver->working) { ev_driver->working = true; grpc_ares_notify_on_event_locked(ev_driver); } - gpr_mu_unlock(&ev_driver->mu); } -#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET) */ +#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index 18d0a7b9f6..471de58e8c 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -65,8 +65,6 @@ struct grpc_ares_request { /** number of ongoing queries */ gpr_refcount pending_queries; - /** mutex guarding the rest of the state */ - gpr_mu mu; /** is there at least one successful query, set in on_done_cb */ bool success; /** the errors explaining the request failure, set in on_done_cb */ @@ -74,7 +72,8 @@ struct grpc_ares_request { }; typedef struct grpc_ares_hostbyname_request { - /** following members are set in create_hostbyname_request */ + /** following members are set in create_hostbyname_request_locked + */ /** the top-level request instance */ grpc_ares_request* parent_request; /** host to resolve, parsed from the name to resolve */ @@ -96,10 +95,6 @@ static uint16_t strhtons(const char* port) { return htons(static_cast<unsigned short>(atoi(port))); } -static void grpc_ares_request_ref(grpc_ares_request* r) { - gpr_ref(&r->pending_queries); -} - static void log_address_sorting_list(grpc_lb_addresses* lb_addrs, const char* input_output_str) { for (size_t i = 0; i < lb_addrs->num_addresses; i++) { @@ -149,7 +144,11 @@ void grpc_cares_wrapper_test_only_address_sorting_sort( grpc_cares_wrapper_address_sorting_sort(lb_addrs); } -static void grpc_ares_request_unref(grpc_ares_request* r) { +static void grpc_ares_request_ref_locked(grpc_ares_request* r) { + gpr_ref(&r->pending_queries); +} + +static void grpc_ares_request_unref_locked(grpc_ares_request* r) { /* If there are no pending queries, invoke on_done callback and destroy the request */ if (gpr_unref(&r->pending_queries)) { @@ -158,13 +157,12 @@ static void grpc_ares_request_unref(grpc_ares_request* r) { grpc_cares_wrapper_address_sorting_sort(lb_addrs); } GRPC_CLOSURE_SCHED(r->on_done, r->error); - gpr_mu_destroy(&r->mu); - grpc_ares_ev_driver_destroy(r->ev_driver); + grpc_ares_ev_driver_destroy_locked(r->ev_driver); gpr_free(r); } } -static grpc_ares_hostbyname_request* create_hostbyname_request( +static grpc_ares_hostbyname_request* create_hostbyname_request_locked( grpc_ares_request* parent_request, char* host, uint16_t port, bool is_balancer) { grpc_ares_hostbyname_request* hr = static_cast<grpc_ares_hostbyname_request*>( @@ -173,22 +171,22 @@ static grpc_ares_hostbyname_request* create_hostbyname_request( hr->host = gpr_strdup(host); hr->port = port; hr->is_balancer = is_balancer; - grpc_ares_request_ref(parent_request); + grpc_ares_request_ref_locked(parent_request); return hr; } -static void destroy_hostbyname_request(grpc_ares_hostbyname_request* hr) { - grpc_ares_request_unref(hr->parent_request); +static void destroy_hostbyname_request_locked( + grpc_ares_hostbyname_request* hr) { + grpc_ares_request_unref_locked(hr->parent_request); gpr_free(hr->host); gpr_free(hr); } -static void on_hostbyname_done_cb(void* arg, int status, int timeouts, - struct hostent* hostent) { +static void on_hostbyname_done_locked(void* arg, int status, int timeouts, + struct hostent* hostent) { grpc_ares_hostbyname_request* hr = static_cast<grpc_ares_hostbyname_request*>(arg); grpc_ares_request* r = hr->parent_request; - gpr_mu_lock(&r->mu); if (status == ARES_SUCCESS) { GRPC_ERROR_UNREF(r->error); r->error = GRPC_ERROR_NONE; @@ -263,33 +261,33 @@ static void on_hostbyname_done_cb(void* arg, int status, int timeouts, r->error = grpc_error_add_child(error, r->error); } } - gpr_mu_unlock(&r->mu); - destroy_hostbyname_request(hr); + destroy_hostbyname_request_locked(hr); } -static void on_srv_query_done_cb(void* arg, int status, int timeouts, - unsigned char* abuf, int alen) { +static void on_srv_query_done_locked(void* arg, int status, int timeouts, + unsigned char* abuf, int alen) { grpc_ares_request* r = static_cast<grpc_ares_request*>(arg); - gpr_log(GPR_DEBUG, "on_query_srv_done_cb"); + gpr_log(GPR_DEBUG, "on_query_srv_done_locked"); if (status == ARES_SUCCESS) { - gpr_log(GPR_DEBUG, "on_query_srv_done_cb ARES_SUCCESS"); + gpr_log(GPR_DEBUG, "on_query_srv_done_locked ARES_SUCCESS"); struct ares_srv_reply* reply; const int parse_status = ares_parse_srv_reply(abuf, alen, &reply); if (parse_status == ARES_SUCCESS) { - ares_channel* channel = grpc_ares_ev_driver_get_channel(r->ev_driver); + ares_channel* channel = + grpc_ares_ev_driver_get_channel_locked(r->ev_driver); for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr; srv_it = srv_it->next) { if (grpc_ipv6_loopback_available()) { - grpc_ares_hostbyname_request* hr = create_hostbyname_request( + grpc_ares_hostbyname_request* hr = create_hostbyname_request_locked( r, srv_it->host, htons(srv_it->port), true /* is_balancer */); ares_gethostbyname(*channel, hr->host, AF_INET6, - on_hostbyname_done_cb, hr); + on_hostbyname_done_locked, hr); } - grpc_ares_hostbyname_request* hr = create_hostbyname_request( + grpc_ares_hostbyname_request* hr = create_hostbyname_request_locked( r, srv_it->host, htons(srv_it->port), true /* is_balancer */); - ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_cb, - hr); - grpc_ares_ev_driver_start(r->ev_driver); + ares_gethostbyname(*channel, hr->host, AF_INET, + on_hostbyname_done_locked, hr); + grpc_ares_ev_driver_start_locked(r->ev_driver); } } if (reply != nullptr) { @@ -307,21 +305,20 @@ static void on_srv_query_done_cb(void* arg, int status, int timeouts, r->error = grpc_error_add_child(error, r->error); } } - grpc_ares_request_unref(r); + grpc_ares_request_unref_locked(r); } static const char g_service_config_attribute_prefix[] = "grpc_config="; -static void on_txt_done_cb(void* arg, int status, int timeouts, - unsigned char* buf, int len) { - gpr_log(GPR_DEBUG, "on_txt_done_cb"); +static void on_txt_done_locked(void* arg, int status, int timeouts, + unsigned char* buf, int len) { + gpr_log(GPR_DEBUG, "on_txt_done_locked"); char* error_msg; grpc_ares_request* r = static_cast<grpc_ares_request*>(arg); const size_t prefix_len = sizeof(g_service_config_attribute_prefix) - 1; struct ares_txt_ext* result = nullptr; struct ares_txt_ext* reply = nullptr; grpc_error* error = GRPC_ERROR_NONE; - gpr_mu_lock(&r->mu); if (status != ARES_SUCCESS) goto fail; status = ares_parse_txt_reply_ext(buf, len, &reply); if (status != ARES_SUCCESS) goto fail; @@ -366,14 +363,14 @@ fail: r->error = grpc_error_add_child(error, r->error); } done: - gpr_mu_unlock(&r->mu); - grpc_ares_request_unref(r); + grpc_ares_request_unref_locked(r); } -static grpc_ares_request* grpc_dns_lookup_ares_impl( +static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json) { + grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, + grpc_combiner* combiner) { grpc_error* error = GRPC_ERROR_NONE; grpc_ares_hostbyname_request* hr = nullptr; grpc_ares_request* r = nullptr; @@ -402,20 +399,19 @@ static grpc_ares_request* grpc_dns_lookup_ares_impl( } port = gpr_strdup(default_port); } - grpc_ares_ev_driver* ev_driver; - error = grpc_ares_ev_driver_create(&ev_driver, interested_parties); + error = grpc_ares_ev_driver_create_locked(&ev_driver, interested_parties, + combiner); if (error != GRPC_ERROR_NONE) goto error_cleanup; r = static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request))); - gpr_mu_init(&r->mu); r->ev_driver = ev_driver; r->on_done = on_done; r->lb_addrs_out = addrs; r->service_config_json_out = service_config_json; r->success = false; r->error = GRPC_ERROR_NONE; - channel = grpc_ares_ev_driver_get_channel(r->ev_driver); + channel = grpc_ares_ev_driver_get_channel_locked(r->ev_driver); // If dns_server is specified, use it. if (dns_server != nullptr) { @@ -457,32 +453,34 @@ static grpc_ares_request* grpc_dns_lookup_ares_impl( } gpr_ref_init(&r->pending_queries, 1); if (grpc_ipv6_loopback_available()) { - hr = create_hostbyname_request(r, host, strhtons(port), - false /* is_balancer */); - ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_cb, hr); + hr = create_hostbyname_request_locked(r, host, strhtons(port), + false /* is_balancer */); + ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_locked, + hr); } - hr = create_hostbyname_request(r, host, strhtons(port), - false /* is_balancer */); - ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_cb, hr); + hr = create_hostbyname_request_locked(r, host, strhtons(port), + false /* is_balancer */); + ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_locked, + hr); if (check_grpclb) { /* Query the SRV record */ - grpc_ares_request_ref(r); + grpc_ares_request_ref_locked(r); char* service_name; gpr_asprintf(&service_name, "_grpclb._tcp.%s", host); - ares_query(*channel, service_name, ns_c_in, ns_t_srv, on_srv_query_done_cb, - r); + ares_query(*channel, service_name, ns_c_in, ns_t_srv, + on_srv_query_done_locked, r); gpr_free(service_name); } if (service_config_json != nullptr) { - grpc_ares_request_ref(r); + grpc_ares_request_ref_locked(r); char* config_name; gpr_asprintf(&config_name, "_grpc_config.%s", host); - ares_search(*channel, config_name, ns_c_in, ns_t_txt, on_txt_done_cb, r); + ares_search(*channel, config_name, ns_c_in, ns_t_txt, on_txt_done_locked, + r); gpr_free(config_name); } - /* TODO(zyc): Handle CNAME records here. */ - grpc_ares_ev_driver_start(r->ev_driver); - grpc_ares_request_unref(r); + grpc_ares_ev_driver_start_locked(r->ev_driver); + grpc_ares_request_unref_locked(r); gpr_free(host); gpr_free(port); return r; @@ -494,15 +492,15 @@ error_cleanup: return nullptr; } -grpc_ares_request* (*grpc_dns_lookup_ares)( +grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, - char** service_config_json) = grpc_dns_lookup_ares_impl; + grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, + grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl; void grpc_cancel_ares_request(grpc_ares_request* r) { - if (grpc_dns_lookup_ares == grpc_dns_lookup_ares_impl) { - grpc_ares_ev_driver_shutdown(r->ev_driver); + if (grpc_dns_lookup_ares_locked == grpc_dns_lookup_ares_locked_impl) { + grpc_ares_ev_driver_shutdown_locked(r->ev_driver); } } @@ -534,6 +532,8 @@ void grpc_ares_cleanup(void) { */ typedef struct grpc_resolve_address_ares_request { + /* combiner that queries and related callbacks run under */ + grpc_combiner* combiner; /** the pointer to receive the resolved addresses */ grpc_resolved_addresses** addrs_out; /** currently resolving lb addresses */ @@ -541,8 +541,14 @@ typedef struct grpc_resolve_address_ares_request { /** closure to call when the resolve_address_ares request completes */ grpc_closure* on_resolve_address_done; /** a closure wrapping on_dns_lookup_done_cb, which should be invoked when the - grpc_dns_lookup_ares operation is done. */ + grpc_dns_lookup_ares_locked operation is done. */ grpc_closure on_dns_lookup_done; + /* target name */ + const char* name; + /* default port to use if none is specified */ + const char* default_port; + /* pollset_set to be driven by */ + grpc_pollset_set* interested_parties; } grpc_resolve_address_ares_request; static void on_dns_lookup_done_cb(void* arg, grpc_error* error) { @@ -566,9 +572,20 @@ static void on_dns_lookup_done_cb(void* arg, grpc_error* error) { } GRPC_CLOSURE_SCHED(r->on_resolve_address_done, GRPC_ERROR_REF(error)); if (r->lb_addrs != nullptr) grpc_lb_addresses_destroy(r->lb_addrs); + GRPC_COMBINER_UNREF(r->combiner, "on_dns_lookup_done_cb"); gpr_free(r); } +static void grpc_resolve_address_invoke_dns_lookup_ares_locked( + void* arg, grpc_error* unused_error) { + grpc_resolve_address_ares_request* r = + static_cast<grpc_resolve_address_ares_request*>(arg); + grpc_dns_lookup_ares_locked( + nullptr /* dns_server */, r->name, r->default_port, r->interested_parties, + &r->on_dns_lookup_done, &r->lb_addrs, false /* check_grpclb */, + nullptr /* service_config_json */, r->combiner); +} + static void grpc_resolve_address_ares_impl(const char* name, const char* default_port, grpc_pollset_set* interested_parties, @@ -577,14 +594,18 @@ static void grpc_resolve_address_ares_impl(const char* name, grpc_resolve_address_ares_request* r = static_cast<grpc_resolve_address_ares_request*>( gpr_zalloc(sizeof(grpc_resolve_address_ares_request))); + r->combiner = grpc_combiner_create(); r->addrs_out = addrs; r->on_resolve_address_done = on_done; GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r, grpc_schedule_on_exec_ctx); - grpc_dns_lookup_ares(nullptr /* dns_server */, name, default_port, - interested_parties, &r->on_dns_lookup_done, &r->lb_addrs, - false /* check_grpclb */, - nullptr /* service_config_json */); + r->name = name; + r->default_port = default_port; + r->interested_parties = interested_parties; + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_CREATE(grpc_resolve_address_invoke_dns_lookup_ares_locked, r, + grpc_combiner_scheduler(r->combiner)), + GRPC_ERROR_NONE); } void (*grpc_resolve_address_ares)( diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index 2d84a038d6..9e93d0cf94 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -48,11 +48,11 @@ extern void (*grpc_resolve_address_ares)(const char* name, function. \a on_done may be called directly in this function without being scheduled with \a exec_ctx, so it must not try to acquire locks that are being held by the caller. */ -extern grpc_ares_request* (*grpc_dns_lookup_ares)( +extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_lb_addresses** addresses, bool check_grpclb, - char** service_config_json); + char** service_config_json, grpc_combiner* combiner); /* Cancel the pending grpc_ares_request \a request */ void grpc_cancel_ares_request(grpc_ares_request* request); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc index 5096e480bc..d6a76fc8b6 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc @@ -26,18 +26,19 @@ struct grpc_ares_request { char val; }; -static grpc_ares_request* grpc_dns_lookup_ares_impl( +static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json) { + grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, + grpc_combiner* combiner) { return NULL; } -grpc_ares_request* (*grpc_dns_lookup_ares)( +grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, - char** service_config_json) = grpc_dns_lookup_ares_impl; + grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, + grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl; void grpc_cancel_ares_request(grpc_ares_request* r) {} diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index fae4c33a17..25e92a078d 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -116,10 +116,9 @@ NativeDnsResolver::NativeDnsResolver(const ResolverArgs& args) if (path[0] == '/') ++path; name_to_resolve_ = gpr_strdup(path); channel_args_ = grpc_channel_args_copy(args.args); - const grpc_arg* arg = grpc_channel_args_find( - args.args, GRPC_ARG_DNS_MIN_TIME_BETWEEN_RESOLUTIONS_MS); - min_time_between_resolutions_ = - grpc_channel_arg_get_integer(arg, {1000, 0, INT_MAX}); + min_time_between_resolutions_ = grpc_channel_args_get_integer( + args.args, GRPC_ARG_DNS_MIN_TIME_BETWEEN_RESOLUTIONS_MS, + {1000, 0, INT_MAX}); interested_parties_ = grpc_pollset_set_create(); if (args.pollset_set != nullptr) { grpc_pollset_set_add_pollset_set(interested_parties_, args.pollset_set); diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc index 99a33f2277..9250dbd6b3 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc @@ -252,20 +252,15 @@ static const grpc_arg_pointer_vtable response_generator_arg_vtable = { grpc_arg FakeResolverResponseGenerator::MakeChannelArg( FakeResolverResponseGenerator* generator) { - grpc_arg arg; - arg.type = GRPC_ARG_POINTER; - arg.key = (char*)GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR; - arg.value.pointer.p = generator; - arg.value.pointer.vtable = &response_generator_arg_vtable; - return arg; + return grpc_channel_arg_pointer_create( + const_cast<char*>(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR), generator, + &response_generator_arg_vtable); } FakeResolverResponseGenerator* FakeResolverResponseGenerator::GetFromArgs( const grpc_channel_args* args) { - const grpc_arg* arg = - grpc_channel_args_find(args, GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR); - if (arg == nullptr || arg->type != GRPC_ARG_POINTER) return nullptr; - return static_cast<FakeResolverResponseGenerator*>(arg->value.pointer.p); + return grpc_channel_args_get_pointer<FakeResolverResponseGenerator>( + args, GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR); } // diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index f010002ab9..5db5d1f6c0 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -736,9 +736,8 @@ void grpc_get_subchannel_address_arg(const grpc_channel_args* args, } const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args) { - const grpc_arg* addr_arg = - grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS); - const char* addr_str = grpc_channel_arg_get_string(addr_arg); + const char* addr_str = + grpc_channel_args_get_string(args, GRPC_ARG_SUBCHANNEL_ADDRESS); GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy. return addr_str; } diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc index 27d3eac8d6..caa547204b 100644 --- a/src/core/ext/filters/deadline/deadline_filter.cc +++ b/src/core/ext/filters/deadline/deadline_filter.cc @@ -289,11 +289,10 @@ static void client_start_transport_stream_op_batch( static void recv_initial_metadata_ready(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast<grpc_call_element*>(arg); server_call_data* calld = static_cast<server_call_data*>(elem->call_data); - // Get deadline from metadata and start the timer if needed. start_timer_if_needed(elem, calld->recv_initial_metadata->deadline); // Invoke the next callback. - calld->next_recv_initial_metadata_ready->cb( - calld->next_recv_initial_metadata_ready->cb_arg, error); + GRPC_CLOSURE_RUN(calld->next_recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); } // Method for starting a call op for server filter. @@ -359,8 +358,8 @@ const grpc_channel_filter grpc_server_deadline_filter = { }; bool grpc_deadline_checking_enabled(const grpc_channel_args* channel_args) { - return grpc_channel_arg_get_bool( - grpc_channel_args_find(channel_args, GRPC_ARG_ENABLE_DEADLINE_CHECKS), + return grpc_channel_args_get_bool( + channel_args, GRPC_ARG_ENABLE_DEADLINE_CHECKS, !grpc_channel_args_want_minimal_stack(channel_args)); } diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index ae94ce47b9..fe7f2ba705 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -23,6 +23,7 @@ #include <stdint.h> #include <string.h> #include "src/core/ext/filters/http/client/http_client_filter.h" +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/profiling/timers.h" @@ -435,64 +436,43 @@ static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) {} -static grpc_mdelem scheme_from_args(const grpc_channel_args* args) { - unsigned i; - size_t j; +static grpc_mdelem scheme_from_args(const grpc_channel_args* channel_args) { grpc_mdelem valid_schemes[] = {GRPC_MDELEM_SCHEME_HTTP, GRPC_MDELEM_SCHEME_HTTPS}; - if (args != nullptr) { - for (i = 0; i < args->num_args; ++i) { - if (args->args[i].type == GRPC_ARG_STRING && - strcmp(args->args[i].key, GRPC_ARG_HTTP2_SCHEME) == 0) { - for (j = 0; j < GPR_ARRAY_SIZE(valid_schemes); j++) { - if (0 == grpc_slice_str_cmp(GRPC_MDVALUE(valid_schemes[j]), - args->args[i].value.string)) { - return valid_schemes[j]; - } - } + char* scheme = + grpc_channel_args_get_string(channel_args, GRPC_ARG_HTTP2_SCHEME); + if (scheme != nullptr) { + for (size_t i = 0; i < GPR_ARRAY_SIZE(valid_schemes); i++) { + if (0 == grpc_slice_str_cmp(GRPC_MDVALUE(valid_schemes[i]), scheme)) { + return valid_schemes[i]; } } } return GRPC_MDELEM_SCHEME_HTTP; } -static size_t max_payload_size_from_args(const grpc_channel_args* args) { - if (args != nullptr) { - for (size_t i = 0; i < args->num_args; ++i) { - if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET)) { - if (args->args[i].type != GRPC_ARG_INTEGER) { - gpr_log(GPR_ERROR, "%s: must be an integer", - GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET); - } else { - return static_cast<size_t>(args->args[i].value.integer); - } - } - } - } - return kMaxPayloadSizeForGet; +static size_t max_payload_size_from_args( + const grpc_channel_args* channel_args) { + return grpc_channel_args_get_integer( + channel_args, GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET, + {kMaxPayloadSizeForGet, 0, kMaxPayloadSizeForGet}); } static grpc_slice user_agent_from_args(const grpc_channel_args* args, const char* transport_name) { gpr_strvec v; - size_t i; int is_first = 1; char* tmp; grpc_slice result; gpr_strvec_init(&v); - for (i = 0; args && i < args->num_args; i++) { - if (0 == strcmp(args->args[i].key, GRPC_ARG_PRIMARY_USER_AGENT_STRING)) { - if (args->args[i].type != GRPC_ARG_STRING) { - gpr_log(GPR_ERROR, "Channel argument '%s' should be a string", - GRPC_ARG_PRIMARY_USER_AGENT_STRING); - } else { - if (!is_first) gpr_strvec_add(&v, gpr_strdup(" ")); - is_first = 0; - gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string)); - } - } + char* user_agent_str = + grpc_channel_args_get_string(args, GRPC_ARG_PRIMARY_USER_AGENT_STRING); + if (user_agent_str != nullptr) { + if (!is_first) gpr_strvec_add(&v, gpr_strdup(" ")); + is_first = 0; + gpr_strvec_add(&v, gpr_strdup(user_agent_str)); } gpr_asprintf(&tmp, "%sgrpc-c/%s (%s; %s; %s)", is_first ? "" : " ", @@ -501,17 +481,11 @@ static grpc_slice user_agent_from_args(const grpc_channel_args* args, is_first = 0; gpr_strvec_add(&v, tmp); - for (i = 0; args && i < args->num_args; i++) { - if (0 == strcmp(args->args[i].key, GRPC_ARG_SECONDARY_USER_AGENT_STRING)) { - if (args->args[i].type != GRPC_ARG_STRING) { - gpr_log(GPR_ERROR, "Channel argument '%s' should be a string", - GRPC_ARG_SECONDARY_USER_AGENT_STRING); - } else { - if (!is_first) gpr_strvec_add(&v, gpr_strdup(" ")); - is_first = 0; - gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string)); - } - } + user_agent_str = + grpc_channel_args_get_string(args, GRPC_ARG_SECONDARY_USER_AGENT_STRING); + if (user_agent_str != nullptr) { + gpr_strvec_add(&v, gpr_strdup(" ")); + gpr_strvec_add(&v, gpr_strdup(user_agent_str)); } tmp = gpr_strvec_flatten(&v, nullptr); diff --git a/src/core/ext/filters/http/http_filters_plugin.cc b/src/core/ext/filters/http/http_filters_plugin.cc index f03fa0141d..d3cbe50016 100644 --- a/src/core/ext/filters/http/http_filters_plugin.cc +++ b/src/core/ext/filters/http/http_filters_plugin.cc @@ -48,8 +48,8 @@ static bool maybe_add_optional_filter(grpc_channel_stack_builder* builder, optional_filter* filtarg = static_cast<optional_filter*>(arg); const grpc_channel_args* channel_args = grpc_channel_stack_builder_get_channel_arguments(builder); - bool enable = grpc_channel_arg_get_bool( - grpc_channel_args_find(channel_args, filtarg->control_channel_arg), + bool enable = grpc_channel_args_get_bool( + channel_args, filtarg->control_channel_arg, !grpc_channel_args_want_minimal_stack(channel_args)); return enable ? grpc_channel_stack_builder_prepend_filter( builder, filtarg->filter, nullptr, nullptr) diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc index 0d349e2a89..a8f70333b2 100644 --- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc +++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc @@ -82,9 +82,7 @@ static void on_initial_md_ready(void* user_data, grpc_error* err) { } else { GRPC_ERROR_REF(err); } - calld->ops_recv_initial_metadata_ready->cb( - calld->ops_recv_initial_metadata_ready->cb_arg, err); - GRPC_ERROR_UNREF(err); + GRPC_CLOSURE_RUN(calld->ops_recv_initial_metadata_ready, err); } /* Constructor for call_data */ diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc index 667c0c56ef..73bcf242d8 100644 --- a/src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc +++ b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc @@ -33,8 +33,7 @@ #include "src/core/lib/surface/channel_init.h" static bool is_load_reporting_enabled(const grpc_channel_args* a) { - return grpc_channel_arg_get_bool( - grpc_channel_args_find(a, GRPC_ARG_ENABLE_LOAD_REPORTING), false); + return grpc_channel_args_get_bool(a, GRPC_ARG_ENABLE_LOAD_REPORTING, false); } static bool maybe_add_server_load_reporting_filter( diff --git a/src/core/ext/filters/max_age/max_age_filter.cc b/src/core/ext/filters/max_age/max_age_filter.cc index 1fe8288bd0..bbe4998f00 100644 --- a/src/core/ext/filters/max_age/max_age_filter.cc +++ b/src/core/ext/filters/max_age/max_age_filter.cc @@ -519,13 +519,12 @@ static bool maybe_add_max_age_filter(grpc_channel_stack_builder* builder, void* arg) { const grpc_channel_args* channel_args = grpc_channel_stack_builder_get_channel_arguments(builder); - bool enable = - grpc_channel_arg_get_integer( - grpc_channel_args_find(channel_args, GRPC_ARG_MAX_CONNECTION_AGE_MS), - MAX_CONNECTION_AGE_INTEGER_OPTIONS) != INT_MAX || - grpc_channel_arg_get_integer( - grpc_channel_args_find(channel_args, GRPC_ARG_MAX_CONNECTION_IDLE_MS), - MAX_CONNECTION_IDLE_INTEGER_OPTIONS) != INT_MAX; + bool enable = grpc_channel_args_get_integer( + channel_args, GRPC_ARG_MAX_CONNECTION_AGE_MS, + MAX_CONNECTION_AGE_INTEGER_OPTIONS) != INT_MAX || + grpc_channel_args_get_integer( + channel_args, GRPC_ARG_MAX_CONNECTION_IDLE_MS, + MAX_CONNECTION_IDLE_INTEGER_OPTIONS) != INT_MAX; if (enable) { return grpc_channel_stack_builder_prepend_filter( builder, &grpc_max_age_filter, nullptr, nullptr); diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index c7fc3f2e62..9815c93145 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -254,9 +254,8 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem, channel_data* chand = static_cast<channel_data*>(elem->channel_data); chand->limits = get_message_size_limits(args->channel_args); // Get method config table from channel args. - const grpc_arg* channel_arg = - grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG); - const char* service_config_str = grpc_channel_arg_get_string(channel_arg); + const char* service_config_str = + grpc_channel_args_get_string(args->channel_args, GRPC_ARG_SERVICE_CONFIG); if (service_config_str != nullptr) { grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config = grpc_core::ServiceConfig::Create(service_config_str); diff --git a/src/core/ext/transport/chttp2/client/authority.cc b/src/core/ext/transport/chttp2/client/authority.cc index bad3153b01..e3ef47c199 100644 --- a/src/core/ext/transport/chttp2/client/authority.cc +++ b/src/core/ext/transport/chttp2/client/authority.cc @@ -28,9 +28,8 @@ grpc_channel_args* grpc_default_authority_add_if_not_present( size_t num_new_args = 0; grpc_core::UniquePtr<char> default_authority; if (!has_default_authority) { - const grpc_arg* server_uri_arg = - grpc_channel_args_find(args, GRPC_ARG_SERVER_URI); - const char* server_uri_str = grpc_channel_arg_get_string(server_uri_arg); + const char* server_uri_str = + grpc_channel_args_get_string(args, GRPC_ARG_SERVER_URI); GPR_ASSERT(server_uri_str != nullptr); default_authority = grpc_core::ResolverRegistry::GetDefaultAuthority(server_uri_str); diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc index b95c9dae53..dfed824cd5 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc @@ -50,7 +50,7 @@ grpc_channel* grpc_insecure_channel_create_from_fd( GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0); grpc_endpoint* client = grpc_tcp_client_create_from_fd( - grpc_fd_create(fd, "client"), args, "fd-client"); + grpc_fd_create(fd, "client", false), args, "fd-client"); grpc_transport* transport = grpc_create_chttp2_transport(final_args, client, true); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc index 5ce73a95d7..f49bd27b6c 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc @@ -64,9 +64,8 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args( return nullptr; } // To which address are we connecting? By default, use the server URI. - const grpc_arg* server_uri_arg = - grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI); - const char* server_uri_str = grpc_channel_arg_get_string(server_uri_arg); + const char* server_uri_str = + grpc_channel_args_get_string(args->args, GRPC_ARG_SERVER_URI); GPR_ASSERT(server_uri_str != nullptr); grpc_uri* server_uri = grpc_uri_parse(server_uri_str, true /* supress errors */); diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc index 371e463814..a0228785ee 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc @@ -43,8 +43,9 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server, char* name; gpr_asprintf(&name, "fd:%d", fd); - grpc_endpoint* server_endpoint = grpc_tcp_create( - grpc_fd_create(fd, name), grpc_server_get_channel_args(server), name); + grpc_endpoint* server_endpoint = + grpc_tcp_create(grpc_fd_create(fd, name, false), + grpc_server_get_channel_args(server), name); gpr_free(name); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index cc4a823798..1a924e66e6 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -447,20 +447,19 @@ static void init_transport(grpc_chttp2_transport* t, grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1})); } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_OPTIMIZATION_TARGET)) { - if (channel_args->args[i].type != GRPC_ARG_STRING) { - gpr_log(GPR_ERROR, "%s should be a string", - GRPC_ARG_OPTIMIZATION_TARGET); - } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) { + char* opt_target_str = + grpc_channel_arg_get_string(&channel_args->args[i]); + if (opt_target_str == nullptr) { + gpr_log(GPR_ERROR, "null/missing value opt target, assuming 'blend'"); + } else if (0 == strcmp(opt_target_str, "blend")) { t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; - } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) { + } else if (0 == strcmp(opt_target_str, "latency")) { t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; - } else if (0 == - strcmp(channel_args->args[i].value.string, "throughput")) { + } else if (0 == strcmp(opt_target_str, "throughput")) { t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT; } else { gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'", - GRPC_ARG_OPTIMIZATION_TARGET, - channel_args->args[i].value.string); + GRPC_ARG_OPTIMIZATION_TARGET, opt_target_str); } } else { static const struct { @@ -1451,10 +1450,8 @@ static void perform_stream_op_locked(void* stream_op, } } if (op_payload->send_initial_metadata.peer_string != nullptr) { - char* old_peer_string = (char*)gpr_atm_full_xchg( - op_payload->send_initial_metadata.peer_string, - (gpr_atm)gpr_strdup(t->peer_string)); - gpr_free(old_peer_string); + gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string, + (gpr_atm)t->peer_string); } } @@ -1569,10 +1566,8 @@ static void perform_stream_op_locked(void* stream_op, s->trailing_metadata_available = op_payload->recv_initial_metadata.trailing_metadata_available; if (op_payload->recv_initial_metadata.peer_string != nullptr) { - char* old_peer_string = (char*)gpr_atm_full_xchg( - op_payload->recv_initial_metadata.peer_string, - (gpr_atm)gpr_strdup(t->peer_string)); - gpr_free(old_peer_string); + gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string, + (gpr_atm)t->peer_string); } grpc_chttp2_maybe_complete_recv_initial_metadata(t, s); } diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 420c2d13e1..5d614c1d05 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -1450,20 +1450,8 @@ grpc_transport* grpc_create_cronet_transport(void* engine, const char* target, } strcpy(ct->host, target); - ct->use_packet_coalescing = true; - if (args) { - for (size_t i = 0; i < args->num_args; i++) { - if (0 == - strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) { - if (GPR_UNLIKELY(args->args[i].type != GRPC_ARG_INTEGER)) { - gpr_log(GPR_ERROR, "%s ignored: it must be an integer", - GRPC_ARG_USE_CRONET_PACKET_COALESCING); - } else { - ct->use_packet_coalescing = (args->args[i].value.integer != 0); - } - } - } - } + ct->use_packet_coalescing = grpc_channel_args_get_bool( + args, GRPC_ARG_USE_CRONET_PACKET_COALESCING, true); return &ct->base; diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 2c3bff5c1e..6e5aa5a46b 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -1204,10 +1204,9 @@ grpc_channel* grpc_inproc_channel_create(grpc_server* server, // Add a default authority channel argument for the client - grpc_arg default_authority_arg; - default_authority_arg.type = GRPC_ARG_STRING; - default_authority_arg.key = (char*)GRPC_ARG_DEFAULT_AUTHORITY; - default_authority_arg.value.string = (char*)"inproc.authority"; + grpc_arg default_authority_arg = grpc_channel_arg_string_create( + const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY), + const_cast<char*>("inproc.authority")); grpc_channel_args* client_args = grpc_channel_args_copy_and_add(args, &default_authority_arg, 1); |