diff options
author | Mark D. Roth <roth@google.com> | 2018-07-12 08:46:17 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-07-12 08:46:17 -0700 |
commit | 4decd144dc5543f3bd6315a00b1d08a5fb568abf (patch) | |
tree | 2dd45189053b45900d5039555dfc871c45ff3890 /src | |
parent | ea44d938918ccb21885d0c40d711175ad25ecf1b (diff) | |
parent | 4f0dec7fcc7504462d3c75022e25ecc4102ea851 (diff) |
Merge pull request #15896 from markdroth/client_channel_refactor
Refactor resolver result callback
Diffstat (limited to 'src')
3 files changed, 255 insertions, 228 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 520431e63b..04f7a2c830 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -126,9 +126,9 @@ typedef struct client_channel_channel_data { /* the following properties are guarded by a mutex since APIs require them to be instantaneously available */ gpr_mu info_mu; - char* info_lb_policy_name; + grpc_core::UniquePtr<char> info_lb_policy_name; /** service config in JSON form */ - char* info_service_config_json; + grpc_core::UniquePtr<char> info_service_config_json; } channel_data; typedef struct { @@ -284,6 +284,78 @@ static void parse_retry_throttle_params( } } +// Invoked from the resolver NextLocked() callback when the resolver +// is shutting down. +static void on_resolver_shutdown_locked(channel_data* chand, + grpc_error* error) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p: shutting down", chand); + } + if (chand->lb_policy != nullptr) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand, + chand->lb_policy.get()); + } + grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), + chand->interested_parties); + chand->lb_policy.reset(); + } + if (chand->resolver != nullptr) { + // This should never happen; it can only be triggered by a resolver + // implementation spotaneously deciding to report shutdown without + // being orphaned. This code is included just to be defensive. + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p: spontaneous shutdown from resolver %p", + chand, chand->resolver.get()); + } + chand->resolver.reset(); + set_channel_connectivity_state_locked( + chand, GRPC_CHANNEL_SHUTDOWN, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Resolver spontaneous shutdown", &error, 1), + "resolver_spontaneous_shutdown"); + } + grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Channel disconnected", &error, 1)); + GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); + GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver"); + grpc_channel_args_destroy(chand->resolver_result); + chand->resolver_result = nullptr; + GRPC_ERROR_UNREF(error); +} + +// Returns the LB policy name from the resolver result. +static grpc_core::UniquePtr<char> +get_lb_policy_name_from_resolver_result_locked(channel_data* chand) { + // Find LB policy name in channel args. + const grpc_arg* channel_arg = + grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME); + const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg); + // Special case: If at least one balancer address is present, we use + // the grpclb policy, regardless of what the resolver actually specified. + channel_arg = + grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); + if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) { + grpc_lb_addresses* addresses = + static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p); + if (grpc_lb_addresses_contains_balancer_address(*addresses)) { + if (lb_policy_name != nullptr && + gpr_stricmp(lb_policy_name, "grpclb") != 0) { + gpr_log(GPR_INFO, + "resolver requested LB policy %s but provided at least one " + "balancer address -- forcing use of grpclb LB policy", + lb_policy_name); + } + lb_policy_name = "grpclb"; + } + } + // Use pick_first if nothing was specified and we didn't select grpclb + // above. + if (lb_policy_name == nullptr) lb_policy_name = "pick_first"; + return grpc_core::UniquePtr<char>(gpr_strdup(lb_policy_name)); +} + static void request_reresolution_locked(void* arg, grpc_error* error) { reresolution_request_args* args = static_cast<reresolution_request_args*>(arg); @@ -304,234 +376,183 @@ static void request_reresolution_locked(void* arg, grpc_error* error) { chand->lb_policy->SetReresolutionClosureLocked(&args->closure); } -// TODO(roth): The logic in this function is very hard to follow. We -// should refactor this so that it's easier to understand, perhaps as -// part of changing the resolver API to more clearly differentiate -// between transient failures and shutdown. -static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { - channel_data* chand = static_cast<channel_data*>(arg); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p: got resolver result: resolver_result=%p error=%s", chand, - chand->resolver_result, grpc_error_string(error)); - } - // Extract the following fields from the resolver result, if non-nullptr. - bool lb_policy_updated = false; - bool lb_policy_created = false; - char* lb_policy_name_dup = nullptr; - bool lb_policy_name_changed = false; - grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy; - char* service_config_json = nullptr; - grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data; - grpc_core::RefCountedPtr<MethodParamsTable> method_params_table; - if (chand->resolver_result != nullptr) { - if (chand->resolver != nullptr) { - // Find LB policy name. - const grpc_arg* channel_arg = grpc_channel_args_find( - chand->resolver_result, GRPC_ARG_LB_POLICY_NAME); - const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg); - // Special case: If at least one balancer address is present, we use - // the grpclb policy, regardless of what the resolver actually specified. - channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); - if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) { - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p); - bool found_balancer_address = false; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) { - found_balancer_address = true; - break; - } - } - if (found_balancer_address) { - if (lb_policy_name != nullptr && - strcmp(lb_policy_name, "grpclb") != 0) { - gpr_log(GPR_INFO, - "resolver requested LB policy %s but provided at least one " - "balancer address -- forcing use of grpclb LB policy", - lb_policy_name); - } - lb_policy_name = "grpclb"; - } - } - // Use pick_first if nothing was specified and we didn't select grpclb - // above. - if (lb_policy_name == nullptr) lb_policy_name = "pick_first"; - // Check to see if we're already using the right LB policy. - // Note: It's safe to use chand->info_lb_policy_name here without - // taking a lock on chand->info_mu, because this function is the - // only thing that modifies its value, and it can only be invoked - // once at any given time. - lb_policy_name_changed = - chand->info_lb_policy_name == nullptr || - gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0; - if (chand->lb_policy != nullptr && !lb_policy_name_changed) { - // Continue using the same LB policy. Update with new addresses. - lb_policy_updated = true; - chand->lb_policy->UpdateLocked(*chand->resolver_result); - } else { - // Instantiate new LB policy. - grpc_core::LoadBalancingPolicy::Args lb_policy_args; - lb_policy_args.combiner = chand->combiner; - lb_policy_args.client_channel_factory = chand->client_channel_factory; - lb_policy_args.args = chand->resolver_result; - new_lb_policy = - grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( - lb_policy_name, lb_policy_args); - if (GPR_UNLIKELY(new_lb_policy == nullptr)) { - gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", - lb_policy_name); - } else { - lb_policy_created = true; - reresolution_request_args* args = - static_cast<reresolution_request_args*>( - gpr_zalloc(sizeof(*args))); - args->chand = chand; - args->lb_policy = new_lb_policy.get(); - GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args, - grpc_combiner_scheduler(chand->combiner)); - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution"); - new_lb_policy->SetReresolutionClosureLocked(&args->closure); - } - } - // Before we clean up, save a copy of lb_policy_name, since it might - // be pointing to data inside chand->resolver_result. - // The copy will be saved in chand->lb_policy_name below. - lb_policy_name_dup = gpr_strdup(lb_policy_name); - // Find service config. - channel_arg = grpc_channel_args_find(chand->resolver_result, - GRPC_ARG_SERVICE_CONFIG); - service_config_json = - gpr_strdup(grpc_channel_arg_get_string(channel_arg)); - if (service_config_json != nullptr) { - grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config = - grpc_core::ServiceConfig::Create(service_config_json); - if (service_config != nullptr) { - if (chand->enable_retries) { - channel_arg = grpc_channel_args_find(chand->resolver_result, - GRPC_ARG_SERVER_URI); - const char* server_uri = grpc_channel_arg_get_string(channel_arg); - GPR_ASSERT(server_uri != nullptr); - grpc_uri* uri = grpc_uri_parse(server_uri, true); - GPR_ASSERT(uri->path[0] != '\0'); - service_config_parsing_state parsing_state; - memset(&parsing_state, 0, sizeof(parsing_state)); - parsing_state.server_name = - uri->path[0] == '/' ? uri->path + 1 : uri->path; - service_config->ParseGlobalParams(parse_retry_throttle_params, - &parsing_state); - grpc_uri_destroy(uri); - retry_throttle_data = std::move(parsing_state.retry_throttle_data); - } - method_params_table = service_config->CreateMethodConfigTable( - ClientChannelMethodParams::CreateFromJson); - } - } +// Creates a new LB policy, replacing any previous one. +// If the new policy is created successfully, sets *connectivity_state and +// *connectivity_error to its initial connectivity state; otherwise, +// leaves them unchanged. +static void create_new_lb_policy_locked( + channel_data* chand, char* lb_policy_name, + grpc_connectivity_state* connectivity_state, + grpc_error** connectivity_error) { + grpc_core::LoadBalancingPolicy::Args lb_policy_args; + lb_policy_args.combiner = chand->combiner; + lb_policy_args.client_channel_factory = chand->client_channel_factory; + lb_policy_args.args = chand->resolver_result; + grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy = + grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( + lb_policy_name, lb_policy_args); + if (GPR_UNLIKELY(new_lb_policy == nullptr)) { + gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); + } else { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p: created new LB policy \"%s\" (%p)", chand, + lb_policy_name, new_lb_policy.get()); } - } - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p: resolver result: lb_policy_name=\"%s\"%s, " - "service_config=\"%s\"", - chand, lb_policy_name_dup, - lb_policy_name_changed ? " (changed)" : "", service_config_json); - } - // Now swap out fields in chand. Note that the new values may still - // be nullptr if (e.g.) the resolver failed to return results or the - // results did not contain the necessary data. - // - // First, swap out the data used by cc_get_channel_info(). - gpr_mu_lock(&chand->info_mu); - if (lb_policy_name_dup != nullptr) { - gpr_free(chand->info_lb_policy_name); - chand->info_lb_policy_name = lb_policy_name_dup; - } - if (service_config_json != nullptr) { - gpr_free(chand->info_service_config_json); - chand->info_service_config_json = service_config_json; - } - gpr_mu_unlock(&chand->info_mu); - // Swap out the retry throttle data. - chand->retry_throttle_data = std::move(retry_throttle_data); - // Swap out the method params table. - chand->method_params_table = std::move(method_params_table); - // If we have a new LB policy or are shutting down (in which case - // new_lb_policy will be nullptr), swap out the LB policy, unreffing the - // old one and removing its fds from chand->interested_parties. - // Note that we do NOT do this if either (a) we updated the existing - // LB policy above or (b) we failed to create the new LB policy (in - // which case we want to continue using the most recent one we had). - if (new_lb_policy != nullptr || error != GRPC_ERROR_NONE || - chand->resolver == nullptr) { + // Swap out the LB policy and update the fds in + // chand->interested_parties. if (chand->lb_policy != nullptr) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: unreffing lb_policy=%p", chand, + gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand, chand->lb_policy.get()); } grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), chand->interested_parties); chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get()); - chand->lb_policy.reset(); } chand->lb_policy = std::move(new_lb_policy); + grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(), + chand->interested_parties); + // Set up re-resolution callback. + reresolution_request_args* args = + static_cast<reresolution_request_args*>(gpr_zalloc(sizeof(*args))); + args->chand = chand; + args->lb_policy = chand->lb_policy.get(); + GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args, + grpc_combiner_scheduler(chand->combiner)); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution"); + chand->lb_policy->SetReresolutionClosureLocked(&args->closure); + // Get the new LB policy's initial connectivity state and start a + // connectivity watch. + GRPC_ERROR_UNREF(*connectivity_error); + *connectivity_state = + chand->lb_policy->CheckConnectivityLocked(connectivity_error); + if (chand->exit_idle_when_lb_policy_arrives) { + chand->lb_policy->ExitIdleLocked(); + chand->exit_idle_when_lb_policy_arrives = false; + } + watch_lb_policy_locked(chand, chand->lb_policy.get(), *connectivity_state); + } +} + +// Returns the service config (as a JSON string) from the resolver result. +// Also updates state in chand. +static grpc_core::UniquePtr<char> +get_service_config_from_resolver_result_locked(channel_data* chand) { + const grpc_arg* channel_arg = + grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG); + const char* service_config_json = grpc_channel_arg_get_string(channel_arg); + if (service_config_json != nullptr) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"", + chand, service_config_json); + } + grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config = + grpc_core::ServiceConfig::Create(service_config_json); + if (service_config != nullptr) { + if (chand->enable_retries) { + channel_arg = + grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); + const char* server_uri = grpc_channel_arg_get_string(channel_arg); + GPR_ASSERT(server_uri != nullptr); + grpc_uri* uri = grpc_uri_parse(server_uri, true); + GPR_ASSERT(uri->path[0] != '\0'); + service_config_parsing_state parsing_state; + memset(&parsing_state, 0, sizeof(parsing_state)); + parsing_state.server_name = + uri->path[0] == '/' ? uri->path + 1 : uri->path; + service_config->ParseGlobalParams(parse_retry_throttle_params, + &parsing_state); + grpc_uri_destroy(uri); + chand->retry_throttle_data = + std::move(parsing_state.retry_throttle_data); + } + chand->method_params_table = service_config->CreateMethodConfigTable( + ClientChannelMethodParams::CreateFromJson); + } } - // Now that we've swapped out the relevant fields of chand, check for - // error or shutdown. + return grpc_core::UniquePtr<char>(gpr_strdup(service_config_json)); +} + +// Callback invoked when a resolver result is available. +static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { + channel_data* chand = static_cast<channel_data*>(arg); + if (grpc_client_channel_trace.enabled()) { + const char* disposition = + chand->resolver_result != nullptr + ? "" + : (error == GRPC_ERROR_NONE ? " (transient error)" + : " (resolver shutdown)"); + gpr_log(GPR_INFO, + "chand=%p: got resolver result: resolver_result=%p error=%s%s", + chand, chand->resolver_result, grpc_error_string(error), + disposition); + } + // Handle shutdown. if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) { + on_resolver_shutdown_locked(chand, GRPC_ERROR_REF(error)); + return; + } + // Data used to set the channel's connectivity state. + bool set_connectivity_state = true; + grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE; + grpc_error* connectivity_error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); + // chand->resolver_result will be null in the case of a transient + // resolution error. In that case, we don't have any new result to + // process, which means that we keep using the previous result (if any). + if (chand->resolver_result == nullptr) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: shutting down", chand); - } - if (chand->resolver != nullptr) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: shutting down resolver", chand); - } - chand->resolver.reset(); + gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand); } - set_channel_connectivity_state_locked( - chand, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Got resolver result after disconnection", &error, 1), - "resolver_gone"); - grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Channel disconnected", &error, 1)); - GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); - GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver"); - grpc_channel_args_destroy(chand->resolver_result); - chand->resolver_result = nullptr; - } else { // Not shutting down. - grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_error* state_error = - GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); - if (lb_policy_created) { + } else { + grpc_core::UniquePtr<char> lb_policy_name = + get_lb_policy_name_from_resolver_result_locked(chand); + // Check to see if we're already using the right LB policy. + // Note: It's safe to use chand->info_lb_policy_name here without + // taking a lock on chand->info_mu, because this function is the + // only thing that modifies its value, and it can only be invoked + // once at any given time. + bool lb_policy_name_changed = chand->info_lb_policy_name == nullptr || + gpr_stricmp(chand->info_lb_policy_name.get(), + lb_policy_name.get()) != 0; + if (chand->lb_policy != nullptr && !lb_policy_name_changed) { + // Continue using the same LB policy. Update with new addresses. if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: initializing new LB policy", chand); - } - GRPC_ERROR_UNREF(state_error); - state = chand->lb_policy->CheckConnectivityLocked(&state_error); - grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(), - chand->interested_parties); - GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); - if (chand->exit_idle_when_lb_policy_arrives) { - chand->lb_policy->ExitIdleLocked(); - chand->exit_idle_when_lb_policy_arrives = false; + gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)", + chand, lb_policy_name.get(), chand->lb_policy.get()); } - watch_lb_policy_locked(chand, chand->lb_policy.get(), state); - } else if (chand->resolver_result == nullptr) { - // Transient failure. - GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); - } - if (!lb_policy_updated) { - set_channel_connectivity_state_locked( - chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver"); - } + chand->lb_policy->UpdateLocked(*chand->resolver_result); + // No need to set the channel's connectivity state; the existing + // watch on the LB policy will take care of that. + set_connectivity_state = false; + } else { + // Instantiate new LB policy. + create_new_lb_policy_locked(chand, lb_policy_name.get(), + &connectivity_state, &connectivity_error); + } + // Find service config. + grpc_core::UniquePtr<char> service_config_json = + get_service_config_from_resolver_result_locked(chand); + // Swap out the data used by cc_get_channel_info(). + gpr_mu_lock(&chand->info_mu); + chand->info_lb_policy_name = std::move(lb_policy_name); + chand->info_service_config_json = std::move(service_config_json); + gpr_mu_unlock(&chand->info_mu); + // Clean up. grpc_channel_args_destroy(chand->resolver_result); chand->resolver_result = nullptr; - chand->resolver->NextLocked(&chand->resolver_result, - &chand->on_resolver_result_changed); - GRPC_ERROR_UNREF(state_error); } + // Set the channel's connectivity state if needed. + if (set_connectivity_state) { + set_channel_connectivity_state_locked( + chand, connectivity_state, connectivity_error, "resolver_result"); + } else { + GRPC_ERROR_UNREF(connectivity_error); + } + // Invoke closures that were waiting for results and renew the watch. + GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); + chand->resolver->NextLocked(&chand->resolver_result, + &chand->on_resolver_result_changed); } static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { @@ -611,15 +632,11 @@ static void cc_get_channel_info(grpc_channel_element* elem, channel_data* chand = static_cast<channel_data*>(elem->channel_data); gpr_mu_lock(&chand->info_mu); if (info->lb_policy_name != nullptr) { - *info->lb_policy_name = chand->info_lb_policy_name == nullptr - ? nullptr - : gpr_strdup(chand->info_lb_policy_name); + *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name.get()); } if (info->service_config_json != nullptr) { *info->service_config_json = - chand->info_service_config_json == nullptr - ? nullptr - : gpr_strdup(chand->info_service_config_json); + gpr_strdup(chand->info_service_config_json.get()); } gpr_mu_unlock(&chand->info_mu); } @@ -699,19 +716,15 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, return GRPC_ERROR_NONE; } -static void shutdown_resolver_locked(void* arg, grpc_error* error) { - grpc_core::Resolver* resolver = static_cast<grpc_core::Resolver*>(arg); - resolver->Orphan(); -} - /* Destructor for channel_data */ static void cc_destroy_channel_elem(grpc_channel_element* elem) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); if (chand->resolver != nullptr) { - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver.release(), - grpc_combiner_scheduler(chand->combiner)), - GRPC_ERROR_NONE); + // The only way we can get here is if we never started resolving, + // because we take a ref to the channel stack when we start + // resolving and do not release it until the resolver callback is + // invoked after the resolver shuts down. + chand->resolver.reset(); } if (chand->client_channel_factory != nullptr) { grpc_client_channel_factory_unref(chand->client_channel_factory); @@ -721,8 +734,10 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) { chand->interested_parties); chand->lb_policy.reset(); } - gpr_free(chand->info_lb_policy_name); - gpr_free(chand->info_service_config_json); + // TODO(roth): Once we convert the filter API to C++, there will no + // longer be any need to explicitly reset these smart pointer data members. + chand->info_lb_policy_name.reset(); + chand->info_service_config_json.reset(); chand->retry_throttle_data.reset(); chand->method_params_table.reset(); grpc_client_channel_stop_backup_polling(chand->interested_parties); diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.cc b/src/core/ext/filters/client_channel/lb_policy_factory.cc index 7c8cba55b7..5c6363d295 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.cc +++ b/src/core/ext/filters/client_channel/lb_policy_factory.cc @@ -153,3 +153,11 @@ grpc_lb_addresses* grpc_lb_addresses_find_channel_arg( return nullptr; return static_cast<grpc_lb_addresses*>(lb_addresses_arg->value.pointer.p); } + +bool grpc_lb_addresses_contains_balancer_address( + const grpc_lb_addresses& addresses) { + for (size_t i = 0; i < addresses.num_addresses; ++i) { + if (addresses.addresses[i].is_balancer) return true; + } + return false; +} diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h index 6440258158..c07792d8a7 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.h +++ b/src/core/ext/filters/client_channel/lb_policy_factory.h @@ -101,6 +101,10 @@ grpc_arg grpc_lb_addresses_create_channel_arg( grpc_lb_addresses* grpc_lb_addresses_find_channel_arg( const grpc_channel_args* channel_args); +// Returns true if addresses contains at least one balancer address. +bool grpc_lb_addresses_contains_balancer_address( + const grpc_lb_addresses& addresses); + // // LB policy factory // |