aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/client_channel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/client_channel.cc')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc163
1 files changed, 21 insertions, 142 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 8e9ee889e1..d94e20124e 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -34,9 +34,9 @@
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
-#include "src/core/ext/filters/client_channel/method_params.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
+#include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
@@ -63,6 +63,8 @@
#include "src/core/lib/transport/status_metadata.h"
using grpc_core::internal::ClientChannelMethodParams;
+using grpc_core::internal::ClientChannelMethodParamsTable;
+using grpc_core::internal::ProcessedResolverResult;
using grpc_core::internal::ServerRetryThrottleData;
/* Client channel implementation */
@@ -83,10 +85,6 @@ grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel");
struct external_connectivity_watcher;
-typedef grpc_core::SliceHashTable<
- grpc_core::RefCountedPtr<ClientChannelMethodParams>>
- MethodParamsTable;
-
typedef struct client_channel_channel_data {
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
bool started_resolving;
@@ -102,7 +100,7 @@ typedef struct client_channel_channel_data {
/** retry throttle data */
grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
/** maps method names to method_parameters structs */
- grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
+ grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table;
/** incoming resolver result - set by resolver.next() */
grpc_channel_args* resolver_result;
/** a list of closures that are all waiting for resolver result to come in */
@@ -251,66 +249,6 @@ static void start_resolving_locked(channel_data* chand) {
&chand->on_resolver_result_changed);
}
-typedef struct {
- char* server_name;
- grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
-} service_config_parsing_state;
-
-static void parse_retry_throttle_params(
- const grpc_json* field, service_config_parsing_state* parsing_state) {
- if (strcmp(field->key, "retryThrottling") == 0) {
- if (parsing_state->retry_throttle_data != nullptr) return; // Duplicate.
- if (field->type != GRPC_JSON_OBJECT) return;
- int max_milli_tokens = 0;
- int milli_token_ratio = 0;
- for (grpc_json* sub_field = field->child; sub_field != nullptr;
- sub_field = sub_field->next) {
- if (sub_field->key == nullptr) return;
- if (strcmp(sub_field->key, "maxTokens") == 0) {
- if (max_milli_tokens != 0) return; // Duplicate.
- if (sub_field->type != GRPC_JSON_NUMBER) return;
- max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
- if (max_milli_tokens == -1) return;
- max_milli_tokens *= 1000;
- } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
- if (milli_token_ratio != 0) return; // Duplicate.
- if (sub_field->type != GRPC_JSON_NUMBER) return;
- // We support up to 3 decimal digits.
- size_t whole_len = strlen(sub_field->value);
- uint32_t multiplier = 1;
- uint32_t decimal_value = 0;
- const char* decimal_point = strchr(sub_field->value, '.');
- if (decimal_point != nullptr) {
- whole_len = static_cast<size_t>(decimal_point - sub_field->value);
- multiplier = 1000;
- size_t decimal_len = strlen(decimal_point + 1);
- if (decimal_len > 3) decimal_len = 3;
- if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
- &decimal_value)) {
- return;
- }
- uint32_t decimal_multiplier = 1;
- for (size_t i = 0; i < (3 - decimal_len); ++i) {
- decimal_multiplier *= 10;
- }
- decimal_value *= decimal_multiplier;
- }
- uint32_t whole_value;
- if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
- &whole_value)) {
- return;
- }
- milli_token_ratio =
- static_cast<int>((whole_value * multiplier) + decimal_value);
- if (milli_token_ratio <= 0) return;
- }
- }
- parsing_state->retry_throttle_data =
- grpc_core::internal::ServerRetryThrottleMap::GetDataForServer(
- parsing_state->server_name, max_milli_tokens, milli_token_ratio);
- }
-}
-
// Invoked from the resolver NextLocked() callback when the resolver
// is shutting down.
static void on_resolver_shutdown_locked(channel_data* chand,
@@ -352,37 +290,6 @@ static void on_resolver_shutdown_locked(channel_data* chand,
GRPC_ERROR_UNREF(error);
}
-// Returns the LB policy name from the resolver result.
-static grpc_core::UniquePtr<char>
-get_lb_policy_name_from_resolver_result_locked(channel_data* chand) {
- // Find LB policy name in channel args.
- const grpc_arg* channel_arg =
- grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
- const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg);
- // Special case: If at least one balancer address is present, we use
- // the grpclb policy, regardless of what the resolver actually specified.
- channel_arg =
- grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
- if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
- grpc_lb_addresses* addresses =
- static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
- if (grpc_lb_addresses_contains_balancer_address(*addresses)) {
- if (lb_policy_name != nullptr &&
- gpr_stricmp(lb_policy_name, "grpclb") != 0) {
- gpr_log(GPR_INFO,
- "resolver requested LB policy %s but provided at least one "
- "balancer address -- forcing use of grpclb LB policy",
- lb_policy_name);
- }
- lb_policy_name = "grpclb";
- }
- }
- // Use pick_first if nothing was specified and we didn't select grpclb
- // above.
- if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
- return grpc_core::UniquePtr<char>(gpr_strdup(lb_policy_name));
-}
-
static void request_reresolution_locked(void* arg, grpc_error* error) {
reresolution_request_args* args =
static_cast<reresolution_request_args*>(arg);
@@ -410,13 +317,14 @@ using TraceStringVector = grpc_core::InlinedVector<char*, 3>;
// *connectivity_error to its initial connectivity state; otherwise,
// leaves them unchanged.
static void create_new_lb_policy_locked(
- channel_data* chand, char* lb_policy_name,
+ channel_data* chand, char* lb_policy_name, grpc_json* lb_config,
grpc_connectivity_state* connectivity_state,
grpc_error** connectivity_error, TraceStringVector* trace_strings) {
grpc_core::LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = chand->combiner;
lb_policy_args.client_channel_factory = chand->client_channel_factory;
lb_policy_args.args = chand->resolver_result;
+ lb_policy_args.lb_config = lb_config;
grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy =
grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
lb_policy_name, lb_policy_args);
@@ -473,44 +381,6 @@ static void create_new_lb_policy_locked(
}
}
-// Returns the service config (as a JSON string) from the resolver result.
-// Also updates state in chand.
-static grpc_core::UniquePtr<char>
-get_service_config_from_resolver_result_locked(channel_data* chand) {
- const grpc_arg* channel_arg =
- grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
- const char* service_config_json = grpc_channel_arg_get_string(channel_arg);
- if (service_config_json != nullptr) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
- chand, service_config_json);
- }
- grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
- grpc_core::ServiceConfig::Create(service_config_json);
- if (service_config != nullptr) {
- if (chand->enable_retries) {
- channel_arg =
- grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
- const char* server_uri = grpc_channel_arg_get_string(channel_arg);
- GPR_ASSERT(server_uri != nullptr);
- grpc_uri* uri = grpc_uri_parse(server_uri, true);
- GPR_ASSERT(uri->path[0] != '\0');
- service_config_parsing_state parsing_state;
- parsing_state.server_name =
- uri->path[0] == '/' ? uri->path + 1 : uri->path;
- service_config->ParseGlobalParams(parse_retry_throttle_params,
- &parsing_state);
- grpc_uri_destroy(uri);
- chand->retry_throttle_data =
- std::move(parsing_state.retry_throttle_data);
- }
- chand->method_params_table = service_config->CreateMethodConfigTable(
- ClientChannelMethodParams::CreateFromJson);
- }
- }
- return grpc_core::UniquePtr<char>(gpr_strdup(service_config_json));
-}
-
static void maybe_add_trace_message_for_address_changes_locked(
channel_data* chand, TraceStringVector* trace_strings) {
int resolution_contains_addresses = false;
@@ -598,8 +468,20 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand);
}
} else {
+ // Parse the resolver result.
+ ProcessedResolverResult resolver_result(chand->resolver_result,
+ chand->enable_retries);
+ chand->retry_throttle_data = resolver_result.retry_throttle_data();
+ chand->method_params_table = resolver_result.method_params_table();
+ grpc_core::UniquePtr<char> service_config_json =
+ resolver_result.service_config_json();
+ if (service_config_json != nullptr && grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
+ chand, service_config_json.get());
+ }
grpc_core::UniquePtr<char> lb_policy_name =
- get_lb_policy_name_from_resolver_result_locked(chand);
+ resolver_result.lb_policy_name();
+ grpc_json* lb_policy_config = resolver_result.lb_policy_config();
// Check to see if we're already using the right LB policy.
// Note: It's safe to use chand->info_lb_policy_name here without
// taking a lock on chand->info_mu, because this function is the
@@ -614,19 +496,16 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)",
chand, lb_policy_name.get(), chand->lb_policy.get());
}
- chand->lb_policy->UpdateLocked(*chand->resolver_result);
+ chand->lb_policy->UpdateLocked(*chand->resolver_result, lb_policy_config);
// No need to set the channel's connectivity state; the existing
// watch on the LB policy will take care of that.
set_connectivity_state = false;
} else {
// Instantiate new LB policy.
- create_new_lb_policy_locked(chand, lb_policy_name.get(),
+ create_new_lb_policy_locked(chand, lb_policy_name.get(), lb_policy_config,
&connectivity_state, &connectivity_error,
&trace_strings);
}
- // Find service config.
- grpc_core::UniquePtr<char> service_config_json =
- get_service_config_from_resolver_result_locked(chand);
// Note: It's safe to use chand->info_service_config_json here without
// taking a lock on chand->info_mu, because this function is the
// only thing that modifies its value, and it can only be invoked