From 8a880801aea7d82a792635c9d26271218efcbad3 Mon Sep 17 00:00:00 2001 From: Juanli Shen Date: Thu, 15 Nov 2018 13:11:45 -0800 Subject: Add support for LB config in service config --- .../ext/filters/client_channel/client_channel.cc | 163 ++------- src/core/ext/filters/client_channel/lb_policy.h | 9 +- .../client_channel/lb_policy/grpclb/grpclb.cc | 7 +- .../lb_policy/pick_first/pick_first.cc | 8 +- .../lb_policy/round_robin/round_robin.cc | 8 +- .../filters/client_channel/lb_policy/xds/xds.cc | 10 +- .../filters/client_channel/lb_policy_registry.cc | 5 + .../filters/client_channel/lb_policy_registry.h | 4 + .../ext/filters/client_channel/method_params.cc | 178 ---------- .../ext/filters/client_channel/method_params.h | 78 ----- .../resolver/dns/c_ares/dns_resolver_ares.cc | 23 +- .../client_channel/resolver_result_parsing.cc | 384 +++++++++++++++++++++ .../client_channel/resolver_result_parsing.h | 146 ++++++++ 13 files changed, 592 insertions(+), 431 deletions(-) delete mode 100644 src/core/ext/filters/client_channel/method_params.cc delete mode 100644 src/core/ext/filters/client_channel/method_params.h create mode 100644 src/core/ext/filters/client_channel/resolver_result_parsing.cc create mode 100644 src/core/ext/filters/client_channel/resolver_result_parsing.h (limited to 'src/core/ext') diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 8e9ee889e1..d94e20124e 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -34,9 +34,9 @@ #include "src/core/ext/filters/client_channel/backup_poller.h" #include "src/core/ext/filters/client_channel/http_connect_handshaker.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" -#include "src/core/ext/filters/client_channel/method_params.h" #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" +#include "src/core/ext/filters/client_channel/resolver_result_parsing.h" #include "src/core/ext/filters/client_channel/retry_throttle.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/deadline/deadline_filter.h" @@ -63,6 +63,8 @@ #include "src/core/lib/transport/status_metadata.h" using grpc_core::internal::ClientChannelMethodParams; +using grpc_core::internal::ClientChannelMethodParamsTable; +using grpc_core::internal::ProcessedResolverResult; using grpc_core::internal::ServerRetryThrottleData; /* Client channel implementation */ @@ -83,10 +85,6 @@ grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel"); struct external_connectivity_watcher; -typedef grpc_core::SliceHashTable< - grpc_core::RefCountedPtr> - MethodParamsTable; - typedef struct client_channel_channel_data { grpc_core::OrphanablePtr resolver; bool started_resolving; @@ -102,7 +100,7 @@ typedef struct client_channel_channel_data { /** retry throttle data */ grpc_core::RefCountedPtr retry_throttle_data; /** maps method names to method_parameters structs */ - grpc_core::RefCountedPtr method_params_table; + grpc_core::RefCountedPtr method_params_table; /** incoming resolver result - set by resolver.next() */ grpc_channel_args* resolver_result; /** a list of closures that are all waiting for resolver result to come in */ @@ -251,66 +249,6 @@ static void start_resolving_locked(channel_data* chand) { &chand->on_resolver_result_changed); } -typedef struct { - char* server_name; - grpc_core::RefCountedPtr retry_throttle_data; -} service_config_parsing_state; - -static void parse_retry_throttle_params( - const grpc_json* field, service_config_parsing_state* parsing_state) { - if (strcmp(field->key, "retryThrottling") == 0) { - if (parsing_state->retry_throttle_data != nullptr) return; // Duplicate. - if (field->type != GRPC_JSON_OBJECT) return; - int max_milli_tokens = 0; - int milli_token_ratio = 0; - for (grpc_json* sub_field = field->child; sub_field != nullptr; - sub_field = sub_field->next) { - if (sub_field->key == nullptr) return; - if (strcmp(sub_field->key, "maxTokens") == 0) { - if (max_milli_tokens != 0) return; // Duplicate. - if (sub_field->type != GRPC_JSON_NUMBER) return; - max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value); - if (max_milli_tokens == -1) return; - max_milli_tokens *= 1000; - } else if (strcmp(sub_field->key, "tokenRatio") == 0) { - if (milli_token_ratio != 0) return; // Duplicate. - if (sub_field->type != GRPC_JSON_NUMBER) return; - // We support up to 3 decimal digits. - size_t whole_len = strlen(sub_field->value); - uint32_t multiplier = 1; - uint32_t decimal_value = 0; - const char* decimal_point = strchr(sub_field->value, '.'); - if (decimal_point != nullptr) { - whole_len = static_cast(decimal_point - sub_field->value); - multiplier = 1000; - size_t decimal_len = strlen(decimal_point + 1); - if (decimal_len > 3) decimal_len = 3; - if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len, - &decimal_value)) { - return; - } - uint32_t decimal_multiplier = 1; - for (size_t i = 0; i < (3 - decimal_len); ++i) { - decimal_multiplier *= 10; - } - decimal_value *= decimal_multiplier; - } - uint32_t whole_value; - if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len, - &whole_value)) { - return; - } - milli_token_ratio = - static_cast((whole_value * multiplier) + decimal_value); - if (milli_token_ratio <= 0) return; - } - } - parsing_state->retry_throttle_data = - grpc_core::internal::ServerRetryThrottleMap::GetDataForServer( - parsing_state->server_name, max_milli_tokens, milli_token_ratio); - } -} - // Invoked from the resolver NextLocked() callback when the resolver // is shutting down. static void on_resolver_shutdown_locked(channel_data* chand, @@ -352,37 +290,6 @@ static void on_resolver_shutdown_locked(channel_data* chand, GRPC_ERROR_UNREF(error); } -// Returns the LB policy name from the resolver result. -static grpc_core::UniquePtr -get_lb_policy_name_from_resolver_result_locked(channel_data* chand) { - // Find LB policy name in channel args. - const grpc_arg* channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME); - const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg); - // Special case: If at least one balancer address is present, we use - // the grpclb policy, regardless of what the resolver actually specified. - channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); - if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) { - grpc_lb_addresses* addresses = - static_cast(channel_arg->value.pointer.p); - if (grpc_lb_addresses_contains_balancer_address(*addresses)) { - if (lb_policy_name != nullptr && - gpr_stricmp(lb_policy_name, "grpclb") != 0) { - gpr_log(GPR_INFO, - "resolver requested LB policy %s but provided at least one " - "balancer address -- forcing use of grpclb LB policy", - lb_policy_name); - } - lb_policy_name = "grpclb"; - } - } - // Use pick_first if nothing was specified and we didn't select grpclb - // above. - if (lb_policy_name == nullptr) lb_policy_name = "pick_first"; - return grpc_core::UniquePtr(gpr_strdup(lb_policy_name)); -} - static void request_reresolution_locked(void* arg, grpc_error* error) { reresolution_request_args* args = static_cast(arg); @@ -410,13 +317,14 @@ using TraceStringVector = grpc_core::InlinedVector; // *connectivity_error to its initial connectivity state; otherwise, // leaves them unchanged. static void create_new_lb_policy_locked( - channel_data* chand, char* lb_policy_name, + channel_data* chand, char* lb_policy_name, grpc_json* lb_config, grpc_connectivity_state* connectivity_state, grpc_error** connectivity_error, TraceStringVector* trace_strings) { grpc_core::LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.combiner = chand->combiner; lb_policy_args.client_channel_factory = chand->client_channel_factory; lb_policy_args.args = chand->resolver_result; + lb_policy_args.lb_config = lb_config; grpc_core::OrphanablePtr new_lb_policy = grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( lb_policy_name, lb_policy_args); @@ -473,44 +381,6 @@ static void create_new_lb_policy_locked( } } -// Returns the service config (as a JSON string) from the resolver result. -// Also updates state in chand. -static grpc_core::UniquePtr -get_service_config_from_resolver_result_locked(channel_data* chand) { - const grpc_arg* channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG); - const char* service_config_json = grpc_channel_arg_get_string(channel_arg); - if (service_config_json != nullptr) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"", - chand, service_config_json); - } - grpc_core::UniquePtr service_config = - grpc_core::ServiceConfig::Create(service_config_json); - if (service_config != nullptr) { - if (chand->enable_retries) { - channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); - const char* server_uri = grpc_channel_arg_get_string(channel_arg); - GPR_ASSERT(server_uri != nullptr); - grpc_uri* uri = grpc_uri_parse(server_uri, true); - GPR_ASSERT(uri->path[0] != '\0'); - service_config_parsing_state parsing_state; - parsing_state.server_name = - uri->path[0] == '/' ? uri->path + 1 : uri->path; - service_config->ParseGlobalParams(parse_retry_throttle_params, - &parsing_state); - grpc_uri_destroy(uri); - chand->retry_throttle_data = - std::move(parsing_state.retry_throttle_data); - } - chand->method_params_table = service_config->CreateMethodConfigTable( - ClientChannelMethodParams::CreateFromJson); - } - } - return grpc_core::UniquePtr(gpr_strdup(service_config_json)); -} - static void maybe_add_trace_message_for_address_changes_locked( channel_data* chand, TraceStringVector* trace_strings) { int resolution_contains_addresses = false; @@ -598,8 +468,20 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand); } } else { + // Parse the resolver result. + ProcessedResolverResult resolver_result(chand->resolver_result, + chand->enable_retries); + chand->retry_throttle_data = resolver_result.retry_throttle_data(); + chand->method_params_table = resolver_result.method_params_table(); + grpc_core::UniquePtr service_config_json = + resolver_result.service_config_json(); + if (service_config_json != nullptr && grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"", + chand, service_config_json.get()); + } grpc_core::UniquePtr lb_policy_name = - get_lb_policy_name_from_resolver_result_locked(chand); + resolver_result.lb_policy_name(); + grpc_json* lb_policy_config = resolver_result.lb_policy_config(); // Check to see if we're already using the right LB policy. // Note: It's safe to use chand->info_lb_policy_name here without // taking a lock on chand->info_mu, because this function is the @@ -614,19 +496,16 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)", chand, lb_policy_name.get(), chand->lb_policy.get()); } - chand->lb_policy->UpdateLocked(*chand->resolver_result); + chand->lb_policy->UpdateLocked(*chand->resolver_result, lb_policy_config); // No need to set the channel's connectivity state; the existing // watch on the LB policy will take care of that. set_connectivity_state = false; } else { // Instantiate new LB policy. - create_new_lb_policy_locked(chand, lb_policy_name.get(), + create_new_lb_policy_locked(chand, lb_policy_name.get(), lb_policy_config, &connectivity_state, &connectivity_error, &trace_strings); } - // Find service config. - grpc_core::UniquePtr service_config_json = - get_service_config_from_resolver_result_locked(chand); // Note: It's safe to use chand->info_service_config_json here without // taking a lock on chand->info_mu, because this function is the // only thing that modifies its value, and it can only be invoked diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index b0040457a6..6733fdca81 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -58,6 +58,8 @@ class LoadBalancingPolicy /// Note that the LB policy gets the set of addresses from the /// GRPC_ARG_LB_ADDRESSES channel arg. grpc_channel_args* args = nullptr; + /// Load balancing config from the resolver. + grpc_json* lb_config = nullptr; }; /// State used for an LB pick. @@ -92,10 +94,11 @@ class LoadBalancingPolicy LoadBalancingPolicy(const LoadBalancingPolicy&) = delete; LoadBalancingPolicy& operator=(const LoadBalancingPolicy&) = delete; - /// Updates the policy with a new set of \a args from the resolver. - /// Note that the LB policy gets the set of addresses from the + /// Updates the policy with a new set of \a args and a new \a lb_config from + /// the resolver. Note that the LB policy gets the set of addresses from the /// GRPC_ARG_LB_ADDRESSES channel arg. - virtual void UpdateLocked(const grpc_channel_args& args) GRPC_ABSTRACT; + virtual void UpdateLocked(const grpc_channel_args& args, + grpc_json* lb_config) GRPC_ABSTRACT; /// Finds an appropriate subchannel for a call, based on data in \a pick. /// \a pick must remain alive until the pick is complete. diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index dbb90b438c..dc0e1f89ce 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -123,7 +123,8 @@ class GrpcLb : public LoadBalancingPolicy { public: GrpcLb(const grpc_lb_addresses* addresses, const Args& args); - void UpdateLocked(const grpc_channel_args& args) override; + void UpdateLocked(const grpc_channel_args& args, + grpc_json* lb_config) override; bool PickLocked(PickState* pick, grpc_error** error) override; void CancelPickLocked(PickState* pick, grpc_error* error) override; void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, @@ -1331,7 +1332,7 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { grpc_channel_args_destroy(lb_channel_args); } -void GrpcLb::UpdateLocked(const grpc_channel_args& args) { +void GrpcLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) { ProcessChannelArgsLocked(args); // Update the existing RR policy. if (rr_policy_ != nullptr) CreateOrUpdateRoundRobinPolicyLocked(); @@ -1727,7 +1728,7 @@ void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() { gpr_log(GPR_INFO, "[grpclb %p] Updating RR policy %p", this, rr_policy_.get()); } - rr_policy_->UpdateLocked(*args); + rr_policy_->UpdateLocked(*args, nullptr); } else { LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.combiner = combiner(); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index eb494486b9..d454401a66 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -46,7 +46,8 @@ class PickFirst : public LoadBalancingPolicy { public: explicit PickFirst(const Args& args); - void UpdateLocked(const grpc_channel_args& args) override; + void UpdateLocked(const grpc_channel_args& args, + grpc_json* lb_config) override; bool PickLocked(PickState* pick, grpc_error** error) override; void CancelPickLocked(PickState* pick, grpc_error* error) override; void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, @@ -159,7 +160,7 @@ PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) { if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p created.", this); } - UpdateLocked(*args.args); + UpdateLocked(*args.args, args.lb_config); grpc_subchannel_index_ref(); } @@ -333,7 +334,8 @@ void PickFirst::UpdateChildRefsLocked() { child_subchannels_ = std::move(cs); } -void PickFirst::UpdateLocked(const grpc_channel_args& args) { +void PickFirst::UpdateLocked(const grpc_channel_args& args, + grpc_json* lb_config) { AutoChildRefsUpdater guard(this); const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index e9ed85cf66..2a16975131 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -57,7 +57,8 @@ class RoundRobin : public LoadBalancingPolicy { public: explicit RoundRobin(const Args& args); - void UpdateLocked(const grpc_channel_args& args) override; + void UpdateLocked(const grpc_channel_args& args, + grpc_json* lb_config) override; bool PickLocked(PickState* pick, grpc_error** error) override; void CancelPickLocked(PickState* pick, grpc_error* error) override; void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, @@ -232,7 +233,7 @@ RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) { gpr_mu_init(&child_refs_mu_); grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "round_robin"); - UpdateLocked(*args.args); + UpdateLocked(*args.args, args.lb_config); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] Created with %" PRIuPTR " subchannels", this, subchannel_list_->num_subchannels()); @@ -664,7 +665,8 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current, notify); } -void RoundRobin::UpdateLocked(const grpc_channel_args& args) { +void RoundRobin::UpdateLocked(const grpc_channel_args& args, + grpc_json* lb_config) { const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); AutoChildRefsUpdater guard(this); if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) { diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 59d57295d4..29cd904375 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -118,7 +118,8 @@ class XdsLb : public LoadBalancingPolicy { public: XdsLb(const grpc_lb_addresses* addresses, const Args& args); - void UpdateLocked(const grpc_channel_args& args) override; + void UpdateLocked(const grpc_channel_args& args, + grpc_json* lb_config) override; bool PickLocked(PickState* pick, grpc_error** error) override; void CancelPickLocked(PickState* pick, grpc_error* error) override; void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, @@ -1010,6 +1011,7 @@ grpc_channel_args* BuildBalancerChannelArgs( // ctor and dtor // +// TODO(vishalpowar): Use lb_config in args to configure LB policy. XdsLb::XdsLb(const grpc_lb_addresses* addresses, const LoadBalancingPolicy::Args& args) : LoadBalancingPolicy(args), @@ -1314,7 +1316,8 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { grpc_channel_args_destroy(lb_channel_args); } -void XdsLb::UpdateLocked(const grpc_channel_args& args) { +// TODO(vishalpowar): Use lb_config to configure LB policy. +void XdsLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) { ProcessChannelArgsLocked(args); // Update the existing child policy. // Note: We have disabled fallback mode in the code, so this child policy must @@ -1672,7 +1675,8 @@ void XdsLb::CreateOrUpdateChildPolicyLocked() { gpr_log(GPR_INFO, "[xdslb %p] Updating the child policy %p", this, child_policy_.get()); } - child_policy_->UpdateLocked(*args); + // TODO(vishalpowar): Pass the correct LB config. + child_policy_->UpdateLocked(*args, nullptr); } else { LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.combiner = combiner(); diff --git a/src/core/ext/filters/client_channel/lb_policy_registry.cc b/src/core/ext/filters/client_channel/lb_policy_registry.cc index d651b1120d..ad459c9c8c 100644 --- a/src/core/ext/filters/client_channel/lb_policy_registry.cc +++ b/src/core/ext/filters/client_channel/lb_policy_registry.cc @@ -94,4 +94,9 @@ LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( return factory->CreateLoadBalancingPolicy(args); } +bool LoadBalancingPolicyRegistry::LoadBalancingPolicyExists(const char* name) { + GPR_ASSERT(g_state != nullptr); + return g_state->GetLoadBalancingPolicyFactory(name) != nullptr; +} + } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy_registry.h b/src/core/ext/filters/client_channel/lb_policy_registry.h index 2e9bb061ed..338f7c9f69 100644 --- a/src/core/ext/filters/client_channel/lb_policy_registry.h +++ b/src/core/ext/filters/client_channel/lb_policy_registry.h @@ -47,6 +47,10 @@ class LoadBalancingPolicyRegistry { /// Creates an LB policy of the type specified by \a name. static OrphanablePtr CreateLoadBalancingPolicy( const char* name, const LoadBalancingPolicy::Args& args); + + /// Returns true if the LB policy factory specified by \a name exists in this + /// registry. + static bool LoadBalancingPolicyExists(const char* name); }; } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/method_params.cc b/src/core/ext/filters/client_channel/method_params.cc deleted file mode 100644 index 1f116bb67d..0000000000 --- a/src/core/ext/filters/client_channel/method_params.cc +++ /dev/null @@ -1,178 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include - -#include -#include - -#include -#include -#include - -#include "src/core/ext/filters/client_channel/method_params.h" -#include "src/core/lib/channel/status_util.h" -#include "src/core/lib/gpr/string.h" -#include "src/core/lib/gprpp/memory.h" - -// As per the retry design, we do not allow more than 5 retry attempts. -#define MAX_MAX_RETRY_ATTEMPTS 5 - -namespace grpc_core { -namespace internal { - -namespace { - -bool ParseWaitForReady( - grpc_json* field, ClientChannelMethodParams::WaitForReady* wait_for_ready) { - if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) { - return false; - } - *wait_for_ready = field->type == GRPC_JSON_TRUE - ? ClientChannelMethodParams::WAIT_FOR_READY_TRUE - : ClientChannelMethodParams::WAIT_FOR_READY_FALSE; - return true; -} - -// Parses a JSON field of the form generated for a google.proto.Duration -// proto message, as per: -// https://developers.google.com/protocol-buffers/docs/proto3#json -bool ParseDuration(grpc_json* field, grpc_millis* duration) { - if (field->type != GRPC_JSON_STRING) return false; - size_t len = strlen(field->value); - if (field->value[len - 1] != 's') return false; - UniquePtr buf(gpr_strdup(field->value)); - *(buf.get() + len - 1) = '\0'; // Remove trailing 's'. - char* decimal_point = strchr(buf.get(), '.'); - int nanos = 0; - if (decimal_point != nullptr) { - *decimal_point = '\0'; - nanos = gpr_parse_nonnegative_int(decimal_point + 1); - if (nanos == -1) { - return false; - } - int num_digits = static_cast(strlen(decimal_point + 1)); - if (num_digits > 9) { // We don't accept greater precision than nanos. - return false; - } - for (int i = 0; i < (9 - num_digits); ++i) { - nanos *= 10; - } - } - int seconds = - decimal_point == buf.get() ? 0 : gpr_parse_nonnegative_int(buf.get()); - if (seconds == -1) return false; - *duration = seconds * GPR_MS_PER_SEC + nanos / GPR_NS_PER_MS; - return true; -} - -UniquePtr ParseRetryPolicy( - grpc_json* field) { - auto retry_policy = MakeUnique(); - if (field->type != GRPC_JSON_OBJECT) return nullptr; - for (grpc_json* sub_field = field->child; sub_field != nullptr; - sub_field = sub_field->next) { - if (sub_field->key == nullptr) return nullptr; - if (strcmp(sub_field->key, "maxAttempts") == 0) { - if (retry_policy->max_attempts != 0) return nullptr; // Duplicate. - if (sub_field->type != GRPC_JSON_NUMBER) return nullptr; - retry_policy->max_attempts = gpr_parse_nonnegative_int(sub_field->value); - if (retry_policy->max_attempts <= 1) return nullptr; - if (retry_policy->max_attempts > MAX_MAX_RETRY_ATTEMPTS) { - gpr_log(GPR_ERROR, - "service config: clamped retryPolicy.maxAttempts at %d", - MAX_MAX_RETRY_ATTEMPTS); - retry_policy->max_attempts = MAX_MAX_RETRY_ATTEMPTS; - } - } else if (strcmp(sub_field->key, "initialBackoff") == 0) { - if (retry_policy->initial_backoff > 0) return nullptr; // Duplicate. - if (!ParseDuration(sub_field, &retry_policy->initial_backoff)) { - return nullptr; - } - if (retry_policy->initial_backoff == 0) return nullptr; - } else if (strcmp(sub_field->key, "maxBackoff") == 0) { - if (retry_policy->max_backoff > 0) return nullptr; // Duplicate. - if (!ParseDuration(sub_field, &retry_policy->max_backoff)) { - return nullptr; - } - if (retry_policy->max_backoff == 0) return nullptr; - } else if (strcmp(sub_field->key, "backoffMultiplier") == 0) { - if (retry_policy->backoff_multiplier != 0) return nullptr; // Duplicate. - if (sub_field->type != GRPC_JSON_NUMBER) return nullptr; - if (sscanf(sub_field->value, "%f", &retry_policy->backoff_multiplier) != - 1) { - return nullptr; - } - if (retry_policy->backoff_multiplier <= 0) return nullptr; - } else if (strcmp(sub_field->key, "retryableStatusCodes") == 0) { - if (!retry_policy->retryable_status_codes.Empty()) { - return nullptr; // Duplicate. - } - if (sub_field->type != GRPC_JSON_ARRAY) return nullptr; - for (grpc_json* element = sub_field->child; element != nullptr; - element = element->next) { - if (element->type != GRPC_JSON_STRING) return nullptr; - grpc_status_code status; - if (!grpc_status_code_from_string(element->value, &status)) { - return nullptr; - } - retry_policy->retryable_status_codes.Add(status); - } - if (retry_policy->retryable_status_codes.Empty()) return nullptr; - } - } - // Make sure required fields are set. - if (retry_policy->max_attempts == 0 || retry_policy->initial_backoff == 0 || - retry_policy->max_backoff == 0 || retry_policy->backoff_multiplier == 0 || - retry_policy->retryable_status_codes.Empty()) { - return nullptr; - } - return retry_policy; -} - -} // namespace - -RefCountedPtr -ClientChannelMethodParams::CreateFromJson(const grpc_json* json) { - RefCountedPtr method_params = - MakeRefCounted(); - for (grpc_json* field = json->child; field != nullptr; field = field->next) { - if (field->key == nullptr) continue; - if (strcmp(field->key, "waitForReady") == 0) { - if (method_params->wait_for_ready_ != WAIT_FOR_READY_UNSET) { - return nullptr; // Duplicate. - } - if (!ParseWaitForReady(field, &method_params->wait_for_ready_)) { - return nullptr; - } - } else if (strcmp(field->key, "timeout") == 0) { - if (method_params->timeout_ > 0) return nullptr; // Duplicate. - if (!ParseDuration(field, &method_params->timeout_)) return nullptr; - } else if (strcmp(field->key, "retryPolicy") == 0) { - if (method_params->retry_policy_ != nullptr) { - return nullptr; // Duplicate. - } - method_params->retry_policy_ = ParseRetryPolicy(field); - if (method_params->retry_policy_ == nullptr) return nullptr; - } - } - return method_params; -} - -} // namespace internal -} // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/method_params.h b/src/core/ext/filters/client_channel/method_params.h deleted file mode 100644 index a31d360f17..0000000000 --- a/src/core/ext/filters/client_channel/method_params.h +++ /dev/null @@ -1,78 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_METHOD_PARAMS_H -#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_METHOD_PARAMS_H - -#include - -#include "src/core/lib/channel/status_util.h" -#include "src/core/lib/gprpp/ref_counted.h" -#include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/iomgr/exec_ctx.h" // for grpc_millis -#include "src/core/lib/json/json.h" - -namespace grpc_core { -namespace internal { - -class ClientChannelMethodParams : public RefCounted { - public: - enum WaitForReady { - WAIT_FOR_READY_UNSET = 0, - WAIT_FOR_READY_FALSE, - WAIT_FOR_READY_TRUE - }; - - struct RetryPolicy { - int max_attempts = 0; - grpc_millis initial_backoff = 0; - grpc_millis max_backoff = 0; - float backoff_multiplier = 0; - StatusCodeSet retryable_status_codes; - }; - - /// Creates a method_parameters object from \a json. - /// Intended for use with ServiceConfig::CreateMethodConfigTable(). - static RefCountedPtr CreateFromJson( - const grpc_json* json); - - grpc_millis timeout() const { return timeout_; } - WaitForReady wait_for_ready() const { return wait_for_ready_; } - const RetryPolicy* retry_policy() const { return retry_policy_.get(); } - - private: - // So New() can call our private ctor. - template - friend T* grpc_core::New(Args&&... args); - - // So Delete() can call our private dtor. - template - friend void grpc_core::Delete(T*); - - ClientChannelMethodParams() {} - virtual ~ClientChannelMethodParams() {} - - grpc_millis timeout_ = 0; - WaitForReady wait_for_ready_ = WAIT_FOR_READY_UNSET; - UniquePtr retry_policy_; -}; - -} // namespace internal -} // namespace grpc_core - -#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_METHOD_PARAMS_H */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 9562a3f893..90bc88961d 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -308,13 +308,12 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { gpr_free(r->pending_request_); r->pending_request_ = nullptr; if (r->lb_addresses_ != nullptr) { - static const char* args_to_remove[2]; + static const char* args_to_remove[1]; size_t num_args_to_remove = 0; - grpc_arg new_args[3]; + grpc_arg args_to_add[2]; size_t num_args_to_add = 0; - new_args[num_args_to_add++] = + args_to_add[num_args_to_add++] = grpc_lb_addresses_create_channel_arg(r->lb_addresses_); - grpc_core::UniquePtr service_config; char* service_config_string = nullptr; if (r->service_config_json_ != nullptr) { service_config_string = ChooseServiceConfig(r->service_config_json_); @@ -323,24 +322,12 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { gpr_log(GPR_INFO, "selected service config choice: %s", service_config_string); args_to_remove[num_args_to_remove++] = GRPC_ARG_SERVICE_CONFIG; - new_args[num_args_to_add++] = grpc_channel_arg_string_create( + args_to_add[num_args_to_add++] = grpc_channel_arg_string_create( (char*)GRPC_ARG_SERVICE_CONFIG, service_config_string); - service_config = - grpc_core::ServiceConfig::Create(service_config_string); - if (service_config != nullptr) { - const char* lb_policy_name = - service_config->GetLoadBalancingPolicyName(); - if (lb_policy_name != nullptr) { - args_to_remove[num_args_to_remove++] = GRPC_ARG_LB_POLICY_NAME; - new_args[num_args_to_add++] = grpc_channel_arg_string_create( - (char*)GRPC_ARG_LB_POLICY_NAME, - const_cast(lb_policy_name)); - } - } } } result = grpc_channel_args_copy_and_add_and_remove( - r->channel_args_, args_to_remove, num_args_to_remove, new_args, + r->channel_args_, args_to_remove, num_args_to_remove, args_to_add, num_args_to_add); gpr_free(service_config_string); grpc_lb_addresses_destroy(r->lb_addresses_); diff --git a/src/core/ext/filters/client_channel/resolver_result_parsing.cc b/src/core/ext/filters/client_channel/resolver_result_parsing.cc new file mode 100644 index 0000000000..82a26ace63 --- /dev/null +++ b/src/core/ext/filters/client_channel/resolver_result_parsing.cc @@ -0,0 +1,384 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include "src/core/ext/filters/client_channel/resolver_result_parsing.h" + +#include +#include +#include + +#include +#include +#include + +#include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/lib/channel/status_util.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/memory.h" + +// As per the retry design, we do not allow more than 5 retry attempts. +#define MAX_MAX_RETRY_ATTEMPTS 5 + +namespace grpc_core { +namespace internal { + +namespace { + +// Converts string format from JSON to proto. +grpc_core::UniquePtr ConvertCamelToSnake(const char* camel) { + const size_t size = strlen(camel); + char* snake = static_cast(gpr_malloc(size * 2)); + size_t j = 0; + for (size_t i = 0; i < size; ++i) { + if (isupper(camel[i])) { + snake[j++] = '_'; + snake[j++] = tolower(camel[i]); + } else { + snake[j++] = camel[i]; + } + } + snake[j] = '\0'; + return grpc_core::UniquePtr(snake); +} + +} // namespace + +ProcessedResolverResult::ProcessedResolverResult( + const grpc_channel_args* resolver_result, bool parse_retry) { + ProcessServiceConfig(resolver_result, parse_retry); + // If no LB config was found above, just find the LB policy name then. + if (lb_policy_config_ == nullptr) ProcessLbPolicyName(resolver_result); +} + +void ProcessedResolverResult::ProcessServiceConfig( + const grpc_channel_args* resolver_result, bool parse_retry) { + const grpc_arg* channel_arg = + grpc_channel_args_find(resolver_result, GRPC_ARG_SERVICE_CONFIG); + const char* service_config_json = grpc_channel_arg_get_string(channel_arg); + if (service_config_json != nullptr) { + service_config_json_.reset(gpr_strdup(service_config_json)); + service_config_ = grpc_core::ServiceConfig::Create(service_config_json); + if (service_config_ != nullptr) { + if (parse_retry) { + channel_arg = + grpc_channel_args_find(resolver_result, GRPC_ARG_SERVER_URI); + const char* server_uri = grpc_channel_arg_get_string(channel_arg); + GPR_ASSERT(server_uri != nullptr); + grpc_uri* uri = grpc_uri_parse(server_uri, true); + GPR_ASSERT(uri->path[0] != '\0'); + server_name_ = uri->path[0] == '/' ? uri->path + 1 : uri->path; + service_config_->ParseGlobalParams(ParseServiceConfig, this); + grpc_uri_destroy(uri); + } else { + service_config_->ParseGlobalParams(ParseServiceConfig, this); + } + method_params_table_ = service_config_->CreateMethodConfigTable( + ClientChannelMethodParams::CreateFromJson); + } + } +} + +void ProcessedResolverResult::ProcessLbPolicyName( + const grpc_channel_args* resolver_result) { + const char* lb_policy_name = nullptr; + // Prefer the LB policy name found in the service config. Note that this is + // checking the deprecated loadBalancingPolicy field, rather than the new + // loadBalancingConfig field. + if (service_config_ != nullptr) { + lb_policy_name = service_config_->GetLoadBalancingPolicyName(); + } + // Otherwise, find the LB policy name set by the client API. + if (lb_policy_name == nullptr) { + const grpc_arg* channel_arg = + grpc_channel_args_find(resolver_result, GRPC_ARG_LB_POLICY_NAME); + lb_policy_name = grpc_channel_arg_get_string(channel_arg); + } + // Special case: If at least one balancer address is present, we use + // the grpclb policy, regardless of what the resolver has returned. + const grpc_arg* channel_arg = + grpc_channel_args_find(resolver_result, GRPC_ARG_LB_ADDRESSES); + if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) { + grpc_lb_addresses* addresses = + static_cast(channel_arg->value.pointer.p); + if (grpc_lb_addresses_contains_balancer_address(*addresses)) { + if (lb_policy_name != nullptr && + gpr_stricmp(lb_policy_name, "grpclb") != 0) { + gpr_log(GPR_INFO, + "resolver requested LB policy %s but provided at least one " + "balancer address -- forcing use of grpclb LB policy", + lb_policy_name); + } + lb_policy_name = "grpclb"; + } + } + // Use pick_first if nothing was specified and we didn't select grpclb + // above. + if (lb_policy_name == nullptr) lb_policy_name = "pick_first"; + lb_policy_name_.reset(gpr_strdup(lb_policy_name)); +} + +void ProcessedResolverResult::ParseServiceConfig( + const grpc_json* field, ProcessedResolverResult* parsing_state) { + parsing_state->ParseLbConfigFromServiceConfig(field); + if (parsing_state->server_name_ != nullptr) { + parsing_state->ParseRetryThrottleParamsFromServiceConfig(field); + } +} + +void ProcessedResolverResult::ParseLbConfigFromServiceConfig( + const grpc_json* field) { + if (lb_policy_config_ != nullptr) return; // Already found. + // Find the LB config global parameter. + if (field->key == nullptr || strcmp(field->key, "loadBalancingConfig") != 0 || + field->type != GRPC_JSON_ARRAY) { + return; // Not valid lb config array. + } + // Find the first LB policy that this client supports. + for (grpc_json* lb_config = field->child; lb_config != nullptr; + lb_config = lb_config->next) { + if (lb_config->type != GRPC_JSON_OBJECT) return; + // Find the policy object. + grpc_json* policy = nullptr; + for (grpc_json* field = lb_config->child; field != nullptr; + field = field->next) { + if (field->key == nullptr || strcmp(field->key, "policy") != 0 || + field->type != GRPC_JSON_OBJECT) { + return; + } + if (policy != nullptr) return; // Duplicate. + policy = field; + } + // Find the specific policy content since the policy object is of type + // "oneof". + grpc_json* policy_content = nullptr; + for (grpc_json* field = policy->child; field != nullptr; + field = field->next) { + if (field->key == nullptr || field->type != GRPC_JSON_OBJECT) return; + if (policy_content != nullptr) return; // Violate "oneof" type. + policy_content = field; + } + grpc_core::UniquePtr lb_policy_name = + ConvertCamelToSnake(policy_content->key); + if (!grpc_core::LoadBalancingPolicyRegistry::LoadBalancingPolicyExists( + lb_policy_name.get())) { + continue; + } + lb_policy_name_ = std::move(lb_policy_name); + lb_policy_config_ = policy_content->child; + return; + } +} + +void ProcessedResolverResult::ParseRetryThrottleParamsFromServiceConfig( + const grpc_json* field) { + if (strcmp(field->key, "retryThrottling") == 0) { + if (retry_throttle_data_ != nullptr) return; // Duplicate. + if (field->type != GRPC_JSON_OBJECT) return; + int max_milli_tokens = 0; + int milli_token_ratio = 0; + for (grpc_json* sub_field = field->child; sub_field != nullptr; + sub_field = sub_field->next) { + if (sub_field->key == nullptr) return; + if (strcmp(sub_field->key, "maxTokens") == 0) { + if (max_milli_tokens != 0) return; // Duplicate. + if (sub_field->type != GRPC_JSON_NUMBER) return; + max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value); + if (max_milli_tokens == -1) return; + max_milli_tokens *= 1000; + } else if (strcmp(sub_field->key, "tokenRatio") == 0) { + if (milli_token_ratio != 0) return; // Duplicate. + if (sub_field->type != GRPC_JSON_NUMBER) return; + // We support up to 3 decimal digits. + size_t whole_len = strlen(sub_field->value); + uint32_t multiplier = 1; + uint32_t decimal_value = 0; + const char* decimal_point = strchr(sub_field->value, '.'); + if (decimal_point != nullptr) { + whole_len = static_cast(decimal_point - sub_field->value); + multiplier = 1000; + size_t decimal_len = strlen(decimal_point + 1); + if (decimal_len > 3) decimal_len = 3; + if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len, + &decimal_value)) { + return; + } + uint32_t decimal_multiplier = 1; + for (size_t i = 0; i < (3 - decimal_len); ++i) { + decimal_multiplier *= 10; + } + decimal_value *= decimal_multiplier; + } + uint32_t whole_value; + if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len, + &whole_value)) { + return; + } + milli_token_ratio = + static_cast((whole_value * multiplier) + decimal_value); + if (milli_token_ratio <= 0) return; + } + } + retry_throttle_data_ = + grpc_core::internal::ServerRetryThrottleMap::GetDataForServer( + server_name_, max_milli_tokens, milli_token_ratio); + } +} + +namespace { + +bool ParseWaitForReady( + grpc_json* field, ClientChannelMethodParams::WaitForReady* wait_for_ready) { + if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) { + return false; + } + *wait_for_ready = field->type == GRPC_JSON_TRUE + ? ClientChannelMethodParams::WAIT_FOR_READY_TRUE + : ClientChannelMethodParams::WAIT_FOR_READY_FALSE; + return true; +} + +// Parses a JSON field of the form generated for a google.proto.Duration +// proto message, as per: +// https://developers.google.com/protocol-buffers/docs/proto3#json +bool ParseDuration(grpc_json* field, grpc_millis* duration) { + if (field->type != GRPC_JSON_STRING) return false; + size_t len = strlen(field->value); + if (field->value[len - 1] != 's') return false; + UniquePtr buf(gpr_strdup(field->value)); + *(buf.get() + len - 1) = '\0'; // Remove trailing 's'. + char* decimal_point = strchr(buf.get(), '.'); + int nanos = 0; + if (decimal_point != nullptr) { + *decimal_point = '\0'; + nanos = gpr_parse_nonnegative_int(decimal_point + 1); + if (nanos == -1) { + return false; + } + int num_digits = static_cast(strlen(decimal_point + 1)); + if (num_digits > 9) { // We don't accept greater precision than nanos. + return false; + } + for (int i = 0; i < (9 - num_digits); ++i) { + nanos *= 10; + } + } + int seconds = + decimal_point == buf.get() ? 0 : gpr_parse_nonnegative_int(buf.get()); + if (seconds == -1) return false; + *duration = seconds * GPR_MS_PER_SEC + nanos / GPR_NS_PER_MS; + return true; +} + +UniquePtr ParseRetryPolicy( + grpc_json* field) { + auto retry_policy = MakeUnique(); + if (field->type != GRPC_JSON_OBJECT) return nullptr; + for (grpc_json* sub_field = field->child; sub_field != nullptr; + sub_field = sub_field->next) { + if (sub_field->key == nullptr) return nullptr; + if (strcmp(sub_field->key, "maxAttempts") == 0) { + if (retry_policy->max_attempts != 0) return nullptr; // Duplicate. + if (sub_field->type != GRPC_JSON_NUMBER) return nullptr; + retry_policy->max_attempts = gpr_parse_nonnegative_int(sub_field->value); + if (retry_policy->max_attempts <= 1) return nullptr; + if (retry_policy->max_attempts > MAX_MAX_RETRY_ATTEMPTS) { + gpr_log(GPR_ERROR, + "service config: clamped retryPolicy.maxAttempts at %d", + MAX_MAX_RETRY_ATTEMPTS); + retry_policy->max_attempts = MAX_MAX_RETRY_ATTEMPTS; + } + } else if (strcmp(sub_field->key, "initialBackoff") == 0) { + if (retry_policy->initial_backoff > 0) return nullptr; // Duplicate. + if (!ParseDuration(sub_field, &retry_policy->initial_backoff)) { + return nullptr; + } + if (retry_policy->initial_backoff == 0) return nullptr; + } else if (strcmp(sub_field->key, "maxBackoff") == 0) { + if (retry_policy->max_backoff > 0) return nullptr; // Duplicate. + if (!ParseDuration(sub_field, &retry_policy->max_backoff)) { + return nullptr; + } + if (retry_policy->max_backoff == 0) return nullptr; + } else if (strcmp(sub_field->key, "backoffMultiplier") == 0) { + if (retry_policy->backoff_multiplier != 0) return nullptr; // Duplicate. + if (sub_field->type != GRPC_JSON_NUMBER) return nullptr; + if (sscanf(sub_field->value, "%f", &retry_policy->backoff_multiplier) != + 1) { + return nullptr; + } + if (retry_policy->backoff_multiplier <= 0) return nullptr; + } else if (strcmp(sub_field->key, "retryableStatusCodes") == 0) { + if (!retry_policy->retryable_status_codes.Empty()) { + return nullptr; // Duplicate. + } + if (sub_field->type != GRPC_JSON_ARRAY) return nullptr; + for (grpc_json* element = sub_field->child; element != nullptr; + element = element->next) { + if (element->type != GRPC_JSON_STRING) return nullptr; + grpc_status_code status; + if (!grpc_status_code_from_string(element->value, &status)) { + return nullptr; + } + retry_policy->retryable_status_codes.Add(status); + } + if (retry_policy->retryable_status_codes.Empty()) return nullptr; + } + } + // Make sure required fields are set. + if (retry_policy->max_attempts == 0 || retry_policy->initial_backoff == 0 || + retry_policy->max_backoff == 0 || retry_policy->backoff_multiplier == 0 || + retry_policy->retryable_status_codes.Empty()) { + return nullptr; + } + return retry_policy; +} + +} // namespace + +RefCountedPtr +ClientChannelMethodParams::CreateFromJson(const grpc_json* json) { + RefCountedPtr method_params = + MakeRefCounted(); + for (grpc_json* field = json->child; field != nullptr; field = field->next) { + if (field->key == nullptr) continue; + if (strcmp(field->key, "waitForReady") == 0) { + if (method_params->wait_for_ready_ != WAIT_FOR_READY_UNSET) { + return nullptr; // Duplicate. + } + if (!ParseWaitForReady(field, &method_params->wait_for_ready_)) { + return nullptr; + } + } else if (strcmp(field->key, "timeout") == 0) { + if (method_params->timeout_ > 0) return nullptr; // Duplicate. + if (!ParseDuration(field, &method_params->timeout_)) return nullptr; + } else if (strcmp(field->key, "retryPolicy") == 0) { + if (method_params->retry_policy_ != nullptr) { + return nullptr; // Duplicate. + } + method_params->retry_policy_ = ParseRetryPolicy(field); + if (method_params->retry_policy_ == nullptr) return nullptr; + } + } + return method_params; +} + +} // namespace internal +} // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/resolver_result_parsing.h b/src/core/ext/filters/client_channel/resolver_result_parsing.h new file mode 100644 index 0000000000..f1fb7406bc --- /dev/null +++ b/src/core/ext/filters/client_channel/resolver_result_parsing.h @@ -0,0 +1,146 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_RESULT_PARSING_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_RESULT_PARSING_H + +#include + +#include "src/core/ext/filters/client_channel/retry_throttle.h" +#include "src/core/lib/channel/status_util.h" +#include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/exec_ctx.h" // for grpc_millis +#include "src/core/lib/json/json.h" +#include "src/core/lib/slice/slice_hash_table.h" +#include "src/core/lib/transport/service_config.h" + +namespace grpc_core { +namespace internal { + +class ClientChannelMethodParams; + +// A table mapping from a method name to its method parameters. +typedef grpc_core::SliceHashTable< + grpc_core::RefCountedPtr> + ClientChannelMethodParamsTable; + +// A container of processed fields from the resolver result. Simplifies the +// usage of resolver result. +class ProcessedResolverResult { + public: + // Processes the resolver result and populates the relative members + // for later consumption. Tries to parse retry parameters only if parse_retry + // is true. + ProcessedResolverResult(const grpc_channel_args* resolver_result, + bool parse_retry); + + // Getters. Any managed object's ownership is transferred. + grpc_core::UniquePtr service_config_json() { + return std::move(service_config_json_); + } + grpc_core::RefCountedPtr retry_throttle_data() { + return std::move(retry_throttle_data_); + } + grpc_core::RefCountedPtr + method_params_table() { + return std::move(method_params_table_); + } + grpc_core::UniquePtr lb_policy_name() { + return std::move(lb_policy_name_); + } + grpc_json* lb_policy_config() { return lb_policy_config_; } + + private: + // Finds the service config; extracts LB config and (maybe) retry throttle + // params from it. + void ProcessServiceConfig(const grpc_channel_args* resolver_result, + bool parse_retry); + + // Finds the LB policy name (when no LB config was found). + void ProcessLbPolicyName(const grpc_channel_args* resolver_result); + + // Parses the service config. Intended to be used by + // ServiceConfig::ParseGlobalParams. + static void ParseServiceConfig(const grpc_json* field, + ProcessedResolverResult* parsing_state); + // Parses the LB config from service config. + void ParseLbConfigFromServiceConfig(const grpc_json* field); + // Parses the retry throttle parameters from service config. + void ParseRetryThrottleParamsFromServiceConfig(const grpc_json* field); + + // Service config. + grpc_core::UniquePtr service_config_json_; + grpc_core::UniquePtr service_config_; + // LB policy. + grpc_json* lb_policy_config_ = nullptr; + grpc_core::UniquePtr lb_policy_name_; + // Retry throttle data. + char* server_name_ = nullptr; + grpc_core::RefCountedPtr retry_throttle_data_; + // Method params table. + grpc_core::RefCountedPtr method_params_table_; +}; + +// The parameters of a method. +class ClientChannelMethodParams : public RefCounted { + public: + enum WaitForReady { + WAIT_FOR_READY_UNSET = 0, + WAIT_FOR_READY_FALSE, + WAIT_FOR_READY_TRUE + }; + + struct RetryPolicy { + int max_attempts = 0; + grpc_millis initial_backoff = 0; + grpc_millis max_backoff = 0; + float backoff_multiplier = 0; + StatusCodeSet retryable_status_codes; + }; + + /// Creates a method_parameters object from \a json. + /// Intended for use with ServiceConfig::CreateMethodConfigTable(). + static RefCountedPtr CreateFromJson( + const grpc_json* json); + + grpc_millis timeout() const { return timeout_; } + WaitForReady wait_for_ready() const { return wait_for_ready_; } + const RetryPolicy* retry_policy() const { return retry_policy_.get(); } + + private: + // So New() can call our private ctor. + template + friend T* grpc_core::New(Args&&... args); + + // So Delete() can call our private dtor. + template + friend void grpc_core::Delete(T*); + + ClientChannelMethodParams() {} + virtual ~ClientChannelMethodParams() {} + + grpc_millis timeout_ = 0; + WaitForReady wait_for_ready_ = WAIT_FOR_READY_UNSET; + UniquePtr retry_policy_; +}; + +} // namespace internal +} // namespace grpc_core + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_RESULT_PARSING_H */ -- cgit v1.2.3