aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar Juanli Shen <juanlishen@google.com>2018-11-15 13:11:45 -0800
committerGravatar Juanli Shen <juanlishen@google.com>2018-11-15 13:31:27 -0800
commit8a880801aea7d82a792635c9d26271218efcbad3 (patch)
treea9c45222ddc40aa201958bf57d8849258c92e17f /src/core/ext
parentfdf63a7c023a305d06cda1a247e12cd704a3a976 (diff)
Add support for LB config in service config
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc163
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h9
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc7
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc8
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc8
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/xds/xds.cc10
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_registry.cc5
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_registry.h4
-rw-r--r--src/core/ext/filters/client_channel/method_params.cc178
-rw-r--r--src/core/ext/filters/client_channel/method_params.h78
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc23
-rw-r--r--src/core/ext/filters/client_channel/resolver_result_parsing.cc384
-rw-r--r--src/core/ext/filters/client_channel/resolver_result_parsing.h146
13 files changed, 592 insertions, 431 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 8e9ee889e1..d94e20124e 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -34,9 +34,9 @@
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
-#include "src/core/ext/filters/client_channel/method_params.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
+#include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
@@ -63,6 +63,8 @@
#include "src/core/lib/transport/status_metadata.h"
using grpc_core::internal::ClientChannelMethodParams;
+using grpc_core::internal::ClientChannelMethodParamsTable;
+using grpc_core::internal::ProcessedResolverResult;
using grpc_core::internal::ServerRetryThrottleData;
/* Client channel implementation */
@@ -83,10 +85,6 @@ grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel");
struct external_connectivity_watcher;
-typedef grpc_core::SliceHashTable<
- grpc_core::RefCountedPtr<ClientChannelMethodParams>>
- MethodParamsTable;
-
typedef struct client_channel_channel_data {
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
bool started_resolving;
@@ -102,7 +100,7 @@ typedef struct client_channel_channel_data {
/** retry throttle data */
grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
/** maps method names to method_parameters structs */
- grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
+ grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table;
/** incoming resolver result - set by resolver.next() */
grpc_channel_args* resolver_result;
/** a list of closures that are all waiting for resolver result to come in */
@@ -251,66 +249,6 @@ static void start_resolving_locked(channel_data* chand) {
&chand->on_resolver_result_changed);
}
-typedef struct {
- char* server_name;
- grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
-} service_config_parsing_state;
-
-static void parse_retry_throttle_params(
- const grpc_json* field, service_config_parsing_state* parsing_state) {
- if (strcmp(field->key, "retryThrottling") == 0) {
- if (parsing_state->retry_throttle_data != nullptr) return; // Duplicate.
- if (field->type != GRPC_JSON_OBJECT) return;
- int max_milli_tokens = 0;
- int milli_token_ratio = 0;
- for (grpc_json* sub_field = field->child; sub_field != nullptr;
- sub_field = sub_field->next) {
- if (sub_field->key == nullptr) return;
- if (strcmp(sub_field->key, "maxTokens") == 0) {
- if (max_milli_tokens != 0) return; // Duplicate.
- if (sub_field->type != GRPC_JSON_NUMBER) return;
- max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
- if (max_milli_tokens == -1) return;
- max_milli_tokens *= 1000;
- } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
- if (milli_token_ratio != 0) return; // Duplicate.
- if (sub_field->type != GRPC_JSON_NUMBER) return;
- // We support up to 3 decimal digits.
- size_t whole_len = strlen(sub_field->value);
- uint32_t multiplier = 1;
- uint32_t decimal_value = 0;
- const char* decimal_point = strchr(sub_field->value, '.');
- if (decimal_point != nullptr) {
- whole_len = static_cast<size_t>(decimal_point - sub_field->value);
- multiplier = 1000;
- size_t decimal_len = strlen(decimal_point + 1);
- if (decimal_len > 3) decimal_len = 3;
- if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
- &decimal_value)) {
- return;
- }
- uint32_t decimal_multiplier = 1;
- for (size_t i = 0; i < (3 - decimal_len); ++i) {
- decimal_multiplier *= 10;
- }
- decimal_value *= decimal_multiplier;
- }
- uint32_t whole_value;
- if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
- &whole_value)) {
- return;
- }
- milli_token_ratio =
- static_cast<int>((whole_value * multiplier) + decimal_value);
- if (milli_token_ratio <= 0) return;
- }
- }
- parsing_state->retry_throttle_data =
- grpc_core::internal::ServerRetryThrottleMap::GetDataForServer(
- parsing_state->server_name, max_milli_tokens, milli_token_ratio);
- }
-}
-
// Invoked from the resolver NextLocked() callback when the resolver
// is shutting down.
static void on_resolver_shutdown_locked(channel_data* chand,
@@ -352,37 +290,6 @@ static void on_resolver_shutdown_locked(channel_data* chand,
GRPC_ERROR_UNREF(error);
}
-// Returns the LB policy name from the resolver result.
-static grpc_core::UniquePtr<char>
-get_lb_policy_name_from_resolver_result_locked(channel_data* chand) {
- // Find LB policy name in channel args.
- const grpc_arg* channel_arg =
- grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
- const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg);
- // Special case: If at least one balancer address is present, we use
- // the grpclb policy, regardless of what the resolver actually specified.
- channel_arg =
- grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
- if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
- grpc_lb_addresses* addresses =
- static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
- if (grpc_lb_addresses_contains_balancer_address(*addresses)) {
- if (lb_policy_name != nullptr &&
- gpr_stricmp(lb_policy_name, "grpclb") != 0) {
- gpr_log(GPR_INFO,
- "resolver requested LB policy %s but provided at least one "
- "balancer address -- forcing use of grpclb LB policy",
- lb_policy_name);
- }
- lb_policy_name = "grpclb";
- }
- }
- // Use pick_first if nothing was specified and we didn't select grpclb
- // above.
- if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
- return grpc_core::UniquePtr<char>(gpr_strdup(lb_policy_name));
-}
-
static void request_reresolution_locked(void* arg, grpc_error* error) {
reresolution_request_args* args =
static_cast<reresolution_request_args*>(arg);
@@ -410,13 +317,14 @@ using TraceStringVector = grpc_core::InlinedVector<char*, 3>;
// *connectivity_error to its initial connectivity state; otherwise,
// leaves them unchanged.
static void create_new_lb_policy_locked(
- channel_data* chand, char* lb_policy_name,
+ channel_data* chand, char* lb_policy_name, grpc_json* lb_config,
grpc_connectivity_state* connectivity_state,
grpc_error** connectivity_error, TraceStringVector* trace_strings) {
grpc_core::LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = chand->combiner;
lb_policy_args.client_channel_factory = chand->client_channel_factory;
lb_policy_args.args = chand->resolver_result;
+ lb_policy_args.lb_config = lb_config;
grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy =
grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
lb_policy_name, lb_policy_args);
@@ -473,44 +381,6 @@ static void create_new_lb_policy_locked(
}
}
-// Returns the service config (as a JSON string) from the resolver result.
-// Also updates state in chand.
-static grpc_core::UniquePtr<char>
-get_service_config_from_resolver_result_locked(channel_data* chand) {
- const grpc_arg* channel_arg =
- grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
- const char* service_config_json = grpc_channel_arg_get_string(channel_arg);
- if (service_config_json != nullptr) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
- chand, service_config_json);
- }
- grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
- grpc_core::ServiceConfig::Create(service_config_json);
- if (service_config != nullptr) {
- if (chand->enable_retries) {
- channel_arg =
- grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
- const char* server_uri = grpc_channel_arg_get_string(channel_arg);
- GPR_ASSERT(server_uri != nullptr);
- grpc_uri* uri = grpc_uri_parse(server_uri, true);
- GPR_ASSERT(uri->path[0] != '\0');
- service_config_parsing_state parsing_state;
- parsing_state.server_name =
- uri->path[0] == '/' ? uri->path + 1 : uri->path;
- service_config->ParseGlobalParams(parse_retry_throttle_params,
- &parsing_state);
- grpc_uri_destroy(uri);
- chand->retry_throttle_data =
- std::move(parsing_state.retry_throttle_data);
- }
- chand->method_params_table = service_config->CreateMethodConfigTable(
- ClientChannelMethodParams::CreateFromJson);
- }
- }
- return grpc_core::UniquePtr<char>(gpr_strdup(service_config_json));
-}
-
static void maybe_add_trace_message_for_address_changes_locked(
channel_data* chand, TraceStringVector* trace_strings) {
int resolution_contains_addresses = false;
@@ -598,8 +468,20 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand);
}
} else {
+ // Parse the resolver result.
+ ProcessedResolverResult resolver_result(chand->resolver_result,
+ chand->enable_retries);
+ chand->retry_throttle_data = resolver_result.retry_throttle_data();
+ chand->method_params_table = resolver_result.method_params_table();
+ grpc_core::UniquePtr<char> service_config_json =
+ resolver_result.service_config_json();
+ if (service_config_json != nullptr && grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
+ chand, service_config_json.get());
+ }
grpc_core::UniquePtr<char> lb_policy_name =
- get_lb_policy_name_from_resolver_result_locked(chand);
+ resolver_result.lb_policy_name();
+ grpc_json* lb_policy_config = resolver_result.lb_policy_config();
// Check to see if we're already using the right LB policy.
// Note: It's safe to use chand->info_lb_policy_name here without
// taking a lock on chand->info_mu, because this function is the
@@ -614,19 +496,16 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)",
chand, lb_policy_name.get(), chand->lb_policy.get());
}
- chand->lb_policy->UpdateLocked(*chand->resolver_result);
+ chand->lb_policy->UpdateLocked(*chand->resolver_result, lb_policy_config);
// No need to set the channel's connectivity state; the existing
// watch on the LB policy will take care of that.
set_connectivity_state = false;
} else {
// Instantiate new LB policy.
- create_new_lb_policy_locked(chand, lb_policy_name.get(),
+ create_new_lb_policy_locked(chand, lb_policy_name.get(), lb_policy_config,
&connectivity_state, &connectivity_error,
&trace_strings);
}
- // Find service config.
- grpc_core::UniquePtr<char> service_config_json =
- get_service_config_from_resolver_result_locked(chand);
// Note: It's safe to use chand->info_service_config_json here without
// taking a lock on chand->info_mu, because this function is the
// only thing that modifies its value, and it can only be invoked
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index b0040457a6..6733fdca81 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -58,6 +58,8 @@ class LoadBalancingPolicy
/// Note that the LB policy gets the set of addresses from the
/// GRPC_ARG_LB_ADDRESSES channel arg.
grpc_channel_args* args = nullptr;
+ /// Load balancing config from the resolver.
+ grpc_json* lb_config = nullptr;
};
/// State used for an LB pick.
@@ -92,10 +94,11 @@ class LoadBalancingPolicy
LoadBalancingPolicy(const LoadBalancingPolicy&) = delete;
LoadBalancingPolicy& operator=(const LoadBalancingPolicy&) = delete;
- /// Updates the policy with a new set of \a args from the resolver.
- /// Note that the LB policy gets the set of addresses from the
+ /// Updates the policy with a new set of \a args and a new \a lb_config from
+ /// the resolver. Note that the LB policy gets the set of addresses from the
/// GRPC_ARG_LB_ADDRESSES channel arg.
- virtual void UpdateLocked(const grpc_channel_args& args) GRPC_ABSTRACT;
+ virtual void UpdateLocked(const grpc_channel_args& args,
+ grpc_json* lb_config) GRPC_ABSTRACT;
/// Finds an appropriate subchannel for a call, based on data in \a pick.
/// \a pick must remain alive until the pick is complete.
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index dbb90b438c..dc0e1f89ce 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -123,7 +123,8 @@ class GrpcLb : public LoadBalancingPolicy {
public:
GrpcLb(const grpc_lb_addresses* addresses, const Args& args);
- void UpdateLocked(const grpc_channel_args& args) override;
+ void UpdateLocked(const grpc_channel_args& args,
+ grpc_json* lb_config) override;
bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
@@ -1331,7 +1332,7 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
grpc_channel_args_destroy(lb_channel_args);
}
-void GrpcLb::UpdateLocked(const grpc_channel_args& args) {
+void GrpcLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) {
ProcessChannelArgsLocked(args);
// Update the existing RR policy.
if (rr_policy_ != nullptr) CreateOrUpdateRoundRobinPolicyLocked();
@@ -1727,7 +1728,7 @@ void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() {
gpr_log(GPR_INFO, "[grpclb %p] Updating RR policy %p", this,
rr_policy_.get());
}
- rr_policy_->UpdateLocked(*args);
+ rr_policy_->UpdateLocked(*args, nullptr);
} else {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index eb494486b9..d454401a66 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -46,7 +46,8 @@ class PickFirst : public LoadBalancingPolicy {
public:
explicit PickFirst(const Args& args);
- void UpdateLocked(const grpc_channel_args& args) override;
+ void UpdateLocked(const grpc_channel_args& args,
+ grpc_json* lb_config) override;
bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
@@ -159,7 +160,7 @@ PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p created.", this);
}
- UpdateLocked(*args.args);
+ UpdateLocked(*args.args, args.lb_config);
grpc_subchannel_index_ref();
}
@@ -333,7 +334,8 @@ void PickFirst::UpdateChildRefsLocked() {
child_subchannels_ = std::move(cs);
}
-void PickFirst::UpdateLocked(const grpc_channel_args& args) {
+void PickFirst::UpdateLocked(const grpc_channel_args& args,
+ grpc_json* lb_config) {
AutoChildRefsUpdater guard(this);
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index e9ed85cf66..2a16975131 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -57,7 +57,8 @@ class RoundRobin : public LoadBalancingPolicy {
public:
explicit RoundRobin(const Args& args);
- void UpdateLocked(const grpc_channel_args& args) override;
+ void UpdateLocked(const grpc_channel_args& args,
+ grpc_json* lb_config) override;
bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
@@ -232,7 +233,7 @@ RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
gpr_mu_init(&child_refs_mu_);
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
"round_robin");
- UpdateLocked(*args.args);
+ UpdateLocked(*args.args, args.lb_config);
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Created with %" PRIuPTR " subchannels", this,
subchannel_list_->num_subchannels());
@@ -664,7 +665,8 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
notify);
}
-void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
+void RoundRobin::UpdateLocked(const grpc_channel_args& args,
+ grpc_json* lb_config) {
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
AutoChildRefsUpdater guard(this);
if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
index 59d57295d4..29cd904375 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
@@ -118,7 +118,8 @@ class XdsLb : public LoadBalancingPolicy {
public:
XdsLb(const grpc_lb_addresses* addresses, const Args& args);
- void UpdateLocked(const grpc_channel_args& args) override;
+ void UpdateLocked(const grpc_channel_args& args,
+ grpc_json* lb_config) override;
bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
@@ -1010,6 +1011,7 @@ grpc_channel_args* BuildBalancerChannelArgs(
// ctor and dtor
//
+// TODO(vishalpowar): Use lb_config in args to configure LB policy.
XdsLb::XdsLb(const grpc_lb_addresses* addresses,
const LoadBalancingPolicy::Args& args)
: LoadBalancingPolicy(args),
@@ -1314,7 +1316,8 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
grpc_channel_args_destroy(lb_channel_args);
}
-void XdsLb::UpdateLocked(const grpc_channel_args& args) {
+// TODO(vishalpowar): Use lb_config to configure LB policy.
+void XdsLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) {
ProcessChannelArgsLocked(args);
// Update the existing child policy.
// Note: We have disabled fallback mode in the code, so this child policy must
@@ -1672,7 +1675,8 @@ void XdsLb::CreateOrUpdateChildPolicyLocked() {
gpr_log(GPR_INFO, "[xdslb %p] Updating the child policy %p", this,
child_policy_.get());
}
- child_policy_->UpdateLocked(*args);
+ // TODO(vishalpowar): Pass the correct LB config.
+ child_policy_->UpdateLocked(*args, nullptr);
} else {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
diff --git a/src/core/ext/filters/client_channel/lb_policy_registry.cc b/src/core/ext/filters/client_channel/lb_policy_registry.cc
index d651b1120d..ad459c9c8c 100644
--- a/src/core/ext/filters/client_channel/lb_policy_registry.cc
+++ b/src/core/ext/filters/client_channel/lb_policy_registry.cc
@@ -94,4 +94,9 @@ LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
return factory->CreateLoadBalancingPolicy(args);
}
+bool LoadBalancingPolicyRegistry::LoadBalancingPolicyExists(const char* name) {
+ GPR_ASSERT(g_state != nullptr);
+ return g_state->GetLoadBalancingPolicyFactory(name) != nullptr;
+}
+
} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/lb_policy_registry.h b/src/core/ext/filters/client_channel/lb_policy_registry.h
index 2e9bb061ed..338f7c9f69 100644
--- a/src/core/ext/filters/client_channel/lb_policy_registry.h
+++ b/src/core/ext/filters/client_channel/lb_policy_registry.h
@@ -47,6 +47,10 @@ class LoadBalancingPolicyRegistry {
/// Creates an LB policy of the type specified by \a name.
static OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const char* name, const LoadBalancingPolicy::Args& args);
+
+ /// Returns true if the LB policy factory specified by \a name exists in this
+ /// registry.
+ static bool LoadBalancingPolicyExists(const char* name);
};
} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/method_params.cc b/src/core/ext/filters/client_channel/method_params.cc
deleted file mode 100644
index 1f116bb67d..0000000000
--- a/src/core/ext/filters/client_channel/method_params.cc
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#include <stdio.h>
-#include <string.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-
-#include "src/core/ext/filters/client_channel/method_params.h"
-#include "src/core/lib/channel/status_util.h"
-#include "src/core/lib/gpr/string.h"
-#include "src/core/lib/gprpp/memory.h"
-
-// As per the retry design, we do not allow more than 5 retry attempts.
-#define MAX_MAX_RETRY_ATTEMPTS 5
-
-namespace grpc_core {
-namespace internal {
-
-namespace {
-
-bool ParseWaitForReady(
- grpc_json* field, ClientChannelMethodParams::WaitForReady* wait_for_ready) {
- if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
- return false;
- }
- *wait_for_ready = field->type == GRPC_JSON_TRUE
- ? ClientChannelMethodParams::WAIT_FOR_READY_TRUE
- : ClientChannelMethodParams::WAIT_FOR_READY_FALSE;
- return true;
-}
-
-// Parses a JSON field of the form generated for a google.proto.Duration
-// proto message, as per:
-// https://developers.google.com/protocol-buffers/docs/proto3#json
-bool ParseDuration(grpc_json* field, grpc_millis* duration) {
- if (field->type != GRPC_JSON_STRING) return false;
- size_t len = strlen(field->value);
- if (field->value[len - 1] != 's') return false;
- UniquePtr<char> buf(gpr_strdup(field->value));
- *(buf.get() + len - 1) = '\0'; // Remove trailing 's'.
- char* decimal_point = strchr(buf.get(), '.');
- int nanos = 0;
- if (decimal_point != nullptr) {
- *decimal_point = '\0';
- nanos = gpr_parse_nonnegative_int(decimal_point + 1);
- if (nanos == -1) {
- return false;
- }
- int num_digits = static_cast<int>(strlen(decimal_point + 1));
- if (num_digits > 9) { // We don't accept greater precision than nanos.
- return false;
- }
- for (int i = 0; i < (9 - num_digits); ++i) {
- nanos *= 10;
- }
- }
- int seconds =
- decimal_point == buf.get() ? 0 : gpr_parse_nonnegative_int(buf.get());
- if (seconds == -1) return false;
- *duration = seconds * GPR_MS_PER_SEC + nanos / GPR_NS_PER_MS;
- return true;
-}
-
-UniquePtr<ClientChannelMethodParams::RetryPolicy> ParseRetryPolicy(
- grpc_json* field) {
- auto retry_policy = MakeUnique<ClientChannelMethodParams::RetryPolicy>();
- if (field->type != GRPC_JSON_OBJECT) return nullptr;
- for (grpc_json* sub_field = field->child; sub_field != nullptr;
- sub_field = sub_field->next) {
- if (sub_field->key == nullptr) return nullptr;
- if (strcmp(sub_field->key, "maxAttempts") == 0) {
- if (retry_policy->max_attempts != 0) return nullptr; // Duplicate.
- if (sub_field->type != GRPC_JSON_NUMBER) return nullptr;
- retry_policy->max_attempts = gpr_parse_nonnegative_int(sub_field->value);
- if (retry_policy->max_attempts <= 1) return nullptr;
- if (retry_policy->max_attempts > MAX_MAX_RETRY_ATTEMPTS) {
- gpr_log(GPR_ERROR,
- "service config: clamped retryPolicy.maxAttempts at %d",
- MAX_MAX_RETRY_ATTEMPTS);
- retry_policy->max_attempts = MAX_MAX_RETRY_ATTEMPTS;
- }
- } else if (strcmp(sub_field->key, "initialBackoff") == 0) {
- if (retry_policy->initial_backoff > 0) return nullptr; // Duplicate.
- if (!ParseDuration(sub_field, &retry_policy->initial_backoff)) {
- return nullptr;
- }
- if (retry_policy->initial_backoff == 0) return nullptr;
- } else if (strcmp(sub_field->key, "maxBackoff") == 0) {
- if (retry_policy->max_backoff > 0) return nullptr; // Duplicate.
- if (!ParseDuration(sub_field, &retry_policy->max_backoff)) {
- return nullptr;
- }
- if (retry_policy->max_backoff == 0) return nullptr;
- } else if (strcmp(sub_field->key, "backoffMultiplier") == 0) {
- if (retry_policy->backoff_multiplier != 0) return nullptr; // Duplicate.
- if (sub_field->type != GRPC_JSON_NUMBER) return nullptr;
- if (sscanf(sub_field->value, "%f", &retry_policy->backoff_multiplier) !=
- 1) {
- return nullptr;
- }
- if (retry_policy->backoff_multiplier <= 0) return nullptr;
- } else if (strcmp(sub_field->key, "retryableStatusCodes") == 0) {
- if (!retry_policy->retryable_status_codes.Empty()) {
- return nullptr; // Duplicate.
- }
- if (sub_field->type != GRPC_JSON_ARRAY) return nullptr;
- for (grpc_json* element = sub_field->child; element != nullptr;
- element = element->next) {
- if (element->type != GRPC_JSON_STRING) return nullptr;
- grpc_status_code status;
- if (!grpc_status_code_from_string(element->value, &status)) {
- return nullptr;
- }
- retry_policy->retryable_status_codes.Add(status);
- }
- if (retry_policy->retryable_status_codes.Empty()) return nullptr;
- }
- }
- // Make sure required fields are set.
- if (retry_policy->max_attempts == 0 || retry_policy->initial_backoff == 0 ||
- retry_policy->max_backoff == 0 || retry_policy->backoff_multiplier == 0 ||
- retry_policy->retryable_status_codes.Empty()) {
- return nullptr;
- }
- return retry_policy;
-}
-
-} // namespace
-
-RefCountedPtr<ClientChannelMethodParams>
-ClientChannelMethodParams::CreateFromJson(const grpc_json* json) {
- RefCountedPtr<ClientChannelMethodParams> method_params =
- MakeRefCounted<ClientChannelMethodParams>();
- for (grpc_json* field = json->child; field != nullptr; field = field->next) {
- if (field->key == nullptr) continue;
- if (strcmp(field->key, "waitForReady") == 0) {
- if (method_params->wait_for_ready_ != WAIT_FOR_READY_UNSET) {
- return nullptr; // Duplicate.
- }
- if (!ParseWaitForReady(field, &method_params->wait_for_ready_)) {
- return nullptr;
- }
- } else if (strcmp(field->key, "timeout") == 0) {
- if (method_params->timeout_ > 0) return nullptr; // Duplicate.
- if (!ParseDuration(field, &method_params->timeout_)) return nullptr;
- } else if (strcmp(field->key, "retryPolicy") == 0) {
- if (method_params->retry_policy_ != nullptr) {
- return nullptr; // Duplicate.
- }
- method_params->retry_policy_ = ParseRetryPolicy(field);
- if (method_params->retry_policy_ == nullptr) return nullptr;
- }
- }
- return method_params;
-}
-
-} // namespace internal
-} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/method_params.h b/src/core/ext/filters/client_channel/method_params.h
deleted file mode 100644
index a31d360f17..0000000000
--- a/src/core/ext/filters/client_channel/method_params.h
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_METHOD_PARAMS_H
-#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_METHOD_PARAMS_H
-
-#include <grpc/support/port_platform.h>
-
-#include "src/core/lib/channel/status_util.h"
-#include "src/core/lib/gprpp/ref_counted.h"
-#include "src/core/lib/gprpp/ref_counted_ptr.h"
-#include "src/core/lib/iomgr/exec_ctx.h" // for grpc_millis
-#include "src/core/lib/json/json.h"
-
-namespace grpc_core {
-namespace internal {
-
-class ClientChannelMethodParams : public RefCounted<ClientChannelMethodParams> {
- public:
- enum WaitForReady {
- WAIT_FOR_READY_UNSET = 0,
- WAIT_FOR_READY_FALSE,
- WAIT_FOR_READY_TRUE
- };
-
- struct RetryPolicy {
- int max_attempts = 0;
- grpc_millis initial_backoff = 0;
- grpc_millis max_backoff = 0;
- float backoff_multiplier = 0;
- StatusCodeSet retryable_status_codes;
- };
-
- /// Creates a method_parameters object from \a json.
- /// Intended for use with ServiceConfig::CreateMethodConfigTable().
- static RefCountedPtr<ClientChannelMethodParams> CreateFromJson(
- const grpc_json* json);
-
- grpc_millis timeout() const { return timeout_; }
- WaitForReady wait_for_ready() const { return wait_for_ready_; }
- const RetryPolicy* retry_policy() const { return retry_policy_.get(); }
-
- private:
- // So New() can call our private ctor.
- template <typename T, typename... Args>
- friend T* grpc_core::New(Args&&... args);
-
- // So Delete() can call our private dtor.
- template <typename T>
- friend void grpc_core::Delete(T*);
-
- ClientChannelMethodParams() {}
- virtual ~ClientChannelMethodParams() {}
-
- grpc_millis timeout_ = 0;
- WaitForReady wait_for_ready_ = WAIT_FOR_READY_UNSET;
- UniquePtr<RetryPolicy> retry_policy_;
-};
-
-} // namespace internal
-} // namespace grpc_core
-
-#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_METHOD_PARAMS_H */
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index 9562a3f893..90bc88961d 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -308,13 +308,12 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
gpr_free(r->pending_request_);
r->pending_request_ = nullptr;
if (r->lb_addresses_ != nullptr) {
- static const char* args_to_remove[2];
+ static const char* args_to_remove[1];
size_t num_args_to_remove = 0;
- grpc_arg new_args[3];
+ grpc_arg args_to_add[2];
size_t num_args_to_add = 0;
- new_args[num_args_to_add++] =
+ args_to_add[num_args_to_add++] =
grpc_lb_addresses_create_channel_arg(r->lb_addresses_);
- grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config;
char* service_config_string = nullptr;
if (r->service_config_json_ != nullptr) {
service_config_string = ChooseServiceConfig(r->service_config_json_);
@@ -323,24 +322,12 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
gpr_log(GPR_INFO, "selected service config choice: %s",
service_config_string);
args_to_remove[num_args_to_remove++] = GRPC_ARG_SERVICE_CONFIG;
- new_args[num_args_to_add++] = grpc_channel_arg_string_create(
+ args_to_add[num_args_to_add++] = grpc_channel_arg_string_create(
(char*)GRPC_ARG_SERVICE_CONFIG, service_config_string);
- service_config =
- grpc_core::ServiceConfig::Create(service_config_string);
- if (service_config != nullptr) {
- const char* lb_policy_name =
- service_config->GetLoadBalancingPolicyName();
- if (lb_policy_name != nullptr) {
- args_to_remove[num_args_to_remove++] = GRPC_ARG_LB_POLICY_NAME;
- new_args[num_args_to_add++] = grpc_channel_arg_string_create(
- (char*)GRPC_ARG_LB_POLICY_NAME,
- const_cast<char*>(lb_policy_name));
- }
- }
}
}
result = grpc_channel_args_copy_and_add_and_remove(
- r->channel_args_, args_to_remove, num_args_to_remove, new_args,
+ r->channel_args_, args_to_remove, num_args_to_remove, args_to_add,
num_args_to_add);
gpr_free(service_config_string);
grpc_lb_addresses_destroy(r->lb_addresses_);
diff --git a/src/core/ext/filters/client_channel/resolver_result_parsing.cc b/src/core/ext/filters/client_channel/resolver_result_parsing.cc
new file mode 100644
index 0000000000..82a26ace63
--- /dev/null
+++ b/src/core/ext/filters/client_channel/resolver_result_parsing.cc
@@ -0,0 +1,384 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
+
+#include <ctype.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/lib/channel/status_util.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/memory.h"
+
+// As per the retry design, we do not allow more than 5 retry attempts.
+#define MAX_MAX_RETRY_ATTEMPTS 5
+
+namespace grpc_core {
+namespace internal {
+
+namespace {
+
+// Converts string format from JSON to proto.
+grpc_core::UniquePtr<char> ConvertCamelToSnake(const char* camel) {
+ const size_t size = strlen(camel);
+ char* snake = static_cast<char*>(gpr_malloc(size * 2));
+ size_t j = 0;
+ for (size_t i = 0; i < size; ++i) {
+ if (isupper(camel[i])) {
+ snake[j++] = '_';
+ snake[j++] = tolower(camel[i]);
+ } else {
+ snake[j++] = camel[i];
+ }
+ }
+ snake[j] = '\0';
+ return grpc_core::UniquePtr<char>(snake);
+}
+
+} // namespace
+
+ProcessedResolverResult::ProcessedResolverResult(
+ const grpc_channel_args* resolver_result, bool parse_retry) {
+ ProcessServiceConfig(resolver_result, parse_retry);
+ // If no LB config was found above, just find the LB policy name then.
+ if (lb_policy_config_ == nullptr) ProcessLbPolicyName(resolver_result);
+}
+
+void ProcessedResolverResult::ProcessServiceConfig(
+ const grpc_channel_args* resolver_result, bool parse_retry) {
+ const grpc_arg* channel_arg =
+ grpc_channel_args_find(resolver_result, GRPC_ARG_SERVICE_CONFIG);
+ const char* service_config_json = grpc_channel_arg_get_string(channel_arg);
+ if (service_config_json != nullptr) {
+ service_config_json_.reset(gpr_strdup(service_config_json));
+ service_config_ = grpc_core::ServiceConfig::Create(service_config_json);
+ if (service_config_ != nullptr) {
+ if (parse_retry) {
+ channel_arg =
+ grpc_channel_args_find(resolver_result, GRPC_ARG_SERVER_URI);
+ const char* server_uri = grpc_channel_arg_get_string(channel_arg);
+ GPR_ASSERT(server_uri != nullptr);
+ grpc_uri* uri = grpc_uri_parse(server_uri, true);
+ GPR_ASSERT(uri->path[0] != '\0');
+ server_name_ = uri->path[0] == '/' ? uri->path + 1 : uri->path;
+ service_config_->ParseGlobalParams(ParseServiceConfig, this);
+ grpc_uri_destroy(uri);
+ } else {
+ service_config_->ParseGlobalParams(ParseServiceConfig, this);
+ }
+ method_params_table_ = service_config_->CreateMethodConfigTable(
+ ClientChannelMethodParams::CreateFromJson);
+ }
+ }
+}
+
+void ProcessedResolverResult::ProcessLbPolicyName(
+ const grpc_channel_args* resolver_result) {
+ const char* lb_policy_name = nullptr;
+ // Prefer the LB policy name found in the service config. Note that this is
+ // checking the deprecated loadBalancingPolicy field, rather than the new
+ // loadBalancingConfig field.
+ if (service_config_ != nullptr) {
+ lb_policy_name = service_config_->GetLoadBalancingPolicyName();
+ }
+ // Otherwise, find the LB policy name set by the client API.
+ if (lb_policy_name == nullptr) {
+ const grpc_arg* channel_arg =
+ grpc_channel_args_find(resolver_result, GRPC_ARG_LB_POLICY_NAME);
+ lb_policy_name = grpc_channel_arg_get_string(channel_arg);
+ }
+ // Special case: If at least one balancer address is present, we use
+ // the grpclb policy, regardless of what the resolver has returned.
+ const grpc_arg* channel_arg =
+ grpc_channel_args_find(resolver_result, GRPC_ARG_LB_ADDRESSES);
+ if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
+ grpc_lb_addresses* addresses =
+ static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
+ if (grpc_lb_addresses_contains_balancer_address(*addresses)) {
+ if (lb_policy_name != nullptr &&
+ gpr_stricmp(lb_policy_name, "grpclb") != 0) {
+ gpr_log(GPR_INFO,
+ "resolver requested LB policy %s but provided at least one "
+ "balancer address -- forcing use of grpclb LB policy",
+ lb_policy_name);
+ }
+ lb_policy_name = "grpclb";
+ }
+ }
+ // Use pick_first if nothing was specified and we didn't select grpclb
+ // above.
+ if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
+ lb_policy_name_.reset(gpr_strdup(lb_policy_name));
+}
+
+void ProcessedResolverResult::ParseServiceConfig(
+ const grpc_json* field, ProcessedResolverResult* parsing_state) {
+ parsing_state->ParseLbConfigFromServiceConfig(field);
+ if (parsing_state->server_name_ != nullptr) {
+ parsing_state->ParseRetryThrottleParamsFromServiceConfig(field);
+ }
+}
+
+void ProcessedResolverResult::ParseLbConfigFromServiceConfig(
+ const grpc_json* field) {
+ if (lb_policy_config_ != nullptr) return; // Already found.
+ // Find the LB config global parameter.
+ if (field->key == nullptr || strcmp(field->key, "loadBalancingConfig") != 0 ||
+ field->type != GRPC_JSON_ARRAY) {
+ return; // Not valid lb config array.
+ }
+ // Find the first LB policy that this client supports.
+ for (grpc_json* lb_config = field->child; lb_config != nullptr;
+ lb_config = lb_config->next) {
+ if (lb_config->type != GRPC_JSON_OBJECT) return;
+ // Find the policy object.
+ grpc_json* policy = nullptr;
+ for (grpc_json* field = lb_config->child; field != nullptr;
+ field = field->next) {
+ if (field->key == nullptr || strcmp(field->key, "policy") != 0 ||
+ field->type != GRPC_JSON_OBJECT) {
+ return;
+ }
+ if (policy != nullptr) return; // Duplicate.
+ policy = field;
+ }
+ // Find the specific policy content since the policy object is of type
+ // "oneof".
+ grpc_json* policy_content = nullptr;
+ for (grpc_json* field = policy->child; field != nullptr;
+ field = field->next) {
+ if (field->key == nullptr || field->type != GRPC_JSON_OBJECT) return;
+ if (policy_content != nullptr) return; // Violate "oneof" type.
+ policy_content = field;
+ }
+ grpc_core::UniquePtr<char> lb_policy_name =
+ ConvertCamelToSnake(policy_content->key);
+ if (!grpc_core::LoadBalancingPolicyRegistry::LoadBalancingPolicyExists(
+ lb_policy_name.get())) {
+ continue;
+ }
+ lb_policy_name_ = std::move(lb_policy_name);
+ lb_policy_config_ = policy_content->child;
+ return;
+ }
+}
+
+void ProcessedResolverResult::ParseRetryThrottleParamsFromServiceConfig(
+ const grpc_json* field) {
+ if (strcmp(field->key, "retryThrottling") == 0) {
+ if (retry_throttle_data_ != nullptr) return; // Duplicate.
+ if (field->type != GRPC_JSON_OBJECT) return;
+ int max_milli_tokens = 0;
+ int milli_token_ratio = 0;
+ for (grpc_json* sub_field = field->child; sub_field != nullptr;
+ sub_field = sub_field->next) {
+ if (sub_field->key == nullptr) return;
+ if (strcmp(sub_field->key, "maxTokens") == 0) {
+ if (max_milli_tokens != 0) return; // Duplicate.
+ if (sub_field->type != GRPC_JSON_NUMBER) return;
+ max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
+ if (max_milli_tokens == -1) return;
+ max_milli_tokens *= 1000;
+ } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
+ if (milli_token_ratio != 0) return; // Duplicate.
+ if (sub_field->type != GRPC_JSON_NUMBER) return;
+ // We support up to 3 decimal digits.
+ size_t whole_len = strlen(sub_field->value);
+ uint32_t multiplier = 1;
+ uint32_t decimal_value = 0;
+ const char* decimal_point = strchr(sub_field->value, '.');
+ if (decimal_point != nullptr) {
+ whole_len = static_cast<size_t>(decimal_point - sub_field->value);
+ multiplier = 1000;
+ size_t decimal_len = strlen(decimal_point + 1);
+ if (decimal_len > 3) decimal_len = 3;
+ if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
+ &decimal_value)) {
+ return;
+ }
+ uint32_t decimal_multiplier = 1;
+ for (size_t i = 0; i < (3 - decimal_len); ++i) {
+ decimal_multiplier *= 10;
+ }
+ decimal_value *= decimal_multiplier;
+ }
+ uint32_t whole_value;
+ if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
+ &whole_value)) {
+ return;
+ }
+ milli_token_ratio =
+ static_cast<int>((whole_value * multiplier) + decimal_value);
+ if (milli_token_ratio <= 0) return;
+ }
+ }
+ retry_throttle_data_ =
+ grpc_core::internal::ServerRetryThrottleMap::GetDataForServer(
+ server_name_, max_milli_tokens, milli_token_ratio);
+ }
+}
+
+namespace {
+
+bool ParseWaitForReady(
+ grpc_json* field, ClientChannelMethodParams::WaitForReady* wait_for_ready) {
+ if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
+ return false;
+ }
+ *wait_for_ready = field->type == GRPC_JSON_TRUE
+ ? ClientChannelMethodParams::WAIT_FOR_READY_TRUE
+ : ClientChannelMethodParams::WAIT_FOR_READY_FALSE;
+ return true;
+}
+
+// Parses a JSON field of the form generated for a google.proto.Duration
+// proto message, as per:
+// https://developers.google.com/protocol-buffers/docs/proto3#json
+bool ParseDuration(grpc_json* field, grpc_millis* duration) {
+ if (field->type != GRPC_JSON_STRING) return false;
+ size_t len = strlen(field->value);
+ if (field->value[len - 1] != 's') return false;
+ UniquePtr<char> buf(gpr_strdup(field->value));
+ *(buf.get() + len - 1) = '\0'; // Remove trailing 's'.
+ char* decimal_point = strchr(buf.get(), '.');
+ int nanos = 0;
+ if (decimal_point != nullptr) {
+ *decimal_point = '\0';
+ nanos = gpr_parse_nonnegative_int(decimal_point + 1);
+ if (nanos == -1) {
+ return false;
+ }
+ int num_digits = static_cast<int>(strlen(decimal_point + 1));
+ if (num_digits > 9) { // We don't accept greater precision than nanos.
+ return false;
+ }
+ for (int i = 0; i < (9 - num_digits); ++i) {
+ nanos *= 10;
+ }
+ }
+ int seconds =
+ decimal_point == buf.get() ? 0 : gpr_parse_nonnegative_int(buf.get());
+ if (seconds == -1) return false;
+ *duration = seconds * GPR_MS_PER_SEC + nanos / GPR_NS_PER_MS;
+ return true;
+}
+
+UniquePtr<ClientChannelMethodParams::RetryPolicy> ParseRetryPolicy(
+ grpc_json* field) {
+ auto retry_policy = MakeUnique<ClientChannelMethodParams::RetryPolicy>();
+ if (field->type != GRPC_JSON_OBJECT) return nullptr;
+ for (grpc_json* sub_field = field->child; sub_field != nullptr;
+ sub_field = sub_field->next) {
+ if (sub_field->key == nullptr) return nullptr;
+ if (strcmp(sub_field->key, "maxAttempts") == 0) {
+ if (retry_policy->max_attempts != 0) return nullptr; // Duplicate.
+ if (sub_field->type != GRPC_JSON_NUMBER) return nullptr;
+ retry_policy->max_attempts = gpr_parse_nonnegative_int(sub_field->value);
+ if (retry_policy->max_attempts <= 1) return nullptr;
+ if (retry_policy->max_attempts > MAX_MAX_RETRY_ATTEMPTS) {
+ gpr_log(GPR_ERROR,
+ "service config: clamped retryPolicy.maxAttempts at %d",
+ MAX_MAX_RETRY_ATTEMPTS);
+ retry_policy->max_attempts = MAX_MAX_RETRY_ATTEMPTS;
+ }
+ } else if (strcmp(sub_field->key, "initialBackoff") == 0) {
+ if (retry_policy->initial_backoff > 0) return nullptr; // Duplicate.
+ if (!ParseDuration(sub_field, &retry_policy->initial_backoff)) {
+ return nullptr;
+ }
+ if (retry_policy->initial_backoff == 0) return nullptr;
+ } else if (strcmp(sub_field->key, "maxBackoff") == 0) {
+ if (retry_policy->max_backoff > 0) return nullptr; // Duplicate.
+ if (!ParseDuration(sub_field, &retry_policy->max_backoff)) {
+ return nullptr;
+ }
+ if (retry_policy->max_backoff == 0) return nullptr;
+ } else if (strcmp(sub_field->key, "backoffMultiplier") == 0) {
+ if (retry_policy->backoff_multiplier != 0) return nullptr; // Duplicate.
+ if (sub_field->type != GRPC_JSON_NUMBER) return nullptr;
+ if (sscanf(sub_field->value, "%f", &retry_policy->backoff_multiplier) !=
+ 1) {
+ return nullptr;
+ }
+ if (retry_policy->backoff_multiplier <= 0) return nullptr;
+ } else if (strcmp(sub_field->key, "retryableStatusCodes") == 0) {
+ if (!retry_policy->retryable_status_codes.Empty()) {
+ return nullptr; // Duplicate.
+ }
+ if (sub_field->type != GRPC_JSON_ARRAY) return nullptr;
+ for (grpc_json* element = sub_field->child; element != nullptr;
+ element = element->next) {
+ if (element->type != GRPC_JSON_STRING) return nullptr;
+ grpc_status_code status;
+ if (!grpc_status_code_from_string(element->value, &status)) {
+ return nullptr;
+ }
+ retry_policy->retryable_status_codes.Add(status);
+ }
+ if (retry_policy->retryable_status_codes.Empty()) return nullptr;
+ }
+ }
+ // Make sure required fields are set.
+ if (retry_policy->max_attempts == 0 || retry_policy->initial_backoff == 0 ||
+ retry_policy->max_backoff == 0 || retry_policy->backoff_multiplier == 0 ||
+ retry_policy->retryable_status_codes.Empty()) {
+ return nullptr;
+ }
+ return retry_policy;
+}
+
+} // namespace
+
+RefCountedPtr<ClientChannelMethodParams>
+ClientChannelMethodParams::CreateFromJson(const grpc_json* json) {
+ RefCountedPtr<ClientChannelMethodParams> method_params =
+ MakeRefCounted<ClientChannelMethodParams>();
+ for (grpc_json* field = json->child; field != nullptr; field = field->next) {
+ if (field->key == nullptr) continue;
+ if (strcmp(field->key, "waitForReady") == 0) {
+ if (method_params->wait_for_ready_ != WAIT_FOR_READY_UNSET) {
+ return nullptr; // Duplicate.
+ }
+ if (!ParseWaitForReady(field, &method_params->wait_for_ready_)) {
+ return nullptr;
+ }
+ } else if (strcmp(field->key, "timeout") == 0) {
+ if (method_params->timeout_ > 0) return nullptr; // Duplicate.
+ if (!ParseDuration(field, &method_params->timeout_)) return nullptr;
+ } else if (strcmp(field->key, "retryPolicy") == 0) {
+ if (method_params->retry_policy_ != nullptr) {
+ return nullptr; // Duplicate.
+ }
+ method_params->retry_policy_ = ParseRetryPolicy(field);
+ if (method_params->retry_policy_ == nullptr) return nullptr;
+ }
+ }
+ return method_params;
+}
+
+} // namespace internal
+} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/resolver_result_parsing.h b/src/core/ext/filters/client_channel/resolver_result_parsing.h
new file mode 100644
index 0000000000..f1fb7406bc
--- /dev/null
+++ b/src/core/ext/filters/client_channel/resolver_result_parsing.h
@@ -0,0 +1,146 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_RESULT_PARSING_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_RESULT_PARSING_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/filters/client_channel/retry_throttle.h"
+#include "src/core/lib/channel/status_util.h"
+#include "src/core/lib/gprpp/ref_counted.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/iomgr/exec_ctx.h" // for grpc_millis
+#include "src/core/lib/json/json.h"
+#include "src/core/lib/slice/slice_hash_table.h"
+#include "src/core/lib/transport/service_config.h"
+
+namespace grpc_core {
+namespace internal {
+
+class ClientChannelMethodParams;
+
+// A table mapping from a method name to its method parameters.
+typedef grpc_core::SliceHashTable<
+ grpc_core::RefCountedPtr<ClientChannelMethodParams>>
+ ClientChannelMethodParamsTable;
+
+// A container of processed fields from the resolver result. Simplifies the
+// usage of resolver result.
+class ProcessedResolverResult {
+ public:
+ // Processes the resolver result and populates the relative members
+ // for later consumption. Tries to parse retry parameters only if parse_retry
+ // is true.
+ ProcessedResolverResult(const grpc_channel_args* resolver_result,
+ bool parse_retry);
+
+ // Getters. Any managed object's ownership is transferred.
+ grpc_core::UniquePtr<char> service_config_json() {
+ return std::move(service_config_json_);
+ }
+ grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() {
+ return std::move(retry_throttle_data_);
+ }
+ grpc_core::RefCountedPtr<ClientChannelMethodParamsTable>
+ method_params_table() {
+ return std::move(method_params_table_);
+ }
+ grpc_core::UniquePtr<char> lb_policy_name() {
+ return std::move(lb_policy_name_);
+ }
+ grpc_json* lb_policy_config() { return lb_policy_config_; }
+
+ private:
+ // Finds the service config; extracts LB config and (maybe) retry throttle
+ // params from it.
+ void ProcessServiceConfig(const grpc_channel_args* resolver_result,
+ bool parse_retry);
+
+ // Finds the LB policy name (when no LB config was found).
+ void ProcessLbPolicyName(const grpc_channel_args* resolver_result);
+
+ // Parses the service config. Intended to be used by
+ // ServiceConfig::ParseGlobalParams.
+ static void ParseServiceConfig(const grpc_json* field,
+ ProcessedResolverResult* parsing_state);
+ // Parses the LB config from service config.
+ void ParseLbConfigFromServiceConfig(const grpc_json* field);
+ // Parses the retry throttle parameters from service config.
+ void ParseRetryThrottleParamsFromServiceConfig(const grpc_json* field);
+
+ // Service config.
+ grpc_core::UniquePtr<char> service_config_json_;
+ grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config_;
+ // LB policy.
+ grpc_json* lb_policy_config_ = nullptr;
+ grpc_core::UniquePtr<char> lb_policy_name_;
+ // Retry throttle data.
+ char* server_name_ = nullptr;
+ grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
+ // Method params table.
+ grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table_;
+};
+
+// The parameters of a method.
+class ClientChannelMethodParams : public RefCounted<ClientChannelMethodParams> {
+ public:
+ enum WaitForReady {
+ WAIT_FOR_READY_UNSET = 0,
+ WAIT_FOR_READY_FALSE,
+ WAIT_FOR_READY_TRUE
+ };
+
+ struct RetryPolicy {
+ int max_attempts = 0;
+ grpc_millis initial_backoff = 0;
+ grpc_millis max_backoff = 0;
+ float backoff_multiplier = 0;
+ StatusCodeSet retryable_status_codes;
+ };
+
+ /// Creates a method_parameters object from \a json.
+ /// Intended for use with ServiceConfig::CreateMethodConfigTable().
+ static RefCountedPtr<ClientChannelMethodParams> CreateFromJson(
+ const grpc_json* json);
+
+ grpc_millis timeout() const { return timeout_; }
+ WaitForReady wait_for_ready() const { return wait_for_ready_; }
+ const RetryPolicy* retry_policy() const { return retry_policy_.get(); }
+
+ private:
+ // So New() can call our private ctor.
+ template <typename T, typename... Args>
+ friend T* grpc_core::New(Args&&... args);
+
+ // So Delete() can call our private dtor.
+ template <typename T>
+ friend void grpc_core::Delete(T*);
+
+ ClientChannelMethodParams() {}
+ virtual ~ClientChannelMethodParams() {}
+
+ grpc_millis timeout_ = 0;
+ WaitForReady wait_for_ready_ = WAIT_FOR_READY_UNSET;
+ UniquePtr<RetryPolicy> retry_policy_;
+};
+
+} // namespace internal
+} // namespace grpc_core
+
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_RESULT_PARSING_H */