From 8a880801aea7d82a792635c9d26271218efcbad3 Mon Sep 17 00:00:00 2001 From: Juanli Shen Date: Thu, 15 Nov 2018 13:11:45 -0800 Subject: Add support for LB config in service config --- .../ext/filters/client_channel/client_channel.cc | 163 +++------------------ 1 file changed, 21 insertions(+), 142 deletions(-) (limited to 'src/core/ext/filters/client_channel/client_channel.cc') diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 8e9ee889e1..d94e20124e 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -34,9 +34,9 @@ #include "src/core/ext/filters/client_channel/backup_poller.h" #include "src/core/ext/filters/client_channel/http_connect_handshaker.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" -#include "src/core/ext/filters/client_channel/method_params.h" #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" +#include "src/core/ext/filters/client_channel/resolver_result_parsing.h" #include "src/core/ext/filters/client_channel/retry_throttle.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/deadline/deadline_filter.h" @@ -63,6 +63,8 @@ #include "src/core/lib/transport/status_metadata.h" using grpc_core::internal::ClientChannelMethodParams; +using grpc_core::internal::ClientChannelMethodParamsTable; +using grpc_core::internal::ProcessedResolverResult; using grpc_core::internal::ServerRetryThrottleData; /* Client channel implementation */ @@ -83,10 +85,6 @@ grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel"); struct external_connectivity_watcher; -typedef grpc_core::SliceHashTable< - grpc_core::RefCountedPtr> - MethodParamsTable; - typedef struct client_channel_channel_data { grpc_core::OrphanablePtr resolver; bool started_resolving; @@ -102,7 +100,7 @@ typedef struct client_channel_channel_data { /** retry throttle data */ grpc_core::RefCountedPtr retry_throttle_data; /** maps method names to method_parameters structs */ - grpc_core::RefCountedPtr method_params_table; + grpc_core::RefCountedPtr method_params_table; /** incoming resolver result - set by resolver.next() */ grpc_channel_args* resolver_result; /** a list of closures that are all waiting for resolver result to come in */ @@ -251,66 +249,6 @@ static void start_resolving_locked(channel_data* chand) { &chand->on_resolver_result_changed); } -typedef struct { - char* server_name; - grpc_core::RefCountedPtr retry_throttle_data; -} service_config_parsing_state; - -static void parse_retry_throttle_params( - const grpc_json* field, service_config_parsing_state* parsing_state) { - if (strcmp(field->key, "retryThrottling") == 0) { - if (parsing_state->retry_throttle_data != nullptr) return; // Duplicate. - if (field->type != GRPC_JSON_OBJECT) return; - int max_milli_tokens = 0; - int milli_token_ratio = 0; - for (grpc_json* sub_field = field->child; sub_field != nullptr; - sub_field = sub_field->next) { - if (sub_field->key == nullptr) return; - if (strcmp(sub_field->key, "maxTokens") == 0) { - if (max_milli_tokens != 0) return; // Duplicate. - if (sub_field->type != GRPC_JSON_NUMBER) return; - max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value); - if (max_milli_tokens == -1) return; - max_milli_tokens *= 1000; - } else if (strcmp(sub_field->key, "tokenRatio") == 0) { - if (milli_token_ratio != 0) return; // Duplicate. - if (sub_field->type != GRPC_JSON_NUMBER) return; - // We support up to 3 decimal digits. - size_t whole_len = strlen(sub_field->value); - uint32_t multiplier = 1; - uint32_t decimal_value = 0; - const char* decimal_point = strchr(sub_field->value, '.'); - if (decimal_point != nullptr) { - whole_len = static_cast(decimal_point - sub_field->value); - multiplier = 1000; - size_t decimal_len = strlen(decimal_point + 1); - if (decimal_len > 3) decimal_len = 3; - if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len, - &decimal_value)) { - return; - } - uint32_t decimal_multiplier = 1; - for (size_t i = 0; i < (3 - decimal_len); ++i) { - decimal_multiplier *= 10; - } - decimal_value *= decimal_multiplier; - } - uint32_t whole_value; - if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len, - &whole_value)) { - return; - } - milli_token_ratio = - static_cast((whole_value * multiplier) + decimal_value); - if (milli_token_ratio <= 0) return; - } - } - parsing_state->retry_throttle_data = - grpc_core::internal::ServerRetryThrottleMap::GetDataForServer( - parsing_state->server_name, max_milli_tokens, milli_token_ratio); - } -} - // Invoked from the resolver NextLocked() callback when the resolver // is shutting down. static void on_resolver_shutdown_locked(channel_data* chand, @@ -352,37 +290,6 @@ static void on_resolver_shutdown_locked(channel_data* chand, GRPC_ERROR_UNREF(error); } -// Returns the LB policy name from the resolver result. -static grpc_core::UniquePtr -get_lb_policy_name_from_resolver_result_locked(channel_data* chand) { - // Find LB policy name in channel args. - const grpc_arg* channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME); - const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg); - // Special case: If at least one balancer address is present, we use - // the grpclb policy, regardless of what the resolver actually specified. - channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); - if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) { - grpc_lb_addresses* addresses = - static_cast(channel_arg->value.pointer.p); - if (grpc_lb_addresses_contains_balancer_address(*addresses)) { - if (lb_policy_name != nullptr && - gpr_stricmp(lb_policy_name, "grpclb") != 0) { - gpr_log(GPR_INFO, - "resolver requested LB policy %s but provided at least one " - "balancer address -- forcing use of grpclb LB policy", - lb_policy_name); - } - lb_policy_name = "grpclb"; - } - } - // Use pick_first if nothing was specified and we didn't select grpclb - // above. - if (lb_policy_name == nullptr) lb_policy_name = "pick_first"; - return grpc_core::UniquePtr(gpr_strdup(lb_policy_name)); -} - static void request_reresolution_locked(void* arg, grpc_error* error) { reresolution_request_args* args = static_cast(arg); @@ -410,13 +317,14 @@ using TraceStringVector = grpc_core::InlinedVector; // *connectivity_error to its initial connectivity state; otherwise, // leaves them unchanged. static void create_new_lb_policy_locked( - channel_data* chand, char* lb_policy_name, + channel_data* chand, char* lb_policy_name, grpc_json* lb_config, grpc_connectivity_state* connectivity_state, grpc_error** connectivity_error, TraceStringVector* trace_strings) { grpc_core::LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.combiner = chand->combiner; lb_policy_args.client_channel_factory = chand->client_channel_factory; lb_policy_args.args = chand->resolver_result; + lb_policy_args.lb_config = lb_config; grpc_core::OrphanablePtr new_lb_policy = grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( lb_policy_name, lb_policy_args); @@ -473,44 +381,6 @@ static void create_new_lb_policy_locked( } } -// Returns the service config (as a JSON string) from the resolver result. -// Also updates state in chand. -static grpc_core::UniquePtr -get_service_config_from_resolver_result_locked(channel_data* chand) { - const grpc_arg* channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG); - const char* service_config_json = grpc_channel_arg_get_string(channel_arg); - if (service_config_json != nullptr) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"", - chand, service_config_json); - } - grpc_core::UniquePtr service_config = - grpc_core::ServiceConfig::Create(service_config_json); - if (service_config != nullptr) { - if (chand->enable_retries) { - channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); - const char* server_uri = grpc_channel_arg_get_string(channel_arg); - GPR_ASSERT(server_uri != nullptr); - grpc_uri* uri = grpc_uri_parse(server_uri, true); - GPR_ASSERT(uri->path[0] != '\0'); - service_config_parsing_state parsing_state; - parsing_state.server_name = - uri->path[0] == '/' ? uri->path + 1 : uri->path; - service_config->ParseGlobalParams(parse_retry_throttle_params, - &parsing_state); - grpc_uri_destroy(uri); - chand->retry_throttle_data = - std::move(parsing_state.retry_throttle_data); - } - chand->method_params_table = service_config->CreateMethodConfigTable( - ClientChannelMethodParams::CreateFromJson); - } - } - return grpc_core::UniquePtr(gpr_strdup(service_config_json)); -} - static void maybe_add_trace_message_for_address_changes_locked( channel_data* chand, TraceStringVector* trace_strings) { int resolution_contains_addresses = false; @@ -598,8 +468,20 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand); } } else { + // Parse the resolver result. + ProcessedResolverResult resolver_result(chand->resolver_result, + chand->enable_retries); + chand->retry_throttle_data = resolver_result.retry_throttle_data(); + chand->method_params_table = resolver_result.method_params_table(); + grpc_core::UniquePtr service_config_json = + resolver_result.service_config_json(); + if (service_config_json != nullptr && grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"", + chand, service_config_json.get()); + } grpc_core::UniquePtr lb_policy_name = - get_lb_policy_name_from_resolver_result_locked(chand); + resolver_result.lb_policy_name(); + grpc_json* lb_policy_config = resolver_result.lb_policy_config(); // Check to see if we're already using the right LB policy. // Note: It's safe to use chand->info_lb_policy_name here without // taking a lock on chand->info_mu, because this function is the @@ -614,19 +496,16 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)", chand, lb_policy_name.get(), chand->lb_policy.get()); } - chand->lb_policy->UpdateLocked(*chand->resolver_result); + chand->lb_policy->UpdateLocked(*chand->resolver_result, lb_policy_config); // No need to set the channel's connectivity state; the existing // watch on the LB policy will take care of that. set_connectivity_state = false; } else { // Instantiate new LB policy. - create_new_lb_policy_locked(chand, lb_policy_name.get(), + create_new_lb_policy_locked(chand, lb_policy_name.get(), lb_policy_config, &connectivity_state, &connectivity_error, &trace_strings); } - // Find service config. - grpc_core::UniquePtr service_config_json = - get_service_config_from_resolver_result_locked(chand); // Note: It's safe to use chand->info_service_config_json here without // taking a lock on chand->info_mu, because this function is the // only thing that modifies its value, and it can only be invoked -- cgit v1.2.3