Diffstat (limited to 'src/core/ext/filters/client_channel')
41 files changed, 2267 insertions, 2067 deletions
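This commit replaces client_channel.cc's hand-rolled resolver/LB-policy plumbing with a new RequestRouter object (declared in request_routing.h, which is not shown in this section). The interface below is a hedged reconstruction inferred only from the call sites visible in this diff; the actual declarations in request_routing.h may differ in names and detail.

    // Hedged sketch of the RequestRouter surface, reconstructed from the
    // call sites in this diff only.
    namespace grpc_core {

    class RequestRouter {
     public:
      // One call's routing state; replaces the PickState plumbing that
      // client_channel.cc previously managed directly.
      class Request {
       public:
        // Returns true if the call should proceed (see
        // maybe_apply_service_config_to_call_locked() below).
        using ApplyServiceConfigCallback = bool (*)(void* user_data);

        Request(grpc_call_stack* owning_call,
                grpc_call_combiner* call_combiner,
                grpc_polling_entity* pollent,
                grpc_metadata_batch* send_initial_metadata,
                uint32_t* send_initial_metadata_flags,
                ApplyServiceConfigCallback apply_service_config,
                void* apply_service_config_user_data,
                grpc_closure* on_route_done);

        LoadBalancingPolicy::PickState* pick();
      };

      // Synchronous callback invoked for each new resolver result;
      // returns true if the service config changed (matching
      // process_resolver_result_locked() below).
      using ProcessResolverResultCallback = bool (*)(
          void* user_data, const grpc_channel_args& args,
          const char** lb_policy_name, grpc_json** lb_policy_config);

      RequestRouter(grpc_channel_stack* owning_stack, grpc_combiner* combiner,
                    grpc_client_channel_factory* client_channel_factory,
                    grpc_pollset_set* interested_parties, TraceFlag* tracer,
                    ProcessResolverResultCallback process_resolver_result,
                    void* process_resolver_result_user_data,
                    const char* target_uri, const grpc_channel_args* args,
                    grpc_error** error);

      void RouteCallLocked(Request* request);
      void ShutdownLocked(grpc_error* error);
      void ExitIdleLocked();
      void ResetConnectionBackoffLocked();
      grpc_connectivity_state GetConnectivityState();
      void NotifyOnConnectivityStateChange(grpc_connectivity_state* state,
                                           grpc_closure* closure);
      LoadBalancingPolicy* lb_policy() const;
      void set_channelz_node(channelz::ClientChannelNode* node);
    };

    }  // namespace grpc_core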
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index be7962261b..dd741f1e2d 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -35,6 +35,7 @@ #include "src/core/ext/filters/client_channel/http_connect_handshaker.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" +#include "src/core/ext/filters/client_channel/request_routing.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/ext/filters/client_channel/resolver_result_parsing.h" #include "src/core/ext/filters/client_channel/retry_throttle.h" @@ -86,31 +87,18 @@ grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel"); struct external_connectivity_watcher; typedef struct client_channel_channel_data { - grpc_core::OrphanablePtr<grpc_core::Resolver> resolver; - bool started_resolving; + grpc_core::ManualConstructor<grpc_core::RequestRouter> request_router; + bool deadline_checking_enabled; - grpc_client_channel_factory* client_channel_factory; bool enable_retries; size_t per_rpc_retry_buffer_size; /** combiner protecting all variables below in this data structure */ grpc_combiner* combiner; - /** currently active load balancer */ - grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy; /** retry throttle data */ grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data; /** maps method names to method_parameters structs */ grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table; - /** incoming resolver result - set by resolver.next() */ - grpc_channel_args* resolver_result; - /** a list of closures that are all waiting for resolver result to come in */ - grpc_closure_list waiting_for_resolver_result_closures; - /** resolver callback */ - grpc_closure on_resolver_result_changed; - /** connectivity state being tracked */ - grpc_connectivity_state_tracker state_tracker; - /** when an lb_policy arrives, should we try to exit idle */ - bool exit_idle_when_lb_policy_arrives; /** owning stack */ grpc_channel_stack* owning_stack; /** interested parties (owned) */ @@ -127,424 +115,40 @@ typedef struct client_channel_channel_data { grpc_core::UniquePtr<char> info_lb_policy_name; /** service config in JSON form */ grpc_core::UniquePtr<char> info_service_config_json; - /* backpointer to grpc_channel's channelz node */ - grpc_core::channelz::ClientChannelNode* channelz_channel; - /* caches if the last resolution event contained addresses */ - bool previous_resolution_contained_addresses; } channel_data; -typedef struct { - channel_data* chand; - /** used as an identifier, don't dereference it because the LB policy may be - * non-existing when the callback is run */ - grpc_core::LoadBalancingPolicy* lb_policy; - grpc_closure closure; -} reresolution_request_args; - -/** We create one watcher for each new lb_policy that is returned from a - resolver, to watch for state changes from the lb_policy. When a state - change is seen, we update the channel, and create a new watcher. 
*/ -typedef struct { - channel_data* chand; - grpc_closure on_changed; - grpc_connectivity_state state; - grpc_core::LoadBalancingPolicy* lb_policy; -} lb_policy_connectivity_watcher; - -static void watch_lb_policy_locked(channel_data* chand, - grpc_core::LoadBalancingPolicy* lb_policy, - grpc_connectivity_state current_state); - -static const char* channel_connectivity_state_change_string( - grpc_connectivity_state state) { - switch (state) { - case GRPC_CHANNEL_IDLE: - return "Channel state change to IDLE"; - case GRPC_CHANNEL_CONNECTING: - return "Channel state change to CONNECTING"; - case GRPC_CHANNEL_READY: - return "Channel state change to READY"; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - return "Channel state change to TRANSIENT_FAILURE"; - case GRPC_CHANNEL_SHUTDOWN: - return "Channel state change to SHUTDOWN"; - } - GPR_UNREACHABLE_CODE(return "UNKNOWN"); -} - -static void set_channel_connectivity_state_locked(channel_data* chand, - grpc_connectivity_state state, - grpc_error* error, - const char* reason) { - /* TODO: Improve failure handling: - * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE. - * - Hand over pending picks from old policies during the switch that happens - * when resolver provides an update. */ - if (chand->lb_policy != nullptr) { - if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - /* cancel picks with wait_for_ready=false */ - chand->lb_policy->CancelMatchingPicksLocked( - /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY, - /* check= */ 0, GRPC_ERROR_REF(error)); - } else if (state == GRPC_CHANNEL_SHUTDOWN) { - /* cancel all picks */ - chand->lb_policy->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0, - GRPC_ERROR_REF(error)); - } - } - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: setting connectivity state to %s", chand, - grpc_connectivity_state_name(state)); - } - if (chand->channelz_channel != nullptr) { - chand->channelz_channel->AddTraceEvent( - grpc_core::channelz::ChannelTrace::Severity::Info, - grpc_slice_from_static_string( - channel_connectivity_state_change_string(state))); - } - grpc_connectivity_state_set(&chand->state_tracker, state, error, reason); -} - -static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) { - lb_policy_connectivity_watcher* w = - static_cast<lb_policy_connectivity_watcher*>(arg); - /* check if the notification is for the latest policy */ - if (w->lb_policy == w->chand->lb_policy.get()) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: lb_policy=%p state changed to %s", w->chand, - w->lb_policy, grpc_connectivity_state_name(w->state)); - } - set_channel_connectivity_state_locked(w->chand, w->state, - GRPC_ERROR_REF(error), "lb_changed"); - if (w->state != GRPC_CHANNEL_SHUTDOWN) { - watch_lb_policy_locked(w->chand, w->lb_policy, w->state); - } - } - GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "watch_lb_policy"); - gpr_free(w); -} - -static void watch_lb_policy_locked(channel_data* chand, - grpc_core::LoadBalancingPolicy* lb_policy, - grpc_connectivity_state current_state) { - lb_policy_connectivity_watcher* w = - static_cast<lb_policy_connectivity_watcher*>(gpr_malloc(sizeof(*w))); - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); - w->chand = chand; - GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w, - grpc_combiner_scheduler(chand->combiner)); - w->state = current_state; - w->lb_policy = lb_policy; - lb_policy->NotifyOnStateChangeLocked(&w->state, &w->on_changed); -} - 
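In the channel_data above, the router is embedded as grpc_core::ManualConstructor<grpc_core::RequestRouter>: storage lives inline in the struct, but construction happens explicitly via Init() in cc_init_channel_elem() and teardown via Destroy() in cc_destroy_channel_elem(), both later in this diff. A minimal sketch of that pattern follows; it assumes only the semantics implied by the Init()/Destroy() call sites here and is not gRPC's actual gprpp implementation.

    #include <new>
    #include <utility>

    // Simplified stand-in for grpc_core::ManualConstructor<T>: a T-sized,
    // T-aligned buffer whose constructor and destructor run only when the
    // owner says so -- a good fit for filter channel data, which is
    // allocated by the channel stack but initialized in init_channel_elem.
    template <typename T>
    class ManualConstructor {
     public:
      template <typename... Args>
      void Init(Args&&... args) {
        new (&space_) T(std::forward<Args>(args)...);  // placement new
      }
      void Destroy() { get()->~T(); }  // explicit destructor call
      T* get() { return reinterpret_cast<T*>(&space_); }
      T* operator->() { return get(); }

     private:
      alignas(T) unsigned char space_[sizeof(T)];
    };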
-static void start_resolving_locked(channel_data* chand) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: starting name resolution", chand); - } - GPR_ASSERT(!chand->started_resolving); - chand->started_resolving = true; - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - chand->resolver->NextLocked(&chand->resolver_result, - &chand->on_resolver_result_changed); -} - -// Invoked from the resolver NextLocked() callback when the resolver -// is shutting down. -static void on_resolver_shutdown_locked(channel_data* chand, - grpc_error* error) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: shutting down", chand); - } - if (chand->lb_policy != nullptr) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand, - chand->lb_policy.get()); - } - grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), - chand->interested_parties); - chand->lb_policy.reset(); - } - if (chand->resolver != nullptr) { - // This should never happen; it can only be triggered by a resolver - // implementation spotaneously deciding to report shutdown without - // being orphaned. This code is included just to be defensive. - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: spontaneous shutdown from resolver %p", - chand, chand->resolver.get()); - } - chand->resolver.reset(); - set_channel_connectivity_state_locked( - chand, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Resolver spontaneous shutdown", &error, 1), - "resolver_spontaneous_shutdown"); - } - grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Channel disconnected", &error, 1)); - GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); - GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver"); - grpc_channel_args_destroy(chand->resolver_result); - chand->resolver_result = nullptr; - GRPC_ERROR_UNREF(error); -} - -static void request_reresolution_locked(void* arg, grpc_error* error) { - reresolution_request_args* args = - static_cast<reresolution_request_args*>(arg); - channel_data* chand = args->chand; - // If this invocation is for a stale LB policy, treat it as an LB shutdown - // signal. - if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE || - chand->resolver == nullptr) { - GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution"); - gpr_free(args); - return; - } - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand); - } - chand->resolver->RequestReresolutionLocked(); - // Give back the closure to the LB policy. - chand->lb_policy->SetReresolutionClosureLocked(&args->closure); -} - -using TraceStringVector = grpc_core::InlinedVector<char*, 3>; - -// Creates a new LB policy, replacing any previous one. -// If the new policy is created successfully, sets *connectivity_state and -// *connectivity_error to its initial connectivity state; otherwise, -// leaves them unchanged. 
-static void create_new_lb_policy_locked( - channel_data* chand, char* lb_policy_name, grpc_json* lb_config, - grpc_connectivity_state* connectivity_state, - grpc_error** connectivity_error, TraceStringVector* trace_strings) { - grpc_core::LoadBalancingPolicy::Args lb_policy_args; - lb_policy_args.combiner = chand->combiner; - lb_policy_args.client_channel_factory = chand->client_channel_factory; - lb_policy_args.args = chand->resolver_result; - lb_policy_args.lb_config = lb_config; - grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy = - grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( - lb_policy_name, lb_policy_args); - if (GPR_UNLIKELY(new_lb_policy == nullptr)) { - gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); - if (chand->channelz_channel != nullptr) { - char* str; - gpr_asprintf(&str, "Could not create LB policy \'%s\'", lb_policy_name); - trace_strings->push_back(str); - } - } else { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: created new LB policy \"%s\" (%p)", chand, - lb_policy_name, new_lb_policy.get()); - } - if (chand->channelz_channel != nullptr) { - char* str; - gpr_asprintf(&str, "Created new LB policy \'%s\'", lb_policy_name); - trace_strings->push_back(str); - } - // Swap out the LB policy and update the fds in - // chand->interested_parties. - if (chand->lb_policy != nullptr) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand, - chand->lb_policy.get()); - } - grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), - chand->interested_parties); - chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get()); - } - chand->lb_policy = std::move(new_lb_policy); - grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(), - chand->interested_parties); - // Set up re-resolution callback. - reresolution_request_args* args = - static_cast<reresolution_request_args*>(gpr_zalloc(sizeof(*args))); - args->chand = chand; - args->lb_policy = chand->lb_policy.get(); - GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args, - grpc_combiner_scheduler(chand->combiner)); - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution"); - chand->lb_policy->SetReresolutionClosureLocked(&args->closure); - // Get the new LB policy's initial connectivity state and start a - // connectivity watch. 
- GRPC_ERROR_UNREF(*connectivity_error); - *connectivity_state = - chand->lb_policy->CheckConnectivityLocked(connectivity_error); - if (chand->exit_idle_when_lb_policy_arrives) { - chand->lb_policy->ExitIdleLocked(); - chand->exit_idle_when_lb_policy_arrives = false; - } - watch_lb_policy_locked(chand, chand->lb_policy.get(), *connectivity_state); - } -} - -static void maybe_add_trace_message_for_address_changes_locked( - channel_data* chand, TraceStringVector* trace_strings) { - int resolution_contains_addresses = false; - const grpc_arg* channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); - if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) { - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p); - if (addresses->num_addresses > 0) { - resolution_contains_addresses = true; - } - } - if (!resolution_contains_addresses && - chand->previous_resolution_contained_addresses) { - trace_strings->push_back(gpr_strdup("Address list became empty")); - } else if (resolution_contains_addresses && - !chand->previous_resolution_contained_addresses) { - trace_strings->push_back(gpr_strdup("Address list became non-empty")); - } - chand->previous_resolution_contained_addresses = - resolution_contains_addresses; -} - -static void concatenate_and_add_channel_trace_locked( - channel_data* chand, TraceStringVector* trace_strings) { - if (!trace_strings->empty()) { - gpr_strvec v; - gpr_strvec_init(&v); - gpr_strvec_add(&v, gpr_strdup("Resolution event: ")); - bool is_first = 1; - for (size_t i = 0; i < trace_strings->size(); ++i) { - if (!is_first) gpr_strvec_add(&v, gpr_strdup(", ")); - is_first = false; - gpr_strvec_add(&v, (*trace_strings)[i]); - } - char* flat; - size_t flat_len = 0; - flat = gpr_strvec_flatten(&v, &flat_len); - chand->channelz_channel->AddTraceEvent( - grpc_core::channelz::ChannelTrace::Severity::Info, - grpc_slice_new(flat, flat_len, gpr_free)); - gpr_strvec_destroy(&v); - } -} - -// Callback invoked when a resolver result is available. -static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { +// Synchronous callback from chand->request_router to process a resolver +// result update. +static bool process_resolver_result_locked(void* arg, + const grpc_channel_args& args, + const char** lb_policy_name, + grpc_json** lb_policy_config) { channel_data* chand = static_cast<channel_data*>(arg); + ProcessedResolverResult resolver_result(args, chand->enable_retries); + grpc_core::UniquePtr<char> service_config_json = + resolver_result.service_config_json(); if (grpc_client_channel_trace.enabled()) { - const char* disposition = - chand->resolver_result != nullptr - ? "" - : (error == GRPC_ERROR_NONE ? " (transient error)" - : " (resolver shutdown)"); - gpr_log(GPR_INFO, - "chand=%p: got resolver result: resolver_result=%p error=%s%s", - chand, chand->resolver_result, grpc_error_string(error), - disposition); + gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"", + chand, service_config_json.get()); } - // Handle shutdown. - if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) { - on_resolver_shutdown_locked(chand, GRPC_ERROR_REF(error)); - return; - } - // Data used to set the channel's connectivity state. - bool set_connectivity_state = true; - // We only want to trace the address resolution in the follow cases: - // (a) Address resolution resulted in service config change. 
- // (b) Address resolution that causes number of backends to go from - // zero to non-zero. - // (c) Address resolution that causes number of backends to go from - // non-zero to zero. - // (d) Address resolution that causes a new LB policy to be created. - // - // we track a list of strings to eventually be concatenated and traced. - TraceStringVector trace_strings; - grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_error* connectivity_error = - GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); - // chand->resolver_result will be null in the case of a transient - // resolution error. In that case, we don't have any new result to - // process, which means that we keep using the previous result (if any). - if (chand->resolver_result == nullptr) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand); - } - // Don't override connectivity state if we already have an LB policy. - if (chand->lb_policy != nullptr) set_connectivity_state = false; - } else { - // Parse the resolver result. - ProcessedResolverResult resolver_result(chand->resolver_result, - chand->enable_retries); - chand->retry_throttle_data = resolver_result.retry_throttle_data(); - chand->method_params_table = resolver_result.method_params_table(); - grpc_core::UniquePtr<char> service_config_json = - resolver_result.service_config_json(); - if (service_config_json != nullptr && grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"", - chand, service_config_json.get()); - } - grpc_core::UniquePtr<char> lb_policy_name = - resolver_result.lb_policy_name(); - grpc_json* lb_policy_config = resolver_result.lb_policy_config(); - // Check to see if we're already using the right LB policy. - // Note: It's safe to use chand->info_lb_policy_name here without - // taking a lock on chand->info_mu, because this function is the - // only thing that modifies its value, and it can only be invoked - // once at any given time. - bool lb_policy_name_changed = chand->info_lb_policy_name == nullptr || - gpr_stricmp(chand->info_lb_policy_name.get(), - lb_policy_name.get()) != 0; - if (chand->lb_policy != nullptr && !lb_policy_name_changed) { - // Continue using the same LB policy. Update with new addresses. - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)", - chand, lb_policy_name.get(), chand->lb_policy.get()); - } - chand->lb_policy->UpdateLocked(*chand->resolver_result, lb_policy_config); - // No need to set the channel's connectivity state; the existing - // watch on the LB policy will take care of that. - set_connectivity_state = false; - } else { - // Instantiate new LB policy. - create_new_lb_policy_locked(chand, lb_policy_name.get(), lb_policy_config, - &connectivity_state, &connectivity_error, - &trace_strings); - } - // Note: It's safe to use chand->info_service_config_json here without - // taking a lock on chand->info_mu, because this function is the - // only thing that modifies its value, and it can only be invoked - // once at any given time. 
- if (chand->channelz_channel != nullptr) { - if (((service_config_json == nullptr) != - (chand->info_service_config_json == nullptr)) || - (service_config_json != nullptr && - strcmp(service_config_json.get(), - chand->info_service_config_json.get()) != 0)) { - // TODO(ncteisen): might be worth somehow including a snippet of the - // config in the trace, at the risk of bloating the trace logs. - trace_strings.push_back(gpr_strdup("Service config changed")); - } - maybe_add_trace_message_for_address_changes_locked(chand, &trace_strings); - concatenate_and_add_channel_trace_locked(chand, &trace_strings); - } - // Swap out the data used by cc_get_channel_info(). - gpr_mu_lock(&chand->info_mu); - chand->info_lb_policy_name = std::move(lb_policy_name); - chand->info_service_config_json = std::move(service_config_json); - gpr_mu_unlock(&chand->info_mu); - // Clean up. - grpc_channel_args_destroy(chand->resolver_result); - chand->resolver_result = nullptr; - } - // Set the channel's connectivity state if needed. - if (set_connectivity_state) { - set_channel_connectivity_state_locked( - chand, connectivity_state, connectivity_error, "resolver_result"); - } else { - GRPC_ERROR_UNREF(connectivity_error); - } - // Invoke closures that were waiting for results and renew the watch. - GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); - chand->resolver->NextLocked(&chand->resolver_result, - &chand->on_resolver_result_changed); + // Update channel state. + chand->retry_throttle_data = resolver_result.retry_throttle_data(); + chand->method_params_table = resolver_result.method_params_table(); + // Swap out the data used by cc_get_channel_info(). + gpr_mu_lock(&chand->info_mu); + chand->info_lb_policy_name = resolver_result.lb_policy_name(); + const bool service_config_changed = + ((service_config_json == nullptr) != + (chand->info_service_config_json == nullptr)) || + (service_config_json != nullptr && + strcmp(service_config_json.get(), + chand->info_service_config_json.get()) != 0); + chand->info_service_config_json = std::move(service_config_json); + gpr_mu_unlock(&chand->info_mu); + // Return results. 
+ *lb_policy_name = chand->info_lb_policy_name.get(); + *lb_policy_config = resolver_result.lb_policy_config(); + return service_config_changed; } static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { @@ -554,15 +158,14 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); if (op->on_connectivity_state_change != nullptr) { - grpc_connectivity_state_notify_on_state_change( - &chand->state_tracker, op->connectivity_state, - op->on_connectivity_state_change); + chand->request_router->NotifyOnConnectivityStateChange( + op->connectivity_state, op->on_connectivity_state_change); op->on_connectivity_state_change = nullptr; op->connectivity_state = nullptr; } if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { - if (chand->lb_policy == nullptr) { + if (chand->request_router->lb_policy() == nullptr) { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"); GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error)); @@ -570,14 +173,9 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { } else { grpc_error* error = GRPC_ERROR_NONE; grpc_core::LoadBalancingPolicy::PickState pick_state; - pick_state.initial_metadata = nullptr; - pick_state.initial_metadata_flags = 0; - pick_state.on_complete = nullptr; - memset(&pick_state.subchannel_call_context, 0, - sizeof(pick_state.subchannel_call_context)); - pick_state.user_data = nullptr; // Pick must return synchronously, because pick_state.on_complete is null. - GPR_ASSERT(chand->lb_policy->PickLocked(&pick_state, &error)); + GPR_ASSERT( + chand->request_router->lb_policy()->PickLocked(&pick_state, &error)); if (pick_state.connected_subchannel != nullptr) { pick_state.connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack); @@ -596,37 +194,14 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { } if (op->disconnect_with_error != GRPC_ERROR_NONE) { - if (chand->resolver != nullptr) { - set_channel_connectivity_state_locked( - chand, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_REF(op->disconnect_with_error), "disconnect"); - chand->resolver.reset(); - if (!chand->started_resolving) { - grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures, - GRPC_ERROR_REF(op->disconnect_with_error)); - GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); - } - if (chand->lb_policy != nullptr) { - grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), - chand->interested_parties); - chand->lb_policy.reset(); - } - } - GRPC_ERROR_UNREF(op->disconnect_with_error); + chand->request_router->ShutdownLocked(op->disconnect_with_error); } if (op->reset_connect_backoff) { - if (chand->resolver != nullptr) { - chand->resolver->ResetBackoffLocked(); - chand->resolver->RequestReresolutionLocked(); - } - if (chand->lb_policy != nullptr) { - chand->lb_policy->ResetBackoffLocked(); - } + chand->request_router->ResetConnectionBackoffLocked(); } GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op"); - GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); } @@ -677,12 +252,9 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); chand->owning_stack = args->channel_stack; - GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed, - on_resolver_result_changed_locked, chand, - 
grpc_combiner_scheduler(chand->combiner)); + chand->deadline_checking_enabled = + grpc_deadline_checking_enabled(args->channel_args); chand->interested_parties = grpc_pollset_set_create(); - grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, - "client_channel"); grpc_client_channel_start_backup_polling(chand->interested_parties); // Record max per-RPC retry buffer size. const grpc_arg* arg = grpc_channel_args_find( @@ -692,8 +264,6 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, // Record enable_retries. arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES); chand->enable_retries = grpc_channel_arg_get_bool(arg, true); - chand->channelz_channel = nullptr; - chand->previous_resolution_contained_addresses = false; // Record client channel factory. arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_CLIENT_CHANNEL_FACTORY); @@ -705,9 +275,7 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "client channel factory arg must be a pointer"); } - grpc_client_channel_factory_ref( - static_cast<grpc_client_channel_factory*>(arg->value.pointer.p)); - chand->client_channel_factory = + grpc_client_channel_factory* client_channel_factory = static_cast<grpc_client_channel_factory*>(arg->value.pointer.p); // Get server name to resolve, using proxy mapper if needed. arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI); @@ -723,39 +291,24 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, grpc_channel_args* new_args = nullptr; grpc_proxy_mappers_map_name(arg->value.string, args->channel_args, &proxy_name, &new_args); - // Instantiate resolver. - chand->resolver = grpc_core::ResolverRegistry::CreateResolver( - proxy_name != nullptr ? proxy_name : arg->value.string, - new_args != nullptr ? new_args : args->channel_args, - chand->interested_parties, chand->combiner); - if (proxy_name != nullptr) gpr_free(proxy_name); - if (new_args != nullptr) grpc_channel_args_destroy(new_args); - if (chand->resolver == nullptr) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed"); - } - chand->deadline_checking_enabled = - grpc_deadline_checking_enabled(args->channel_args); - return GRPC_ERROR_NONE; + // Instantiate request router. + grpc_client_channel_factory_ref(client_channel_factory); + grpc_error* error = GRPC_ERROR_NONE; + chand->request_router.Init( + chand->owning_stack, chand->combiner, client_channel_factory, + chand->interested_parties, &grpc_client_channel_trace, + process_resolver_result_locked, chand, + proxy_name != nullptr ? proxy_name : arg->value.string /* target_uri */, + new_args != nullptr ? new_args : args->channel_args, &error); + gpr_free(proxy_name); + grpc_channel_args_destroy(new_args); + return error; } /* Destructor for channel_data */ static void cc_destroy_channel_elem(grpc_channel_element* elem) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); - if (chand->resolver != nullptr) { - // The only way we can get here is if we never started resolving, - // because we take a ref to the channel stack when we start - // resolving and do not release it until the resolver callback is - // invoked after the resolver shuts down. 
- chand->resolver.reset(); - } - if (chand->client_channel_factory != nullptr) { - grpc_client_channel_factory_unref(chand->client_channel_factory); - } - if (chand->lb_policy != nullptr) { - grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), - chand->interested_parties); - chand->lb_policy.reset(); - } + chand->request_router.Destroy(); // TODO(roth): Once we convert the filter API to C++, there will no // longer be any need to explicitly reset these smart pointer data members. chand->info_lb_policy_name.reset(); @@ -763,7 +316,6 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) { chand->retry_throttle_data.reset(); chand->method_params_table.reset(); grpc_client_channel_stop_backup_polling(chand->interested_parties); - grpc_connectivity_state_destroy(&chand->state_tracker); grpc_pollset_set_destroy(chand->interested_parties); GRPC_COMBINER_UNREF(chand->combiner, "client_channel"); gpr_mu_destroy(&chand->info_mu); @@ -820,6 +372,7 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) { // - add census stats for retries namespace { + struct call_data; // State used for starting a retryable batch on a subchannel call. @@ -904,12 +457,12 @@ struct subchannel_call_retry_state { bool completed_recv_initial_metadata : 1; bool started_recv_trailing_metadata : 1; bool completed_recv_trailing_metadata : 1; + // State for callback processing. subchannel_batch_data* recv_initial_metadata_ready_deferred_batch = nullptr; grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE; subchannel_batch_data* recv_message_ready_deferred_batch = nullptr; grpc_error* recv_message_error = GRPC_ERROR_NONE; subchannel_batch_data* recv_trailing_metadata_internal_batch = nullptr; - // State for callback processing. // NOTE: Do not move this next to the metadata bitfields above. That would // save space but will also result in a data race because compiler will // generate a 2 byte store which overwrites the meta-data fields upon @@ -918,12 +471,12 @@ struct subchannel_call_retry_state { }; // Pending batches stored in call data. -typedef struct { +struct pending_batch { // The pending batch. If nullptr, this slot is empty. grpc_transport_stream_op_batch* batch; // Indicates whether payload for send ops has been cached in call data. bool send_ops_cached; -} pending_batch; +}; /** Call data. Holds a pointer to grpc_subchannel_call and the associated machinery to create such a pointer. @@ -960,11 +513,8 @@ struct call_data { for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches); ++i) { GPR_ASSERT(pending_batches[i].batch == nullptr); } - for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { - if (pick.subchannel_call_context[i].value != nullptr) { - pick.subchannel_call_context[i].destroy( - pick.subchannel_call_context[i].value); - } + if (have_request) { + request.Destroy(); } } @@ -991,12 +541,11 @@ struct call_data { // Set when we get a cancel_stream op. grpc_error* cancel_error = GRPC_ERROR_NONE; - grpc_core::LoadBalancingPolicy::PickState pick; + grpc_core::ManualConstructor<grpc_core::RequestRouter::Request> request; + bool have_request = false; grpc_closure pick_closure; - grpc_closure pick_cancel_closure; grpc_polling_entity* pollent = nullptr; - bool pollent_added_to_interested_parties = false; // Batches are added to this list when received from above. 
// They are removed when we are done handling the batch (i.e., when @@ -1046,6 +595,7 @@ struct call_data { grpc_linked_mdelem* send_trailing_metadata_storage = nullptr; grpc_metadata_batch send_trailing_metadata; }; + } // namespace // Forward declarations. @@ -1448,8 +998,9 @@ static void do_retry(grpc_call_element* elem, "client_channel_call_retry"); calld->subchannel_call = nullptr; } - if (calld->pick.connected_subchannel != nullptr) { - calld->pick.connected_subchannel.reset(); + if (calld->have_request) { + calld->have_request = false; + calld->request.Destroy(); } // Compute backoff delay. grpc_millis next_attempt_time; @@ -1598,6 +1149,7 @@ static bool maybe_retry(grpc_call_element* elem, // namespace { + subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem, call_data* calld, int refcount, bool set_on_complete) @@ -1638,6 +1190,7 @@ void subchannel_batch_data::destroy() { call_data* calld = static_cast<call_data*>(elem->call_data); GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data"); } + } // namespace // Creates a subchannel_batch_data object on the call's arena with the @@ -2654,17 +2207,18 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { const size_t parent_data_size = calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0; const grpc_core::ConnectedSubchannel::CallArgs call_args = { - calld->pollent, // pollent - calld->path, // path - calld->call_start_time, // start_time - calld->deadline, // deadline - calld->arena, // arena - calld->pick.subchannel_call_context, // context - calld->call_combiner, // call_combiner - parent_data_size // parent_data_size + calld->pollent, // pollent + calld->path, // path + calld->call_start_time, // start_time + calld->deadline, // deadline + calld->arena, // arena + calld->request->pick()->subchannel_call_context, // context + calld->call_combiner, // call_combiner + parent_data_size // parent_data_size }; - grpc_error* new_error = calld->pick.connected_subchannel->CreateCall( - call_args, &calld->subchannel_call); + grpc_error* new_error = + calld->request->pick()->connected_subchannel->CreateCall( + call_args, &calld->subchannel_call); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s", chand, calld, calld->subchannel_call, grpc_error_string(new_error)); @@ -2676,7 +2230,8 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { if (parent_data_size > 0) { new (grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)) - subchannel_call_retry_state(calld->pick.subchannel_call_context); + subchannel_call_retry_state( + calld->request->pick()->subchannel_call_context); } pending_batches_resume(elem); } @@ -2688,7 +2243,7 @@ static void pick_done(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast<grpc_call_element*>(arg); channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); - if (GPR_UNLIKELY(calld->pick.connected_subchannel == nullptr)) { + if (GPR_UNLIKELY(calld->request->pick()->connected_subchannel == nullptr)) { // Failed to create subchannel. // If there was no error, this is an LB policy drop, in which case // we return an error; otherwise, we may retry. 
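As the create_subchannel_call() and pick_done() hunks above show, the per-call pick state formerly stored directly in call_data (calld->pick) now lives inside the RequestRouter::Request and is reached through its pick() accessor. A condensed, hypothetical helper illustrating the new indirection (get_pick is not a function in this commit):

    // call_data now embeds a ManualConstructor<RequestRouter::Request>
    // plus a have_request flag; the PickState is reached through the
    // request object rather than a direct member.
    grpc_core::LoadBalancingPolicy::PickState* get_pick(call_data* calld) {
      GPR_ASSERT(calld->have_request);  // request.Init() must have run
      return calld->request->pick();    // previously: &calld->pick
    }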
@@ -2717,135 +2272,27 @@ static void pick_done(void* arg, grpc_error* error) { } } -static void maybe_add_call_to_channel_interested_parties_locked( - grpc_call_element* elem) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (!calld->pollent_added_to_interested_parties) { - calld->pollent_added_to_interested_parties = true; - grpc_polling_entity_add_to_pollset_set(calld->pollent, - chand->interested_parties); - } -} - -static void maybe_del_call_from_channel_interested_parties_locked( - grpc_call_element* elem) { +// If the channel is in TRANSIENT_FAILURE and the call is not +// wait_for_ready=true, fails the call and returns true. +static bool fail_call_if_in_transient_failure(grpc_call_element* elem) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); - if (calld->pollent_added_to_interested_parties) { - calld->pollent_added_to_interested_parties = false; - grpc_polling_entity_del_from_pollset_set(calld->pollent, - chand->interested_parties); + grpc_transport_stream_op_batch* batch = calld->pending_batches[0].batch; + if (chand->request_router->GetConnectivityState() == + GRPC_CHANNEL_TRANSIENT_FAILURE && + (batch->payload->send_initial_metadata.send_initial_metadata_flags & + GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) { + pending_batches_fail( + elem, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "channel is in state TRANSIENT_FAILURE"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), + true /* yield_call_combiner */); + return true; } + return false; } -// Invoked when a pick is completed to leave the client_channel combiner -// and continue processing in the call combiner. -// If needed, removes the call's polling entity from chand->interested_parties. -static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { - call_data* calld = static_cast<call_data*>(elem->call_data); - maybe_del_call_from_channel_interested_parties_locked(elem); - GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&calld->pick_closure, error); -} - -namespace grpc_core { - -// Performs subchannel pick via LB policy. -class LbPicker { - public: - // Starts a pick on chand->lb_policy. - static void StartLocked(grpc_call_element* elem) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p", - chand, calld, chand->lb_policy.get()); - } - // If this is a retry, use the send_initial_metadata payload that - // we've cached; otherwise, use the pending batch. The - // send_initial_metadata batch will be the first pending batch in the - // list, as set by get_batch_index() above. - calld->pick.initial_metadata = - calld->seen_send_initial_metadata - ? &calld->send_initial_metadata - : calld->pending_batches[0] - .batch->payload->send_initial_metadata.send_initial_metadata; - calld->pick.initial_metadata_flags = - calld->seen_send_initial_metadata - ? 
calld->send_initial_metadata_flags - : calld->pending_batches[0] - .batch->payload->send_initial_metadata - .send_initial_metadata_flags; - GRPC_CLOSURE_INIT(&calld->pick_closure, &LbPicker::DoneLocked, elem, - grpc_combiner_scheduler(chand->combiner)); - calld->pick.on_complete = &calld->pick_closure; - GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback"); - grpc_error* error = GRPC_ERROR_NONE; - const bool pick_done = chand->lb_policy->PickLocked(&calld->pick, &error); - if (GPR_LIKELY(pick_done)) { - // Pick completed synchronously. - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously", - chand, calld); - } - pick_done_locked(elem, error); - GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); - } else { - // Pick will be returned asynchronously. - // Add the polling entity from call_data to the channel_data's - // interested_parties, so that the I/O of the LB policy can be done - // under it. It will be removed in pick_done_locked(). - maybe_add_call_to_channel_interested_parties_locked(elem); - // Request notification on call cancellation. - GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); - grpc_call_combiner_set_notify_on_cancel( - calld->call_combiner, - GRPC_CLOSURE_INIT(&calld->pick_cancel_closure, - &LbPicker::CancelLocked, elem, - grpc_combiner_scheduler(chand->combiner))); - } - } - - private: - // Callback invoked by LoadBalancingPolicy::PickLocked() for async picks. - // Unrefs the LB policy and invokes pick_done_locked(). - static void DoneLocked(void* arg, grpc_error* error) { - grpc_call_element* elem = static_cast<grpc_call_element*>(arg); - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously", - chand, calld); - } - pick_done_locked(elem, GRPC_ERROR_REF(error)); - GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); - } - - // Note: This runs under the client_channel combiner, but will NOT be - // holding the call combiner. - static void CancelLocked(void* arg, grpc_error* error) { - grpc_call_element* elem = static_cast<grpc_call_element*>(arg); - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - // Note: chand->lb_policy may have changed since we started our pick, - // in which case we will be cancelling the pick on a policy other than - // the one we started it on. However, this will just be a no-op. - if (GPR_UNLIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: cancelling pick from LB policy %p", chand, - calld, chand->lb_policy.get()); - } - chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error)); - } - GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel"); - } -}; - -} // namespace grpc_core - // Applies service config to the call. Must be invoked once we know // that the resolver has returned results to the channel. static void apply_service_config_to_call_locked(grpc_call_element* elem) { @@ -2902,224 +2349,66 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) { } } -// If the channel is in TRANSIENT_FAILURE and the call is not -// wait_for_ready=true, fails the call and returns true. 
-static bool fail_call_if_in_transient_failure(grpc_call_element* elem) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - grpc_transport_stream_op_batch* batch = calld->pending_batches[0].batch; - if (grpc_connectivity_state_check(&chand->state_tracker) == - GRPC_CHANNEL_TRANSIENT_FAILURE && - (batch->payload->send_initial_metadata.send_initial_metadata_flags & - GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) { - pending_batches_fail( - elem, - grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "channel is in state TRANSIENT_FAILURE"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), - true /* yield_call_combiner */); - return true; - } - return false; -} - // Invoked once resolver results are available. -static void process_service_config_and_start_lb_pick_locked( - grpc_call_element* elem) { +static bool maybe_apply_service_config_to_call_locked(void* arg) { + grpc_call_element* elem = static_cast<grpc_call_element*>(arg); call_data* calld = static_cast<call_data*>(elem->call_data); // Only get service config data on the first attempt. if (GPR_LIKELY(calld->num_attempts_completed == 0)) { apply_service_config_to_call_locked(elem); // Check this after applying service config, since it may have // affected the call's wait_for_ready value. - if (fail_call_if_in_transient_failure(elem)) return; + if (fail_call_if_in_transient_failure(elem)) return false; } - // Start LB pick. - grpc_core::LbPicker::StartLocked(elem); + return true; } -namespace grpc_core { - -// Handles waiting for a resolver result. -// Used only for the first call on an idle channel. -class ResolverResultWaiter { - public: - explicit ResolverResultWaiter(grpc_call_element* elem) : elem_(elem) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: deferring pick pending resolver result", - chand, calld); - } - // Add closure to be run when a resolver result is available. - GRPC_CLOSURE_INIT(&done_closure_, &ResolverResultWaiter::DoneLocked, this, - grpc_combiner_scheduler(chand->combiner)); - AddToWaitingList(); - // Set cancellation closure, so that we abort if the call is cancelled. - GRPC_CLOSURE_INIT(&cancel_closure_, &ResolverResultWaiter::CancelLocked, - this, grpc_combiner_scheduler(chand->combiner)); - grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, - &cancel_closure_); - } - - private: - // Adds closure_ to chand->waiting_for_resolver_result_closures. - void AddToWaitingList() { - channel_data* chand = static_cast<channel_data*>(elem_->channel_data); - grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, - &done_closure_, GRPC_ERROR_NONE); - } - - // Invoked when a resolver result is available. - static void DoneLocked(void* arg, grpc_error* error) { - ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg); - // If CancelLocked() has already run, delete ourselves without doing - // anything. Note that the call stack may have already been destroyed, - // so it's not safe to access anything in elem_. - if (GPR_UNLIKELY(self->finished_)) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "call cancelled before resolver result"); - } - Delete(self); - return; - } - // Otherwise, process the resolver result. 
- grpc_call_element* elem = self->elem_; - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data", - chand, calld); - } - pick_done_locked(elem, GRPC_ERROR_REF(error)); - } else if (GPR_UNLIKELY(chand->resolver == nullptr)) { - // Shutting down. - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand, - calld); - } - pick_done_locked(elem, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); - } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) { - // Transient resolver failure. - // If call has wait_for_ready=true, try again; otherwise, fail. - uint32_t send_initial_metadata_flags = - calld->seen_send_initial_metadata - ? calld->send_initial_metadata_flags - : calld->pending_batches[0] - .batch->payload->send_initial_metadata - .send_initial_metadata_flags; - if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: resolver returned but no LB policy; " - "wait_for_ready=true; trying again", - chand, calld); - } - // Re-add ourselves to the waiting list. - self->AddToWaitingList(); - // Return early so that we don't set finished_ to true below. - return; - } else { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: resolver returned but no LB policy; " - "wait_for_ready=false; failing", - chand, calld); - } - pick_done_locked( - elem, - grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); - } - } else { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing LB pick", - chand, calld); - } - process_service_config_and_start_lb_pick_locked(elem); - } - self->finished_ = true; - } - - // Invoked when the call is cancelled. - // Note: This runs under the client_channel combiner, but will NOT be - // holding the call combiner. - static void CancelLocked(void* arg, grpc_error* error) { - ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg); - // If DoneLocked() has already run, delete ourselves without doing anything. - if (GPR_LIKELY(self->finished_)) { - Delete(self); - return; - } - // If we are being cancelled, immediately invoke pick_done_locked() - // to propagate the error back to the caller. - if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { - grpc_call_element* elem = self->elem_; - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: cancelling call waiting for name " - "resolution", - chand, calld); - } - // Note: Although we are not in the call combiner here, we are - // basically stealing the call combiner from the pending pick, so - // it's safe to call pick_done_locked() here -- we are essentially - // calling it here instead of calling it in DoneLocked(). 
- pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick cancelled", &error, 1)); - } - self->finished_ = true; - } - - grpc_call_element* elem_; - grpc_closure done_closure_; - grpc_closure cancel_closure_; - bool finished_ = false; -}; - -} // namespace grpc_core - static void start_pick_locked(void* arg, grpc_error* ignored) { grpc_call_element* elem = static_cast<grpc_call_element*>(arg); call_data* calld = static_cast<call_data*>(elem->call_data); channel_data* chand = static_cast<channel_data*>(elem->channel_data); - GPR_ASSERT(calld->pick.connected_subchannel == nullptr); + GPR_ASSERT(!calld->have_request); GPR_ASSERT(calld->subchannel_call == nullptr); - if (GPR_LIKELY(chand->lb_policy != nullptr)) { - // We already have resolver results, so process the service config - // and start an LB pick. - process_service_config_and_start_lb_pick_locked(elem); - } else if (GPR_UNLIKELY(chand->resolver == nullptr)) { - pick_done_locked(elem, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); - } else { - // We do not yet have an LB policy, so wait for a resolver result. - if (GPR_UNLIKELY(!chand->started_resolving)) { - start_resolving_locked(chand); - } else { - // Normally, we want to do this check in - // process_service_config_and_start_lb_pick_locked(), so that we - // can honor the wait_for_ready setting in the service config. - // However, if the channel is in TRANSIENT_FAILURE at this point, that - // means that the resolver has returned a failure, so we're not going - // to get a service config right away. In that case, we fail the - // call now based on the wait_for_ready value passed in from the - // application. - if (fail_call_if_in_transient_failure(elem)) return; - } - // Create a new waiter, which will delete itself when done. - grpc_core::New<grpc_core::ResolverResultWaiter>(elem); - // Add the polling entity from call_data to the channel_data's - // interested_parties, so that the I/O of the resolver can be done - // under it. It will be removed in pick_done_locked(). - maybe_add_call_to_channel_interested_parties_locked(elem); + // Normally, we want to defer this check until after we've processed the + // service config, so that we can honor the wait_for_ready setting in + // the service config. However, if the channel is in TRANSIENT_FAILURE + // and we don't have an LB policy at this point, that means that the + // resolver has returned a failure, so we're not going to get a service + // config right away. In that case, we fail the call now based on the + // wait_for_ready value passed in from the application. + if (chand->request_router->lb_policy() == nullptr && + fail_call_if_in_transient_failure(elem)) { + return; } + // If this is a retry, use the send_initial_metadata payload that + // we've cached; otherwise, use the pending batch. The + // send_initial_metadata batch will be the first pending batch in the + // list, as set by get_batch_index() above. + // TODO(roth): What if the LB policy needs to add something to the + // call's initial metadata, and then there's a retry? We don't want + // the new metadata to be added twice. We might need to somehow + // allocate the subchannel batch earlier so that we can give the + // subchannel's copy of the metadata batch (which is copied for each + // attempt) to the LB policy instead of the one from the parent channel. + grpc_metadata_batch* initial_metadata = + calld->seen_send_initial_metadata + ? 
&calld->send_initial_metadata + : calld->pending_batches[0] + .batch->payload->send_initial_metadata.send_initial_metadata; + uint32_t* initial_metadata_flags = + calld->seen_send_initial_metadata + ? &calld->send_initial_metadata_flags + : &calld->pending_batches[0] + .batch->payload->send_initial_metadata + .send_initial_metadata_flags; + GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem, + grpc_schedule_on_exec_ctx); + calld->request.Init(calld->owning_call, calld->call_combiner, calld->pollent, + initial_metadata, initial_metadata_flags, + maybe_apply_service_config_to_call_locked, elem, + &calld->pick_closure); + calld->have_request = true; + chand->request_router->RouteCallLocked(calld->request.get()); } // @@ -3259,23 +2548,10 @@ const grpc_channel_filter grpc_client_channel_filter = { "client-channel", }; -static void try_to_connect_locked(void* arg, grpc_error* error_ignored) { - channel_data* chand = static_cast<channel_data*>(arg); - if (chand->lb_policy != nullptr) { - chand->lb_policy->ExitIdleLocked(); - } else { - chand->exit_idle_when_lb_policy_arrives = true; - if (!chand->started_resolving && chand->resolver != nullptr) { - start_resolving_locked(chand); - } - } - GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect"); -} - void grpc_client_channel_set_channelz_node( grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); - chand->channelz_channel = node; + chand->request_router->set_channelz_node(node); } void grpc_client_channel_populate_child_refs( @@ -3283,17 +2559,22 @@ void grpc_client_channel_populate_child_refs( grpc_core::channelz::ChildRefsList* child_subchannels, grpc_core::channelz::ChildRefsList* child_channels) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); - if (chand->lb_policy != nullptr) { - chand->lb_policy->FillChildRefsForChannelz(child_subchannels, - child_channels); + if (chand->request_router->lb_policy() != nullptr) { + chand->request_router->lb_policy()->FillChildRefsForChannelz( + child_subchannels, child_channels); } } +static void try_to_connect_locked(void* arg, grpc_error* error_ignored) { + channel_data* chand = static_cast<channel_data*>(arg); + chand->request_router->ExitIdleLocked(); + GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect"); +} + grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_channel_element* elem, int try_to_connect) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); - grpc_connectivity_state out = - grpc_connectivity_state_check(&chand->state_tracker); + grpc_connectivity_state out = chand->request_router->GetConnectivityState(); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); GRPC_CLOSURE_SCHED( @@ -3338,19 +2619,19 @@ static void external_connectivity_watcher_list_append( } static void external_connectivity_watcher_list_remove( - channel_data* chand, external_connectivity_watcher* too_remove) { + channel_data* chand, external_connectivity_watcher* to_remove) { GPR_ASSERT( - lookup_external_connectivity_watcher(chand, too_remove->on_complete)); + lookup_external_connectivity_watcher(chand, to_remove->on_complete)); gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); - if (too_remove == chand->external_connectivity_watcher_list_head) { - chand->external_connectivity_watcher_list_head = too_remove->next; + if (to_remove == 
chand->external_connectivity_watcher_list_head) { + chand->external_connectivity_watcher_list_head = to_remove->next; gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); return; } external_connectivity_watcher* w = chand->external_connectivity_watcher_list_head; while (w != nullptr) { - if (w->next == too_remove) { + if (w->next == to_remove) { w->next = w->next->next; gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); return; @@ -3402,15 +2683,15 @@ static void watch_connectivity_state_locked(void* arg, GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE); GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w, grpc_combiner_scheduler(w->chand->combiner)); - grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker, - w->state, &w->my_closure); + w->chand->request_router->NotifyOnConnectivityStateChange(w->state, + &w->my_closure); } else { GPR_ASSERT(w->watcher_timer_init == nullptr); found = lookup_external_connectivity_watcher(w->chand, w->on_complete); if (found) { GPR_ASSERT(found->on_complete == w->on_complete); - grpc_connectivity_state_notify_on_state_change( - &found->chand->state_tracker, nullptr, &found->my_closure); + found->chand->request_router->NotifyOnConnectivityStateChange( + nullptr, &found->my_closure); } grpc_polling_entity_del_from_pollset_set(&w->pollent, w->chand->interested_parties); diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc index 587919596f..2232c57120 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.cc +++ b/src/core/ext/filters/client_channel/health/health_check_client.cc @@ -51,8 +51,7 @@ HealthCheckClient::HealthCheckClient( RefCountedPtr<ConnectedSubchannel> connected_subchannel, grpc_pollset_set* interested_parties, grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode> channelz_node) - : InternallyRefCountedWithTracing<HealthCheckClient>( - &grpc_health_check_client_trace), + : InternallyRefCounted<HealthCheckClient>(&grpc_health_check_client_trace), service_name_(service_name), connected_subchannel_(std::move(connected_subchannel)), interested_parties_(interested_parties), @@ -281,8 +280,7 @@ bool DecodeResponse(grpc_slice_buffer* slice_buffer, grpc_error** error) { HealthCheckClient::CallState::CallState( RefCountedPtr<HealthCheckClient> health_check_client, grpc_pollset_set* interested_parties) - : InternallyRefCountedWithTracing<CallState>( - &grpc_health_check_client_trace), + : InternallyRefCounted<CallState>(&grpc_health_check_client_trace), health_check_client_(std::move(health_check_client)), pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)), arena_(gpr_arena_create(health_check_client_->connected_subchannel_ diff --git a/src/core/ext/filters/client_channel/health/health_check_client.h b/src/core/ext/filters/client_channel/health/health_check_client.h index f6babef7d6..2369b73fea 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.h +++ b/src/core/ext/filters/client_channel/health/health_check_client.h @@ -41,8 +41,7 @@ namespace grpc_core { -class HealthCheckClient - : public InternallyRefCountedWithTracing<HealthCheckClient> { +class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> { public: HealthCheckClient(const char* service_name, RefCountedPtr<ConnectedSubchannel> connected_subchannel, @@ -61,7 +60,7 @@ class HealthCheckClient private: // Contains a call to the backend and all the data 
related to the call. - class CallState : public InternallyRefCountedWithTracing<CallState> { + class CallState : public InternallyRefCounted<CallState> { public: CallState(RefCountedPtr<HealthCheckClient> health_check_client, grpc_pollset_set* interested_parties_); diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index e065f45639..b4e803689e 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -27,7 +27,7 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount( namespace grpc_core { LoadBalancingPolicy::LoadBalancingPolicy(const Args& args) - : InternallyRefCountedWithTracing(&grpc_trace_lb_policy_refcount), + : InternallyRefCounted(&grpc_trace_lb_policy_refcount), combiner_(GRPC_COMBINER_REF(args.combiner, "lb_policy")), client_channel_factory_(args.client_channel_factory), interested_parties_(grpc_pollset_set_create()), diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 6733fdca81..293d8e960c 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -42,8 +42,7 @@ namespace grpc_core { /// /// Any I/O done by the LB policy should be done under the pollset_set /// returned by \a interested_parties(). -class LoadBalancingPolicy - : public InternallyRefCountedWithTracing<LoadBalancingPolicy> { +class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> { public: struct Args { /// The combiner under which all LB policy calls will be run. @@ -56,7 +55,7 @@ class LoadBalancingPolicy grpc_client_channel_factory* client_channel_factory = nullptr; /// Channel args from the resolver. /// Note that the LB policy gets the set of addresses from the - /// GRPC_ARG_LB_ADDRESSES channel arg. + /// GRPC_ARG_SERVER_ADDRESS_LIST channel arg. grpc_channel_args* args = nullptr; /// Load balancing config from the resolver. grpc_json* lb_config = nullptr; @@ -66,10 +65,10 @@ class LoadBalancingPolicy struct PickState { /// Initial metadata associated with the picking call. grpc_metadata_batch* initial_metadata = nullptr; - /// Bitmask used for selective cancelling. See + /// Pointer to bitmask used for selective cancelling. See /// \a CancelMatchingPicksLocked() and \a GRPC_INITIAL_METADATA_* in /// grpc_types.h. - uint32_t initial_metadata_flags = 0; + uint32_t* initial_metadata_flags = nullptr; /// Storage for LB token in \a initial_metadata, or nullptr if not used. grpc_linked_mdelem lb_token_mdelem_storage; /// Closure to run when pick is complete, if not completed synchronously. @@ -81,11 +80,6 @@ class LoadBalancingPolicy /// Will be populated with context to pass to the subchannel call, if /// needed. grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT] = {}; - /// Upon success, \a *user_data will be set to whatever opaque information - /// may need to be propagated from the LB policy, or nullptr if not needed. - // TODO(roth): As part of revamping our metadata APIs, try to find a - // way to clean this up and C++-ify it. - void** user_data = nullptr; /// Next pointer. For internal use by LB policy. PickState* next = nullptr; }; @@ -94,9 +88,12 @@ class LoadBalancingPolicy LoadBalancingPolicy(const LoadBalancingPolicy&) = delete; LoadBalancingPolicy& operator=(const LoadBalancingPolicy&) = delete; + /// Returns the name of the LB policy. 
+ virtual const char* name() const GRPC_ABSTRACT; + /// Updates the policy with a new set of \a args and a new \a lb_config from /// the resolver. Note that the LB policy gets the set of addresses from the - /// GRPC_ARG_LB_ADDRESSES channel arg. + /// GRPC_ARG_SERVER_ADDRESS_LIST channel arg. virtual void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) GRPC_ABSTRACT; @@ -211,12 +208,6 @@ class LoadBalancingPolicy grpc_pollset_set* interested_parties_; /// Callback to force a re-resolution. grpc_closure* request_reresolution_; - - // Dummy classes needed for alignment issues. - // See https://github.com/grpc/grpc/issues/16032 for context. - // TODO(ncteisen): remove this as soon as the issue is resolved. - channelz::ChildRefsList dummy_list_foo; - channelz::ChildRefsList dummy_list_bar; }; } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index dc0e1f89ce..ba40febd53 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -84,6 +84,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" @@ -113,15 +114,21 @@ #define GRPC_GRPCLB_RECONNECT_JITTER 0.2 #define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000 +#define GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN "grpc.grpclb_address_lb_token" + namespace grpc_core { TraceFlag grpc_lb_glb_trace(false, "glb"); namespace { +constexpr char kGrpclb[] = "grpclb"; + class GrpcLb : public LoadBalancingPolicy { public: - GrpcLb(const grpc_lb_addresses* addresses, const Args& args); + explicit GrpcLb(const Args& args); + + const char* name() const override { return kGrpclb; } void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override; @@ -161,9 +168,6 @@ class GrpcLb : public LoadBalancingPolicy { // Our on_complete closure and the original one. grpc_closure on_complete; grpc_closure* original_on_complete; - // The LB token associated with the pick. This is set via user_data in - // the pick. - grpc_mdelem lb_token; // Stats for client-side load reporting. RefCountedPtr<GrpcLbClientStats> client_stats; // Next pending pick. @@ -171,8 +175,7 @@ class GrpcLb : public LoadBalancingPolicy { }; /// Contains a call to the LB server and all the data related to the call. - class BalancerCallState - : public InternallyRefCountedWithTracing<BalancerCallState> { + class BalancerCallState : public InternallyRefCounted<BalancerCallState> { public: explicit BalancerCallState( RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy); @@ -330,7 +333,7 @@ class GrpcLb : public LoadBalancingPolicy { // 0 means not using fallback. int lb_fallback_timeout_ms_ = 0; // The backend addresses from the resolver. - grpc_lb_addresses* fallback_backend_addresses_ = nullptr; + UniquePtr<ServerAddressList> fallback_backend_addresses_; // Fallback timer. 
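The fallback_backend_addresses_ field above moves from a raw grpc_lb_addresses* that required a manual destroy call to a UniquePtr<ServerAddressList>. A minimal std::unique_ptr sketch of the same RAII move; GrpcLbLike and the ServerAddressList alias here are illustrative stand-ins, not gRPC types:

    #include <memory>
    #include <string>
    #include <vector>

    using ServerAddressList = std::vector<std::string>;

    class GrpcLbLike {
     public:
      ~GrpcLbLike() = default;  // no manual destroy call needed anymore
     private:
      std::unique_ptr<ServerAddressList> fallback_backend_addresses_;
    };

The destructor hunk later in this diff drops the matching grpc_lb_addresses_destroy() call for exactly this reason.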
bool fallback_timer_callback_pending_ = false; grpc_timer lb_fallback_timer_; @@ -350,7 +353,7 @@ class GrpcLb : public LoadBalancingPolicy { // serverlist parsing code // -// vtable for LB tokens in grpc_lb_addresses +// vtable for LB token channel arg. void* lb_token_copy(void* token) { return token == nullptr ? nullptr @@ -362,38 +365,13 @@ void lb_token_destroy(void* token) { } } int lb_token_cmp(void* token1, void* token2) { - if (token1 > token2) return 1; - if (token1 < token2) return -1; + // Always indicate a match, since we don't want this channel arg to + // affect the subchannel's key in the index. return 0; } -const grpc_lb_user_data_vtable lb_token_vtable = { +const grpc_arg_pointer_vtable lb_token_arg_vtable = { lb_token_copy, lb_token_destroy, lb_token_cmp}; -// Returns the backend addresses extracted from the given addresses. -grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) { - // First pass: count the number of backend addresses. - size_t num_backends = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (!addresses->addresses[i].is_balancer) { - ++num_backends; - } - } - // Second pass: actually populate the addresses and (empty) LB tokens. - grpc_lb_addresses* backend_addresses = - grpc_lb_addresses_create(num_backends, &lb_token_vtable); - size_t num_copied = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) continue; - const grpc_resolved_address* addr = &addresses->addresses[i].address; - grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr, - addr->len, false /* is_balancer */, - nullptr /* balancer_name */, - (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload); - ++num_copied; - } - return backend_addresses; -} - bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) { if (server->drop) return false; const grpc_grpclb_ip_address* ip = &server->ip_address; @@ -441,30 +419,16 @@ void ParseServer(const grpc_grpclb_server* server, } // Returns addresses extracted from \a serverlist. -grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { - size_t num_valid = 0; - /* first pass: count how many are valid in order to allocate the necessary - * memory in a single block */ +ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { + ServerAddressList addresses; for (size_t i = 0; i < serverlist->num_servers; ++i) { - if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid; - } - grpc_lb_addresses* lb_addresses = - grpc_lb_addresses_create(num_valid, &lb_token_vtable); - /* second pass: actually populate the addresses and LB tokens (aka user data - * to the outside world) to be read by the RR policy during its creation. - * Given that the validity tests are very cheap, they are performed again - * instead of marking the valid ones during the first pass, as this would - * incurr in an allocation due to the arbitrary number of server */ - size_t addr_idx = 0; - for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) { - const grpc_grpclb_server* server = serverlist->servers[sl_idx]; - if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue; - GPR_ASSERT(addr_idx < num_valid); - /* address processing */ + const grpc_grpclb_server* server = serverlist->servers[i]; + if (!IsServerValid(serverlist->servers[i], i, false)) continue; + // Address processing. 
grpc_resolved_address addr; ParseServer(server, &addr); - /* lb token processing */ - void* user_data; + // LB token processing. + grpc_mdelem lb_token; if (server->has_load_balance_token) { const size_t lb_token_max_length = GPR_ARRAY_SIZE(server->load_balance_token); @@ -472,9 +436,7 @@ grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { strnlen(server->load_balance_token, lb_token_max_length); grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer( server->load_balance_token, lb_token_length); - user_data = - (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr) - .payload; + lb_token = grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr); } else { char* uri = grpc_sockaddr_to_uri(&addr); gpr_log(GPR_INFO, @@ -482,15 +444,18 @@ grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { "be used instead", uri); gpr_free(uri); - user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; + lb_token = GRPC_MDELEM_LB_TOKEN_EMPTY; } - grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len, - false /* is_balancer */, - nullptr /* balancer_name */, user_data); - ++addr_idx; - } - GPR_ASSERT(addr_idx == num_valid); - return lb_addresses; + // Add address. + grpc_arg arg = grpc_channel_arg_pointer_create( + const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), + (void*)lb_token.payload, &lb_token_arg_vtable); + grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1); + addresses.emplace_back(addr, args); + // Clean up. + GRPC_MDELEM_UNREF(lb_token); + } + return addresses; } // @@ -499,7 +464,7 @@ grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { GrpcLb::BalancerCallState::BalancerCallState( RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy) - : InternallyRefCountedWithTracing<BalancerCallState>(&grpc_lb_glb_trace), + : InternallyRefCounted<BalancerCallState>(&grpc_lb_glb_trace), grpclb_policy_(std::move(parent_grpclb_policy)) { GPR_ASSERT(grpclb_policy_ != nullptr); GPR_ASSERT(!grpclb_policy()->shutting_down_); @@ -566,8 +531,7 @@ void GrpcLb::BalancerCallState::Orphan() { void GrpcLb::BalancerCallState::StartQuery() { GPR_ASSERT(lb_call_ != nullptr); if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, - "[grpclb %p] Starting LB call (lb_calld: %p, lb_call: %p)", + gpr_log(GPR_INFO, "[grpclb %p] lb_calld=%p: Starting LB call %p", grpclb_policy_.get(), this, lb_call_); } // Create the ops. @@ -711,8 +675,9 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() { grpc_call_error call_error = grpc_call_start_batch_and_execute( lb_call_, &op, 1, &client_load_report_closure_); if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { - gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", grpclb_policy_.get(), - call_error); + gpr_log(GPR_ERROR, + "[grpclb %p] lb_calld=%p call_error=%d sending client load report", + grpclb_policy_.get(), this, call_error); GPR_ASSERT(GRPC_CALL_OK == call_error); } } @@ -749,7 +714,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( void* arg, grpc_error* error) { BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); GrpcLb* grpclb_policy = lb_calld->grpclb_policy(); - // Empty payload means the LB call was cancelled. + // Null payload means the LB call was cancelled. 
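The lb_token_cmp above now always reports a match, so two addresses that differ only in their LB token still contribute the same value to the subchannel key. A standalone sketch of that effect, using simplified stand-ins (PointerArg, AlwaysEqual) rather than the real grpc_arg_pointer_vtable:

    #include <cassert>
    #include <string>

    struct PointerArg {
      void* p;
      int (*cmp)(void*, void*);
    };

    int AlwaysEqual(void* /*a*/, void* /*b*/) { return 0; }

    int CompareForSubchannelKey(const PointerArg& a, const PointerArg& b) {
      return a.cmp(a.p, b.p);  // the vtable decides whether the arg matters
    }

    int main() {
      std::string token1 = "token-A";
      std::string token2 = "token-B";
      PointerArg a{&token1, AlwaysEqual};
      PointerArg b{&token2, AlwaysEqual};
      assert(CompareForSubchannelKey(a, b) == 0);  // tokens never split keys
      return 0;
    }

If the comparator ordered by pointer value instead, every serverlist refresh could churn the subchannel index even when the set of backends was unchanged.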
if (lb_calld != grpclb_policy->lb_calld_.get() || lb_calld->recv_message_payload_ == nullptr) { lb_calld->Unref(DEBUG_LOCATION, "on_message_received"); @@ -773,15 +738,17 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( &initial_response->client_stats_report_interval)); if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p] Received initial LB response message; " - "client load reporting interval = %" PRId64 " milliseconds", - grpclb_policy, lb_calld->client_stats_report_interval_); + "[grpclb %p] lb_calld=%p: Received initial LB response " + "message; client load reporting interval = %" PRId64 + " milliseconds", + grpclb_policy, lb_calld, + lb_calld->client_stats_report_interval_); } } else if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p] Received initial LB response message; client load " - "reporting NOT enabled", - grpclb_policy); + "[grpclb %p] lb_calld=%p: Received initial LB response message; " + "client load reporting NOT enabled", + grpclb_policy, lb_calld); } grpc_grpclb_initial_response_destroy(initial_response); lb_calld->seen_initial_response_ = true; @@ -791,74 +758,67 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( GPR_ASSERT(lb_calld->lb_call_ != nullptr); if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p] Serverlist with %" PRIuPTR " servers received", - grpclb_policy, serverlist->num_servers); + "[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR + " servers received", + grpclb_policy, lb_calld, serverlist->num_servers); for (size_t i = 0; i < serverlist->num_servers; ++i) { grpc_resolved_address addr; ParseServer(serverlist->servers[i], &addr); char* ipport; grpc_sockaddr_to_string(&ipport, &addr, false); - gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s", - grpclb_policy, i, ipport); + gpr_log(GPR_INFO, + "[grpclb %p] lb_calld=%p: Serverlist[%" PRIuPTR "]: %s", + grpclb_policy, lb_calld, i, ipport); gpr_free(ipport); } } - /* update serverlist */ - if (serverlist->num_servers > 0) { - // Start sending client load report only after we start using the - // serverlist returned from the current LB call. - if (lb_calld->client_stats_report_interval_ > 0 && - lb_calld->client_stats_ == nullptr) { - lb_calld->client_stats_.reset(New<GrpcLbClientStats>()); - // TODO(roth): We currently track this ref manually. Once the - // ClosureRef API is ready, we should pass the RefCountedPtr<> along - // with the callback. - auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report"); - self.release(); - lb_calld->ScheduleNextClientLoadReportLocked(); - } - if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_, - serverlist)) { - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, - "[grpclb %p] Incoming server list identical to current, " - "ignoring.", - grpclb_policy); - } - grpc_grpclb_destroy_serverlist(serverlist); - } else { /* new serverlist */ - if (grpclb_policy->serverlist_ != nullptr) { - /* dispose of the old serverlist */ - grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_); - } else { - /* or dispose of the fallback */ - grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_); - grpclb_policy->fallback_backend_addresses_ = nullptr; - if (grpclb_policy->fallback_timer_callback_pending_) { - grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_); - } - } - // and update the copy in the GrpcLb instance. This - // serverlist instance will be destroyed either upon the next - // update or when the GrpcLb instance is destroyed. 
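The serverlist handling in this hunk follows a compare-then-swap pattern: an update identical to the current list is ignored, anything else replaces the list and resets the pick index. A plain C++ model of that flow (PolicyLike and a vector of strings are stand-ins for the protobuf serverlist):

    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    struct PolicyLike {
      std::vector<std::string> serverlist;
      size_t serverlist_index = 0;

      void OnServerlistReceived(std::vector<std::string> incoming) {
        if (incoming == serverlist) {
          std::cout << "identical serverlist, ignoring\n";
          return;                          // keep current list and index
        }
        serverlist = std::move(incoming);  // old list destroyed here
        serverlist_index = 0;              // restart drop/pick accounting
      }
    };

    int main() {
      PolicyLike p;
      p.OnServerlistReceived({"10.0.0.1:443", "10.0.0.2:443"});
      p.OnServerlistReceived({"10.0.0.1:443", "10.0.0.2:443"});  // ignored
      p.OnServerlistReceived({"10.0.0.3:443"});  // replaces, index reset
      return 0;
    }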
- grpclb_policy->serverlist_ = serverlist; - grpclb_policy->serverlist_index_ = 0; - grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked(); - } - } else { + // Start sending client load report only after we start using the + // serverlist returned from the current LB call. + if (lb_calld->client_stats_report_interval_ > 0 && + lb_calld->client_stats_ == nullptr) { + lb_calld->client_stats_.reset(New<GrpcLbClientStats>()); + // TODO(roth): We currently track this ref manually. Once the + // ClosureRef API is ready, we should pass the RefCountedPtr<> along + // with the callback. + auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report"); + self.release(); + lb_calld->ScheduleNextClientLoadReportLocked(); + } + // Check if the serverlist differs from the previous one. + if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_, serverlist)) { if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.", - grpclb_policy); + gpr_log(GPR_INFO, + "[grpclb %p] lb_calld=%p: Incoming server list identical to " + "current, ignoring.", + grpclb_policy, lb_calld); } grpc_grpclb_destroy_serverlist(serverlist); + } else { // New serverlist. + if (grpclb_policy->serverlist_ != nullptr) { + // Dispose of the old serverlist. + grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_); + } else { + // Dispose of the fallback. + grpclb_policy->fallback_backend_addresses_.reset(); + if (grpclb_policy->fallback_timer_callback_pending_) { + grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_); + } + } + // Update the serverlist in the GrpcLb instance. This serverlist + // instance will be destroyed either upon the next update or when the + // GrpcLb instance is destroyed. + grpclb_policy->serverlist_ = serverlist; + grpclb_policy->serverlist_index_ = 0; + grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked(); } } else { // No valid initial response or serverlist found. char* response_slice_str = grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX); gpr_log(GPR_ERROR, - "[grpclb %p] Invalid LB response received: '%s'. Ignoring.", - grpclb_policy, response_slice_str); + "[grpclb %p] lb_calld=%p: Invalid LB response received: '%s'. " + "Ignoring.", + grpclb_policy, lb_calld, response_slice_str); gpr_free(response_slice_str); } grpc_slice_unref_internal(response_slice); @@ -889,9 +849,9 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked( char* status_details = grpc_slice_to_c_string(lb_calld->lb_call_status_details_); gpr_log(GPR_INFO, - "[grpclb %p] Status from LB server received. Status = %d, details " - "= '%s', (lb_calld: %p, lb_call: %p), error '%s'", - grpclb_policy, lb_calld->lb_call_status_, status_details, lb_calld, + "[grpclb %p] lb_calld=%p: Status from LB server received. " + "Status = %d, details = '%s', (lb_call: %p), error '%s'", + grpclb_policy, lb_calld, lb_calld->lb_call_status_, status_details, lb_calld->lb_call_, grpc_error_string(error)); gpr_free(status_details); } @@ -920,31 +880,25 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked( // helper code for creating balancer channel // -grpc_lb_addresses* ExtractBalancerAddresses( - const grpc_lb_addresses* addresses) { - size_t num_grpclb_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; - } - // There must be at least one balancer address, or else the - // client_channel would not have chosen this LB policy. 
- GPR_ASSERT(num_grpclb_addrs > 0); - grpc_lb_addresses* lb_addresses = - grpc_lb_addresses_create(num_grpclb_addrs, nullptr); - size_t lb_addresses_idx = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (!addresses->addresses[i].is_balancer) continue; - if (GPR_UNLIKELY(addresses->addresses[i].user_data != nullptr)) { - gpr_log(GPR_ERROR, - "This LB policy doesn't support user data. It will be ignored"); +ServerAddressList ExtractBalancerAddresses(const ServerAddressList& addresses) { + ServerAddressList balancer_addresses; + for (size_t i = 0; i < addresses.size(); ++i) { + if (addresses[i].IsBalancer()) { + // Strip out the is_balancer channel arg, since we don't want to + // recursively use the grpclb policy in the channel used to talk to + // the balancers. Note that we do NOT strip out the balancer_name + // channel arg, since we need that to set the authority correctly + // to talk to the balancers. + static const char* args_to_remove[] = { + GRPC_ARG_ADDRESS_IS_BALANCER, + }; + balancer_addresses.emplace_back( + addresses[i].address(), + grpc_channel_args_copy_and_remove(addresses[i].args(), args_to_remove, + GPR_ARRAY_SIZE(args_to_remove))); } - grpc_lb_addresses_set_address( - lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr, - addresses->addresses[i].address.len, false /* is balancer */, - addresses->addresses[i].balancer_name, nullptr /* user data */); } - GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx); - return lb_addresses; + return balancer_addresses; } /* Returns the channel args for the LB channel, used to create a bidirectional @@ -956,10 +910,10 @@ grpc_lb_addresses* ExtractBalancerAddresses( * above the grpclb policy. * - \a args: other args inherited from the grpclb policy. */ grpc_channel_args* BuildBalancerChannelArgs( - const grpc_lb_addresses* addresses, + const ServerAddressList& addresses, FakeResolverResponseGenerator* response_generator, const grpc_channel_args* args) { - grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses); + ServerAddressList balancer_addresses = ExtractBalancerAddresses(addresses); // Channel args to remove. static const char* args_to_remove[] = { // LB policy name, since we want to use the default (pick_first) in @@ -977,7 +931,7 @@ grpc_channel_args* BuildBalancerChannelArgs( // is_balancer=true. We need the LB channel to return addresses with // is_balancer=false so that it does not wind up recursively using the // grpclb LB policy, as per the special case logic in client_channel.c. - GRPC_ARG_LB_ADDRESSES, + GRPC_ARG_SERVER_ADDRESS_LIST, // The fake resolver response generator, because we are replacing it // with the one from the grpclb policy, used to propagate updates to // the LB channel. @@ -993,10 +947,10 @@ grpc_channel_args* BuildBalancerChannelArgs( }; // Channel args to add. const grpc_arg args_to_add[] = { - // New LB addresses. + // New address list. // Note that we pass these in both when creating the LB channel // and via the fake resolver. The latter is what actually gets used. - grpc_lb_addresses_create_channel_arg(lb_addresses), + CreateServerAddressListChannelArg(&balancer_addresses), // The fake resolver response generator, which we use to inject // address updates into the LB channel. grpc_core::FakeResolverResponseGenerator::MakeChannelArg( @@ -1014,18 +968,14 @@ grpc_channel_args* BuildBalancerChannelArgs( args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add, GPR_ARRAY_SIZE(args_to_add)); // Make any necessary modifications for security. 
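ExtractBalancerAddresses above reduces to a filter plus a per-address arg strip. A sketch with std:: containers in place of ServerAddressList and grpc_channel_args; the string keys are placeholders for the real GRPC_ARG_* constants:

    #include <map>
    #include <string>
    #include <utility>
    #include <vector>

    struct Address {
      std::string addr;
      std::map<std::string, std::string> args;
    };

    std::vector<Address> ExtractBalancers(const std::vector<Address>& in) {
      std::vector<Address> out;
      for (const Address& a : in) {
        if (a.args.count("is_balancer") == 0) continue;  // skip backends
        Address copy = a;
        copy.args.erase("is_balancer");  // don't re-trigger grpclb selection
        // A "balancer_name" arg, if present, deliberately survives the copy
        // so the channel can still set the right authority.
        out.push_back(std::move(copy));
      }
      return out;
    }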
- new_args = grpc_lb_policy_grpclb_modify_lb_channel_args(new_args); - // Clean up. - grpc_lb_addresses_destroy(lb_addresses); - return new_args; + return grpc_lb_policy_grpclb_modify_lb_channel_args(new_args); } // // ctor and dtor // -GrpcLb::GrpcLb(const grpc_lb_addresses* addresses, - const LoadBalancingPolicy::Args& args) +GrpcLb::GrpcLb(const LoadBalancingPolicy::Args& args) : LoadBalancingPolicy(args), response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()), lb_call_backoff_( @@ -1082,9 +1032,6 @@ GrpcLb::~GrpcLb() { if (serverlist_ != nullptr) { grpc_grpclb_destroy_serverlist(serverlist_); } - if (fallback_backend_addresses_ != nullptr) { - grpc_lb_addresses_destroy(fallback_backend_addresses_); - } grpc_subchannel_index_unref(); } @@ -1132,7 +1079,6 @@ void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { while ((pp = pending_picks_) != nullptr) { pending_picks_ = pp->next; pp->pick->on_complete = pp->original_on_complete; - pp->pick->user_data = nullptr; grpc_error* error = GRPC_ERROR_NONE; if (new_policy->PickLocked(pp->pick, &error)) { // Synchronous return; schedule closure. @@ -1194,7 +1140,7 @@ void GrpcLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, pending_picks_ = nullptr; while (pp != nullptr) { PendingPick* next = pp->next; - if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) == + if ((*pp->pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { // Note: pp is deleted in this callback. GRPC_CLOSURE_SCHED(&pp->on_complete, @@ -1286,9 +1232,27 @@ void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current, notify); } +// Returns the backend addresses extracted from the given addresses. +UniquePtr<ServerAddressList> ExtractBackendAddresses( + const ServerAddressList& addresses) { + void* lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; + grpc_arg arg = grpc_channel_arg_pointer_create( + const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token, + &lb_token_arg_vtable); + auto backend_addresses = MakeUnique<ServerAddressList>(); + for (size_t i = 0; i < addresses.size(); ++i) { + if (!addresses[i].IsBalancer()) { + backend_addresses->emplace_back( + addresses[i].address(), + grpc_channel_args_copy_and_add(addresses[i].args(), &arg, 1)); + } + } + return backend_addresses; +} + void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { - const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); - if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) { + const ServerAddressList* addresses = FindServerAddressListChannelArg(&args); + if (addresses == nullptr) { // Ignore this update. gpr_log( GPR_ERROR, @@ -1296,13 +1260,8 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { this); return; } - const grpc_lb_addresses* addresses = - static_cast<const grpc_lb_addresses*>(arg->value.pointer.p); // Update fallback address list. - if (fallback_backend_addresses_ != nullptr) { - grpc_lb_addresses_destroy(fallback_backend_addresses_); - } - fallback_backend_addresses_ = ExtractBackendAddresses(addresses); + fallback_backend_addresses_ = ExtractBackendAddresses(*addresses); // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, // since we use this to trigger the client_load_reporting filter. 
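Several hunks in this file build a new arg set with grpc_channel_args_copy_and_add_and_remove(). A rough functional model of that idiom, using a std::map rather than the real flat grpc_channel_args array:

    #include <map>
    #include <set>
    #include <string>

    using Args = std::map<std::string, std::string>;

    Args CopyAndAddAndRemove(const Args& base,
                             const std::set<std::string>& to_remove,
                             const Args& to_add) {
      Args result;
      for (const auto& kv : base) {
        if (to_remove.count(kv.first) == 0) result.insert(kv);
      }
      for (const auto& kv : to_add) result[kv.first] = kv.second;
      return result;  // caller owns the new set; the old one is untouched
    }

The real helper additionally deep-copies pointer args through their vtables; this sketch only shows the remove-then-add ordering.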
static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; @@ -1313,7 +1272,7 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); // Construct args for balancer channel. grpc_channel_args* lb_channel_args = - BuildBalancerChannelArgs(addresses, response_generator_.get(), &args); + BuildBalancerChannelArgs(*addresses, response_generator_.get(), &args); // Create balancer channel if needed. if (lb_channel_ == nullptr) { char* uri_str; @@ -1519,12 +1478,17 @@ void DestroyClientStats(void* arg) { } void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) { - /* if connected_subchannel is nullptr, no pick has been made by the RR - * policy (e.g., all addresses failed to connect). There won't be any - * user_data/token available */ + // If connected_subchannel is nullptr, no pick has been made by the RR + // policy (e.g., all addresses failed to connect). There won't be any + // LB token available. if (pp->pick->connected_subchannel != nullptr) { - if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) { - AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token), + const grpc_arg* arg = + grpc_channel_args_find(pp->pick->connected_subchannel->args(), + GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN); + if (arg != nullptr) { + grpc_mdelem lb_token = { + reinterpret_cast<uintptr_t>(arg->value.pointer.p)}; + AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(lb_token), &pp->pick->lb_token_mdelem_storage, pp->pick->initial_metadata); } else { @@ -1583,7 +1547,7 @@ void GrpcLb::AddPendingPick(PendingPick* pp) { bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp, grpc_error** error) { // Check for drops if we are not using fallback backend addresses. - if (serverlist_ != nullptr) { + if (serverlist_ != nullptr && serverlist_->num_servers > 0) { // Look at the index into the serverlist to see if we should drop this call. grpc_grpclb_server* server = serverlist_->servers[serverlist_index_++]; if (serverlist_index_ == serverlist_->num_servers) { @@ -1608,12 +1572,10 @@ bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp, return true; } } - // Set client_stats and user_data. + // Set client_stats. if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) { pp->client_stats = lb_calld_->client_stats()->Ref(); } - GPR_ASSERT(pp->pick->user_data == nullptr); - pp->pick->user_data = (void**)&pp->lb_token; // Pick via the RR policy. bool pick_done = rr_policy_->PickLocked(pp->pick, error); if (pick_done) { @@ -1641,6 +1603,10 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) { this); return; } + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this, + rr_policy_.get()); + } // TODO(roth): We currently track this ref manually. Once the new // ClosureRef API is done, pass the RefCountedPtr<> along with the closure. 
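The TODO(roth) comments in this file describe the manual-ref pattern used until a ClosureRef API exists: take a ref before scheduling a closure, release it when the closure has run. A std::shared_ptr analogue (CallState and the queue are stand-ins, not the BalancerCallState in this file):

    #include <functional>
    #include <memory>
    #include <vector>

    class CallState : public std::enable_shared_from_this<CallState> {
     public:
      void ScheduleReport(std::vector<std::function<void()>>* queue) {
        auto self = shared_from_this();  // extra ref held for the callback
        queue->push_back([self]() { self->OnReportDone(); });
        // The captured self keeps the object alive until the queued
        // closure is destroyed after running.
      }
     private:
      void OnReportDone() {}
    };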
auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested"); @@ -1678,11 +1644,11 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) { } grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { - grpc_lb_addresses* addresses; + ServerAddressList tmp_addresses; + ServerAddressList* addresses = &tmp_addresses; bool is_backend_from_grpclb_load_balancer = false; if (serverlist_ != nullptr) { - GPR_ASSERT(serverlist_->num_servers > 0); - addresses = ProcessServerlist(serverlist_); + tmp_addresses = ProcessServerlist(serverlist_); is_backend_from_grpclb_load_balancer = true; } else { // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't @@ -1691,14 +1657,14 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { // empty, in which case the new round_robin policy will keep the requested // picks pending. GPR_ASSERT(fallback_backend_addresses_ != nullptr); - addresses = grpc_lb_addresses_copy(fallback_backend_addresses_); + addresses = fallback_backend_addresses_.get(); } GPR_ASSERT(addresses != nullptr); - // Replace the LB addresses in the channel args that we pass down to + // Replace the server address list in the channel args that we pass down to // the subchannel. - static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; + static const char* keys_to_remove[] = {GRPC_ARG_SERVER_ADDRESS_LIST}; grpc_arg args_to_add[3] = { - grpc_lb_addresses_create_channel_arg(addresses), + CreateServerAddressListChannelArg(addresses), // A channel arg indicating if the target is a backend inferred from a // grpclb load balancer. grpc_channel_arg_integer_create( @@ -1715,7 +1681,6 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove( args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add, num_args_to_add); - grpc_lb_addresses_destroy(addresses); return args; } @@ -1735,10 +1700,6 @@ void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() { lb_policy_args.client_channel_factory = client_channel_factory(); lb_policy_args.args = args; CreateRoundRobinPolicyLocked(lb_policy_args); - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this, - rr_policy_.get()); - } } grpc_channel_args_destroy(args); } @@ -1848,22 +1809,21 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory { OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( const LoadBalancingPolicy::Args& args) const override { /* Count the number of gRPC-LB addresses. There must be at least one. 
*/ - const grpc_arg* arg = - grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES); - if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { - return nullptr; - } - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(arg->value.pointer.p); - size_t num_grpclb_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; + const ServerAddressList* addresses = + FindServerAddressListChannelArg(args.args); + if (addresses == nullptr) return nullptr; + bool found_balancer = false; + for (size_t i = 0; i < addresses->size(); ++i) { + if ((*addresses)[i].IsBalancer()) { + found_balancer = true; + break; + } } - if (num_grpclb_addrs == 0) return nullptr; - return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(addresses, args)); + if (!found_balancer) return nullptr; + return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(args)); } - const char* name() const override { return "grpclb"; } + const char* name() const override { return kGrpclb; } }; } // namespace diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h index 825065a9c3..3b2dc370eb 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h @@ -21,7 +21,7 @@ #include <grpc/support/port_platform.h> -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include <grpc/impl/codegen/grpc_types.h> /// Makes any necessary modifications to \a args for use in the grpclb /// balancer channel. diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc index 441efd5e23..657ff69312 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc @@ -26,6 +26,7 @@ #include <grpc/support/string_util.h> #include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -42,22 +43,23 @@ int BalancerNameCmp(const grpc_core::UniquePtr<char>& a, } RefCountedPtr<TargetAuthorityTable> CreateTargetAuthorityTable( - grpc_lb_addresses* addresses) { + const ServerAddressList& addresses) { TargetAuthorityTable::Entry* target_authority_entries = - static_cast<TargetAuthorityTable::Entry*>(gpr_zalloc( - sizeof(*target_authority_entries) * addresses->num_addresses)); - for (size_t i = 0; i < addresses->num_addresses; ++i) { + static_cast<TargetAuthorityTable::Entry*>( + gpr_zalloc(sizeof(*target_authority_entries) * addresses.size())); + for (size_t i = 0; i < addresses.size(); ++i) { char* addr_str; - GPR_ASSERT(grpc_sockaddr_to_string( - &addr_str, &addresses->addresses[i].address, true) > 0); + GPR_ASSERT( + grpc_sockaddr_to_string(&addr_str, &addresses[i].address(), true) > 0); target_authority_entries[i].key = grpc_slice_from_copied_string(addr_str); - target_authority_entries[i].value.reset( - gpr_strdup(addresses->addresses[i].balancer_name)); gpr_free(addr_str); + char* balancer_name = grpc_channel_arg_get_string(grpc_channel_args_find( + addresses[i].args(), GRPC_ARG_ADDRESS_BALANCER_NAME)); + 
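CreateTargetAuthorityTable above now reads the balancer name out of each address's channel args. Its core is a map from address to authority; a simplified rendering with std::map (BalancerAddress is illustrative, and the real table is keyed by slices with a custom comparator):

    #include <map>
    #include <string>
    #include <vector>

    struct BalancerAddress {
      std::string addr;           // e.g. "10.1.2.3:443"
      std::string balancer_name;  // from GRPC_ARG_ADDRESS_BALANCER_NAME
    };

    std::map<std::string, std::string> BuildTargetAuthorityTable(
        const std::vector<BalancerAddress>& addresses) {
      std::map<std::string, std::string> table;
      for (const BalancerAddress& a : addresses) {
        table[a.addr] = a.balancer_name;  // authority to present per address
      }
      return table;
    }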
target_authority_entries[i].value.reset(gpr_strdup(balancer_name)); } RefCountedPtr<TargetAuthorityTable> target_authority_table = - TargetAuthorityTable::Create(addresses->num_addresses, - target_authority_entries, BalancerNameCmp); + TargetAuthorityTable::Create(addresses.size(), target_authority_entries, + BalancerNameCmp); gpr_free(target_authority_entries); return target_authority_table; } @@ -72,13 +74,12 @@ grpc_channel_args* grpc_lb_policy_grpclb_modify_lb_channel_args( grpc_arg args_to_add[2]; size_t num_args_to_add = 0; // Add arg for targets info table. - const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_LB_ADDRESSES); - GPR_ASSERT(arg != nullptr); - GPR_ASSERT(arg->type == GRPC_ARG_POINTER); - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(arg->value.pointer.p); + grpc_core::ServerAddressList* addresses = + grpc_core::FindServerAddressListChannelArg(args); + GPR_ASSERT(addresses != nullptr); grpc_core::RefCountedPtr<grpc_core::TargetAuthorityTable> - target_authority_table = grpc_core::CreateTargetAuthorityTable(addresses); + target_authority_table = + grpc_core::CreateTargetAuthorityTable(*addresses); args_to_add[num_args_to_add++] = grpc_core::CreateTargetAuthorityTableChannelArg( target_authority_table.get()); @@ -87,22 +88,18 @@ grpc_channel_args* grpc_lb_policy_grpclb_modify_lb_channel_args( // bearer token credentials. grpc_channel_credentials* channel_credentials = grpc_channel_credentials_find_in_args(args); - grpc_channel_credentials* creds_sans_call_creds = nullptr; + grpc_core::RefCountedPtr<grpc_channel_credentials> creds_sans_call_creds; if (channel_credentials != nullptr) { creds_sans_call_creds = - grpc_channel_credentials_duplicate_without_call_credentials( - channel_credentials); + channel_credentials->duplicate_without_call_credentials(); GPR_ASSERT(creds_sans_call_creds != nullptr); args_to_remove[num_args_to_remove++] = GRPC_ARG_CHANNEL_CREDENTIALS; args_to_add[num_args_to_add++] = - grpc_channel_credentials_to_arg(creds_sans_call_creds); + grpc_channel_credentials_to_arg(creds_sans_call_creds.get()); } grpc_channel_args* result = grpc_channel_args_copy_and_add_and_remove( args, args_to_remove, num_args_to_remove, args_to_add, num_args_to_add); // Clean up. 
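The credentials hunk above swaps a manually unref'd grpc_channel_credentials* for a RefCountedPtr<>, which is why the explicit unref below it disappears. The same RAII shape with std::shared_ptr; Credentials and BuildLbChannelArgs are hypothetical stand-ins:

    #include <memory>

    struct Credentials {
      std::shared_ptr<Credentials> duplicate_without_call_credentials() const {
        return std::make_shared<Credentials>();  // stand-in for the real copy
      }
    };

    void BuildLbChannelArgs(const Credentials* channel_creds) {
      std::shared_ptr<Credentials> creds_sans_call_creds;
      if (channel_creds != nullptr) {
        creds_sans_call_creds =
            channel_creds->duplicate_without_call_credentials();
        // ... add creds_sans_call_creds.get() to the new args ...
      }
    }  // no explicit unref: the smart pointer releases its ref here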
grpc_channel_args_destroy(args); - if (creds_sans_call_creds != nullptr) { - grpc_channel_credentials_unref(creds_sans_call_creds); - } return result; } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h index 9ca7b28d8e..71d371c880 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h @@ -25,7 +25,7 @@ #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h" -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include "src/core/lib/iomgr/exec_ctx.h" #define GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH 128 diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index d454401a66..d6ff74ec7f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -24,6 +24,7 @@ #include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/channel/channel_args.h" @@ -42,10 +43,14 @@ namespace { // pick_first LB policy // +constexpr char kPickFirst[] = "pick_first"; + class PickFirst : public LoadBalancingPolicy { public: explicit PickFirst(const Args& args); + const char* name() const override { return kPickFirst; } + void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override; bool PickLocked(PickState* pick, grpc_error** error) override; @@ -75,11 +80,9 @@ class PickFirst : public LoadBalancingPolicy { PickFirstSubchannelData( SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>* subchannel_list, - const grpc_lb_user_data_vtable* user_data_vtable, - const grpc_lb_address& address, grpc_subchannel* subchannel, + const ServerAddress& address, grpc_subchannel* subchannel, grpc_combiner* combiner) - : SubchannelData(subchannel_list, user_data_vtable, address, subchannel, - combiner) {} + : SubchannelData(subchannel_list, address, subchannel, combiner) {} void ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state, grpc_error* error) override; @@ -95,7 +98,7 @@ class PickFirst : public LoadBalancingPolicy { PickFirstSubchannelData> { public: PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer, - const grpc_lb_addresses* addresses, + const ServerAddressList& addresses, grpc_combiner* combiner, grpc_client_channel_factory* client_channel_factory, const grpc_channel_args& args) @@ -235,7 +238,7 @@ void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, pending_picks_ = nullptr; while (pick != nullptr) { PickState* next = pick->next; - if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == + if ((*pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( @@ -337,8 +340,8 @@ void PickFirst::UpdateChildRefsLocked() { void PickFirst::UpdateLocked(const 
grpc_channel_args& args, grpc_json* lb_config) { AutoChildRefsUpdater guard(this); - const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); - if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { + const ServerAddressList* addresses = FindServerAddressListChannelArg(&args); + if (addresses == nullptr) { if (subchannel_list_ == nullptr) { // If we don't have a current subchannel list, go into TRANSIENT FAILURE. grpc_connectivity_state_set( @@ -354,19 +357,17 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args, } return; } - const grpc_lb_addresses* addresses = - static_cast<const grpc_lb_addresses*>(arg->value.pointer.p); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p received update with %" PRIuPTR " addresses", this, - addresses->num_addresses); + addresses->size()); } grpc_arg new_arg = grpc_channel_arg_integer_create( const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1); grpc_channel_args* new_args = grpc_channel_args_copy_and_add(&args, &new_arg, 1); auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>( - this, &grpc_lb_pick_first_trace, addresses, combiner(), + this, &grpc_lb_pick_first_trace, *addresses, combiner(), client_channel_factory(), *new_args); grpc_channel_args_destroy(new_args); if (subchannel_list->num_subchannels() == 0) { @@ -380,6 +381,31 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args, selected_ = nullptr; return; } + // If one of the subchannels in the new list is already in state + // READY, then select it immediately. This can happen when the + // currently selected subchannel is also present in the update. It + // can also happen if one of the subchannels in the update is already + // in the subchannel index because it's in use by another channel. + for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { + PickFirstSubchannelData* sd = subchannel_list->subchannel(i); + grpc_error* error = GRPC_ERROR_NONE; + grpc_connectivity_state state = sd->CheckConnectivityStateLocked(&error); + GRPC_ERROR_UNREF(error); + if (state == GRPC_CHANNEL_READY) { + subchannel_list_ = std::move(subchannel_list); + sd->ProcessUnselectedReadyLocked(); + sd->StartConnectivityWatchLocked(); + // If there was a previously pending update (which may or may + // not have contained the currently selected subchannel), drop + // it, so that it doesn't override what we've done here. + latest_pending_subchannel_list_.reset(); + // Make sure that subsequent calls to ExitIdleLocked() don't cause + // us to start watching a subchannel other than the one we've + // selected. + started_picking_ = true; + return; + } + } if (selected_ == nullptr) { // We don't yet have a selected subchannel, so replace the current // subchannel list immediately. @@ -387,46 +413,14 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args, // If we've started picking, start trying to connect to the first // subchannel in the new list. if (started_picking_) { - subchannel_list_->subchannel(0) - ->CheckConnectivityStateAndStartWatchingLocked(); + // Note: No need to use CheckConnectivityStateAndStartWatchingLocked() + // here, since we've already checked the initial connectivity + // state of all subchannels above. + subchannel_list_->subchannel(0)->StartConnectivityWatchLocked(); } } else { - // We do have a selected subchannel. - // Check if it's present in the new list. If so, we're done. 
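The new pick_first block above scans the incoming subchannel list for one already in READY and selects it immediately, which subsumes the deleted "is the current selection present" loop below. A behavioral sketch under that assumption, with a plain enum in place of grpc_connectivity_state:

    #include <cstddef>
    #include <vector>

    enum class State { kIdle, kConnecting, kReady, kTransientFailure };

    // Returns the index selected immediately, or -1 if nothing was READY.
    int SelectReadyOnUpdate(const std::vector<State>& new_list) {
      for (size_t i = 0; i < new_list.size(); ++i) {
        if (new_list[i] == State::kReady) return static_cast<int>(i);
      }
      return -1;  // caller keeps any current selection and starts watching
    }

Selecting up front also lets the policy drop any pending update and mark picking as started, so a later ExitIdleLocked() cannot start watching a different subchannel.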
- for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { - PickFirstSubchannelData* sd = subchannel_list->subchannel(i); - if (sd->subchannel() == selected_->subchannel()) { - // The currently selected subchannel is in the update: we are done. - if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_INFO, - "Pick First %p found already selected subchannel %p " - "at update index %" PRIuPTR " of %" PRIuPTR "; update done", - this, selected_->subchannel(), i, - subchannel_list->num_subchannels()); - } - // Make sure it's in state READY. It might not be if we grabbed - // the combiner while a connectivity state notification - // informing us otherwise is pending. - // Note that CheckConnectivityStateLocked() also takes a ref to - // the connected subchannel. - grpc_error* error = GRPC_ERROR_NONE; - if (sd->CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) { - selected_ = sd; - subchannel_list_ = std::move(subchannel_list); - sd->StartConnectivityWatchLocked(); - // If there was a previously pending update (which may or may - // not have contained the currently selected subchannel), drop - // it, so that it doesn't override what we've done here. - latest_pending_subchannel_list_.reset(); - return; - } - GRPC_ERROR_UNREF(error); - } - } - // Not keeping the previous selected subchannel, so set the latest - // pending subchannel list to the new subchannel list. We will wait - // for it to report READY before swapping it into the current - // subchannel list. + // We do have a selected subchannel, so keep using it until one of + // the subchannels in the new list reports READY. if (latest_pending_subchannel_list_ != nullptr) { if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, @@ -440,8 +434,11 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args, // If we've started picking, start trying to connect to the first // subchannel in the new list. if (started_picking_) { + // Note: No need to use CheckConnectivityStateAndStartWatchingLocked() + // here, since we've already checked the initial connectivity + // state of all subchannels above. latest_pending_subchannel_list_->subchannel(0) - ->CheckConnectivityStateAndStartWatchingLocked(); + ->StartConnectivityWatchLocked(); } } } @@ -629,7 +626,7 @@ class PickFirstFactory : public LoadBalancingPolicyFactory { return OrphanablePtr<LoadBalancingPolicy>(New<PickFirst>(args)); } - const char* name() const override { return "pick_first"; } + const char* name() const override { return kPickFirst; } }; } // namespace diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 2a16975131..3bcb33ef11 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -53,10 +53,14 @@ namespace { // round_robin LB policy // +constexpr char kRoundRobin[] = "round_robin"; + class RoundRobin : public LoadBalancingPolicy { public: explicit RoundRobin(const Args& args); + const char* name() const override { return kRoundRobin; } + void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override; bool PickLocked(PickState* pick, grpc_error** error) override; @@ -82,8 +86,6 @@ class RoundRobin : public LoadBalancingPolicy { // Data for a particular subchannel in a subchannel list. 
// This subclass adds the following functionality: - // - Tracks user_data associated with each address, which will be - // returned along with picks that select the subchannel. // - Tracks the previous connectivity state of the subchannel, so that // we know how many subchannels are in each state. class RoundRobinSubchannelData @@ -93,26 +95,9 @@ class RoundRobin : public LoadBalancingPolicy { RoundRobinSubchannelData( SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>* subchannel_list, - const grpc_lb_user_data_vtable* user_data_vtable, - const grpc_lb_address& address, grpc_subchannel* subchannel, + const ServerAddress& address, grpc_subchannel* subchannel, grpc_combiner* combiner) - : SubchannelData(subchannel_list, user_data_vtable, address, subchannel, - combiner), - user_data_vtable_(user_data_vtable), - user_data_(user_data_vtable_ != nullptr - ? user_data_vtable_->copy(address.user_data) - : nullptr) {} - - void UnrefSubchannelLocked(const char* reason) override { - SubchannelData::UnrefSubchannelLocked(reason); - if (user_data_ != nullptr) { - GPR_ASSERT(user_data_vtable_ != nullptr); - user_data_vtable_->destroy(user_data_); - user_data_ = nullptr; - } - } - - void* user_data() const { return user_data_; } + : SubchannelData(subchannel_list, address, subchannel, combiner) {} grpc_connectivity_state connectivity_state() const { return last_connectivity_state_; @@ -125,8 +110,6 @@ class RoundRobin : public LoadBalancingPolicy { void ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state, grpc_error* error) override; - const grpc_lb_user_data_vtable* user_data_vtable_; - void* user_data_ = nullptr; grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE; }; @@ -137,7 +120,7 @@ class RoundRobin : public LoadBalancingPolicy { public: RoundRobinSubchannelList( RoundRobin* policy, TraceFlag* tracer, - const grpc_lb_addresses* addresses, grpc_combiner* combiner, + const ServerAddressList& addresses, grpc_combiner* combiner, grpc_client_channel_factory* client_channel_factory, const grpc_channel_args& args) : SubchannelList(policy, tracer, addresses, combiner, @@ -312,7 +295,7 @@ void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, pending_picks_ = nullptr; while (pick != nullptr) { PickState* next = pick->next; - if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == + if ((*pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, @@ -354,9 +337,6 @@ bool RoundRobin::DoPickLocked(PickState* pick) { subchannel_list_->subchannel(next_ready_index); GPR_ASSERT(sd->connected_subchannel() != nullptr); pick->connected_subchannel = sd->connected_subchannel()->Ref(); - if (pick->user_data != nullptr) { - *pick->user_data = sd->user_data(); - } if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " @@ -667,9 +647,9 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current, void RoundRobin::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) { - const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); AutoChildRefsUpdater guard(this); - if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) { + const ServerAddressList* addresses = FindServerAddressListChannelArg(&args); + if (addresses == nullptr) { gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; 
ignoring", this); // If we don't have a current subchannel list, go into TRANSIENT_FAILURE. // Otherwise, keep using the current subchannel list (ignore this update). @@ -681,11 +661,9 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args, } return; } - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(arg->value.pointer.p); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses", - this, addresses->num_addresses); + this, addresses->size()); } // Replace latest_pending_subchannel_list_. if (latest_pending_subchannel_list_ != nullptr) { @@ -696,7 +674,7 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args, } } latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>( - this, &grpc_lb_round_robin_trace, addresses, combiner(), + this, &grpc_lb_round_robin_trace, *addresses, combiner(), client_channel_factory(), args); // If we haven't started picking yet or the new list is empty, // immediately promote the new list to the current list. @@ -726,7 +704,7 @@ class RoundRobinFactory : public LoadBalancingPolicyFactory { return OrphanablePtr<LoadBalancingPolicy>(New<RoundRobin>(args)); } - const char* name() const override { return "round_robin"; } + const char* name() const override { return kRoundRobin; } }; } // namespace diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 4ec9e935ed..6f31a643c1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -26,6 +26,7 @@ #include <grpc/support/alloc.h> #include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" @@ -141,8 +142,7 @@ class SubchannelData { protected: SubchannelData( SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list, - const grpc_lb_user_data_vtable* user_data_vtable, - const grpc_lb_address& address, grpc_subchannel* subchannel, + const ServerAddress& address, grpc_subchannel* subchannel, grpc_combiner* combiner); virtual ~SubchannelData(); @@ -156,9 +156,8 @@ class SubchannelData { grpc_connectivity_state connectivity_state, grpc_error* error) GRPC_ABSTRACT; - // Unrefs the subchannel. May be overridden by subclasses that need - // to perform extra cleanup when unreffing the subchannel. - virtual void UnrefSubchannelLocked(const char* reason); + // Unrefs the subchannel. + void UnrefSubchannelLocked(const char* reason); private: // Updates connected_subchannel_ based on pending_connectivity_state_unsafe_. @@ -186,8 +185,7 @@ class SubchannelData { // A list of subchannels. template <typename SubchannelListType, typename SubchannelDataType> -class SubchannelList - : public InternallyRefCountedWithTracing<SubchannelListType> { +class SubchannelList : public InternallyRefCounted<SubchannelListType> { public: typedef InlinedVector<SubchannelDataType, 10> SubchannelVector; @@ -226,15 +224,14 @@ class SubchannelList // Note: Caller must ensure that this is invoked inside of the combiner. 
void Orphan() override { ShutdownLocked(); - InternallyRefCountedWithTracing<SubchannelListType>::Unref(DEBUG_LOCATION, - "shutdown"); + InternallyRefCounted<SubchannelListType>::Unref(DEBUG_LOCATION, "shutdown"); } GRPC_ABSTRACT_BASE_CLASS protected: SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer, - const grpc_lb_addresses* addresses, grpc_combiner* combiner, + const ServerAddressList& addresses, grpc_combiner* combiner, grpc_client_channel_factory* client_channel_factory, const grpc_channel_args& args); @@ -279,8 +276,7 @@ class SubchannelList template <typename SubchannelListType, typename SubchannelDataType> SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData( SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list, - const grpc_lb_user_data_vtable* user_data_vtable, - const grpc_lb_address& address, grpc_subchannel* subchannel, + const ServerAddress& address, grpc_subchannel* subchannel, grpc_combiner* combiner) : subchannel_list_(subchannel_list), subchannel_(subchannel), @@ -490,19 +486,19 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() { template <typename SubchannelListType, typename SubchannelDataType> SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( LoadBalancingPolicy* policy, TraceFlag* tracer, - const grpc_lb_addresses* addresses, grpc_combiner* combiner, + const ServerAddressList& addresses, grpc_combiner* combiner, grpc_client_channel_factory* client_channel_factory, const grpc_channel_args& args) - : InternallyRefCountedWithTracing<SubchannelListType>(tracer), + : InternallyRefCounted<SubchannelListType>(tracer), policy_(policy), tracer_(tracer), combiner_(GRPC_COMBINER_REF(combiner, "subchannel_list")) { if (tracer_->enabled()) { gpr_log(GPR_INFO, "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels", - tracer_->name(), policy, this, addresses->num_addresses); + tracer_->name(), policy, this, addresses.size()); } - subchannels_.reserve(addresses->num_addresses); + subchannels_.reserve(addresses.size()); // We need to remove the LB addresses in order to be able to compare the // subchannel keys of subchannels from a different batch of addresses. // We also remove the inhibit-health-checking arg, since we are @@ -510,19 +506,27 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( inhibit_health_checking_ = grpc_channel_arg_get_bool( grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false); static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, - GRPC_ARG_LB_ADDRESSES, + GRPC_ARG_SERVER_ADDRESS_LIST, GRPC_ARG_INHIBIT_HEALTH_CHECKING}; // Create a subchannel for each address. grpc_subchannel_args sc_args; - for (size_t i = 0; i < addresses->num_addresses; i++) { - // If there were any balancer, we would have chosen grpclb policy instead. - GPR_ASSERT(!addresses->addresses[i].is_balancer); + for (size_t i = 0; i < addresses.size(); i++) { + // If there were any balancer addresses, we would have chosen grpclb + // policy, which does not use a SubchannelList. 
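Orphan() earlier in this hunk must run inside the combiner: it shuts the list down and then drops the list's self-ref. A mutex-based model of that contract (SubchannelListLike is illustrative; a grpc_combiner is not literally a lock, but the mutual-exclusion guarantee it provides is similar):

    #include <cassert>
    #include <memory>
    #include <mutex>

    class SubchannelListLike {
     public:
      static std::shared_ptr<SubchannelListLike> Create() {
        auto list = std::make_shared<SubchannelListLike>();
        list->self_ref_ = list;  // the list owns one ref to itself
        return list;
      }
      // Caller must hold the lock standing in for the combiner.
      void Orphan(std::unique_lock<std::mutex>& combiner_lock) {
        assert(combiner_lock.owns_lock());
        ShutdownLocked();
        self_ref_.reset();  // drop the self-ref, like Unref(..., "shutdown")
      }
     private:
      void ShutdownLocked() {}
      std::shared_ptr<SubchannelListLike> self_ref_;
    };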
+ GPR_ASSERT(!addresses[i].IsBalancer()); memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - grpc_arg addr_arg = - grpc_create_subchannel_address_arg(&addresses->addresses[i].address); + InlinedVector<grpc_arg, 4> args_to_add; + args_to_add.emplace_back( + grpc_create_subchannel_address_arg(&addresses[i].address())); + if (addresses[i].args() != nullptr) { + for (size_t j = 0; j < addresses[i].args()->num_args; ++j) { + args_to_add.emplace_back(addresses[i].args()->args[j]); + } + } grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( - &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1); - gpr_free(addr_arg.value.string); + &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), + args_to_add.data(), args_to_add.size()); + gpr_free(args_to_add[0].value.string); sc_args.args = new_args; grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel( client_channel_factory, &sc_args); @@ -530,8 +534,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( if (subchannel == nullptr) { // Subchannel could not be created. if (tracer_->enabled()) { - char* address_uri = - grpc_sockaddr_to_uri(&addresses->addresses[i].address); + char* address_uri = grpc_sockaddr_to_uri(&addresses[i].address()); gpr_log(GPR_INFO, "[%s %p] could not create subchannel for address uri %s, " "ignoring", @@ -541,8 +544,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( continue; } if (tracer_->enabled()) { - char* address_uri = - grpc_sockaddr_to_uri(&addresses->addresses[i].address); + char* address_uri = grpc_sockaddr_to_uri(&addresses[i].address()); gpr_log(GPR_INFO, "[%s %p] subchannel list %p index %" PRIuPTR ": Created subchannel %p for address uri %s", @@ -550,8 +552,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( address_uri); gpr_free(address_uri); } - subchannels_.emplace_back(this, addresses->user_data_vtable, - addresses->addresses[i], subchannel, combiner); + subchannels_.emplace_back(this, addresses[i], subchannel, combiner); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 29cd904375..8787f5bcc2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -79,6 +79,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" @@ -114,9 +115,13 @@ TraceFlag grpc_lb_xds_trace(false, "xds"); namespace { +constexpr char kXds[] = "xds_experimental"; + class XdsLb : public LoadBalancingPolicy { public: - XdsLb(const grpc_lb_addresses* addresses, const Args& args); + explicit XdsLb(const Args& args); + + const char* name() const override { return kXds; } void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override; @@ -156,9 +161,6 @@ class XdsLb : public LoadBalancingPolicy { // Our on_complete closure and the original one. grpc_closure on_complete; grpc_closure* original_on_complete; - // The LB token associated with the pick. This is set via user_data in - // the pick. - grpc_mdelem lb_token; // Stats for client-side load reporting. 
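Returning to the SubchannelList constructor in the hunk above: ownership there is worth spelling out. grpc_channel_args_copy_and_add_and_remove() deep-copies the args it is handed, so the per-address args borrowed from addresses[i].args() need no cleanup, while the URI string allocated by grpc_create_subchannel_address_arg() remains owned by the caller and must be freed; that is why only args_to_add[0].value.string is released. The same pattern in isolation (variable names illustrative):

    // Merge one address's own channel args into the subchannel args.
    InlinedVector<grpc_arg, 4> args_to_add;
    args_to_add.emplace_back(
        grpc_create_subchannel_address_arg(&addr));  // allocates a string
    if (per_address_args != nullptr) {
      for (size_t j = 0; j < per_address_args->num_args; ++j) {
        args_to_add.emplace_back(per_address_args->args[j]);  // borrowed
      }
    }
    grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
        &base_args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove),
        args_to_add.data(), args_to_add.size());  // deep-copies everything
    gpr_free(args_to_add[0].value.string);  // only this arg was allocated here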
RefCountedPtr<XdsLbClientStats> client_stats; // Next pending pick. @@ -166,8 +168,7 @@ class XdsLb : public LoadBalancingPolicy { }; /// Contains a call to the LB server and all the data related to the call. - class BalancerCallState - : public InternallyRefCountedWithTracing<BalancerCallState> { + class BalancerCallState : public InternallyRefCounted<BalancerCallState> { public: explicit BalancerCallState( RefCountedPtr<LoadBalancingPolicy> parent_xdslb_policy); @@ -199,7 +200,6 @@ class XdsLb : public LoadBalancingPolicy { static bool LoadReportCountersAreZero(xds_grpclb_request* request); static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error); - static void ClientLoadReportDoneLocked(void* arg, grpc_error* error); static void OnInitialRequestSentLocked(void* arg, grpc_error* error); static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error); static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error); @@ -258,7 +258,7 @@ class XdsLb : public LoadBalancingPolicy { grpc_error* error); // Pending pick methods. - static void PendingPickSetMetadataAndContext(PendingPick* pp); + static void PendingPickCleanup(PendingPick* pp); PendingPick* PendingPickCreate(PickState* pick); void AddPendingPick(PendingPick* pp); static void OnPendingPickComplete(void* arg, grpc_error* error); @@ -321,7 +321,7 @@ class XdsLb : public LoadBalancingPolicy { // 0 means not using fallback. int lb_fallback_timeout_ms_ = 0; // The backend addresses from the resolver. - grpc_lb_addresses* fallback_backend_addresses_ = nullptr; + UniquePtr<ServerAddressList> fallback_backend_addresses_; // Fallback timer. bool fallback_timer_callback_pending_ = false; grpc_timer lb_fallback_timer_; @@ -341,47 +341,15 @@ class XdsLb : public LoadBalancingPolicy { // serverlist parsing code // -// vtable for LB tokens in grpc_lb_addresses -void* lb_token_copy(void* token) { - return token == nullptr - ? nullptr - : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload; -} -void lb_token_destroy(void* token) { - if (token != nullptr) { - GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token}); - } -} -int lb_token_cmp(void* token1, void* token2) { - if (token1 > token2) return 1; - if (token1 < token2) return -1; - return 0; -} -const grpc_lb_user_data_vtable lb_token_vtable = { - lb_token_copy, lb_token_destroy, lb_token_cmp}; - // Returns the backend addresses extracted from the given addresses. -grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) { - // First pass: count the number of backend addresses. - size_t num_backends = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (!addresses->addresses[i].is_balancer) { - ++num_backends; +UniquePtr<ServerAddressList> ExtractBackendAddresses( + const ServerAddressList& addresses) { + auto backend_addresses = MakeUnique<ServerAddressList>(); + for (size_t i = 0; i < addresses.size(); ++i) { + if (!addresses[i].IsBalancer()) { + backend_addresses->emplace_back(addresses[i]); } } - // Second pass: actually populate the addresses and (empty) LB tokens. 
- grpc_lb_addresses* backend_addresses = - grpc_lb_addresses_create(num_backends, &lb_token_vtable); - size_t num_copied = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) continue; - const grpc_resolved_address* addr = &addresses->addresses[i].address; - grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr, - addr->len, false /* is_balancer */, - nullptr /* balancer_name */, - (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload); - ++num_copied; - } return backend_addresses; } @@ -431,56 +399,17 @@ void ParseServer(const xds_grpclb_server* server, grpc_resolved_address* addr) { } // Returns addresses extracted from \a serverlist. -grpc_lb_addresses* ProcessServerlist(const xds_grpclb_serverlist* serverlist) { - size_t num_valid = 0; - /* first pass: count how many are valid in order to allocate the necessary - * memory in a single block */ +UniquePtr<ServerAddressList> ProcessServerlist( + const xds_grpclb_serverlist* serverlist) { + auto addresses = MakeUnique<ServerAddressList>(); for (size_t i = 0; i < serverlist->num_servers; ++i) { - if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid; - } - grpc_lb_addresses* lb_addresses = - grpc_lb_addresses_create(num_valid, &lb_token_vtable); - /* second pass: actually populate the addresses and LB tokens (aka user data - * to the outside world) to be read by the child policy during its creation. - * Given that the validity tests are very cheap, they are performed again - * instead of marking the valid ones during the first pass, as this would - * incurr in an allocation due to the arbitrary number of server */ - size_t addr_idx = 0; - for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) { - const xds_grpclb_server* server = serverlist->servers[sl_idx]; - if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue; - GPR_ASSERT(addr_idx < num_valid); - /* address processing */ + const xds_grpclb_server* server = serverlist->servers[i]; + if (!IsServerValid(serverlist->servers[i], i, false)) continue; grpc_resolved_address addr; ParseServer(server, &addr); - /* lb token processing */ - void* user_data; - if (server->has_load_balance_token) { - const size_t lb_token_max_length = - GPR_ARRAY_SIZE(server->load_balance_token); - const size_t lb_token_length = - strnlen(server->load_balance_token, lb_token_max_length); - grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer( - server->load_balance_token, lb_token_length); - user_data = - (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr) - .payload; - } else { - char* uri = grpc_sockaddr_to_uri(&addr); - gpr_log(GPR_INFO, - "Missing LB token for backend address '%s'. 
The empty token will " - "be used instead", - uri); - gpr_free(uri); - user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; - } - grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len, - false /* is_balancer */, - nullptr /* balancer_name */, user_data); - ++addr_idx; + addresses->emplace_back(addr, nullptr); } - GPR_ASSERT(addr_idx == num_valid); - return lb_addresses; + return addresses; } // @@ -489,7 +418,7 @@ grpc_lb_addresses* ProcessServerlist(const xds_grpclb_serverlist* serverlist) { XdsLb::BalancerCallState::BalancerCallState( RefCountedPtr<LoadBalancingPolicy> parent_xdslb_policy) - : InternallyRefCountedWithTracing<BalancerCallState>(&grpc_lb_xds_trace), + : InternallyRefCounted<BalancerCallState>(&grpc_lb_xds_trace), xdslb_policy_(std::move(parent_xdslb_policy)) { GPR_ASSERT(xdslb_policy_ != nullptr); GPR_ASSERT(!xdslb_policy()->shutting_down_); @@ -668,6 +597,7 @@ bool XdsLb::BalancerCallState::LoadReportCountersAreZero( (drop_entries == nullptr || drop_entries->empty()); } +// TODO(vpowar): Use LRS to send the client Load Report. void XdsLb::BalancerCallState::SendClientLoadReportLocked() { // Construct message payload. GPR_ASSERT(send_message_payload_ == nullptr); @@ -685,38 +615,8 @@ void XdsLb::BalancerCallState::SendClientLoadReportLocked() { } else { last_client_load_report_counters_were_zero_ = false; } - grpc_slice request_payload_slice = xds_grpclb_request_encode(request); - send_message_payload_ = - grpc_raw_byte_buffer_create(&request_payload_slice, 1); - grpc_slice_unref_internal(request_payload_slice); + // TODO(vpowar): Send the report on LRS stream. xds_grpclb_request_destroy(request); - // Send the report. - grpc_op op; - memset(&op, 0, sizeof(op)); - op.op = GRPC_OP_SEND_MESSAGE; - op.data.send_message.send_message = send_message_payload_; - GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked, - this, grpc_combiner_scheduler(xdslb_policy()->combiner())); - grpc_call_error call_error = grpc_call_start_batch_and_execute( - lb_call_, &op, 1, &client_load_report_closure_); - if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { - gpr_log(GPR_ERROR, "[xdslb %p] call_error=%d", xdslb_policy_.get(), - call_error); - GPR_ASSERT(GRPC_CALL_OK == call_error); - } -} - -void XdsLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg, - grpc_error* error) { - BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); - XdsLb* xdslb_policy = lb_calld->xdslb_policy(); - grpc_byte_buffer_destroy(lb_calld->send_message_payload_); - lb_calld->send_message_payload_ = nullptr; - if (error != GRPC_ERROR_NONE || lb_calld != xdslb_policy->lb_calld_.get()) { - lb_calld->Unref(DEBUG_LOCATION, "client_load_report"); - return; - } - lb_calld->ScheduleNextClientLoadReportLocked(); } void XdsLb::BalancerCallState::OnInitialRequestSentLocked(void* arg, @@ -820,8 +720,7 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked( xds_grpclb_destroy_serverlist(xdslb_policy->serverlist_); } else { /* or dispose of the fallback */ - grpc_lb_addresses_destroy(xdslb_policy->fallback_backend_addresses_); - xdslb_policy->fallback_backend_addresses_ = nullptr; + xdslb_policy->fallback_backend_addresses_.reset(); if (xdslb_policy->fallback_timer_callback_pending_) { grpc_timer_cancel(&xdslb_policy->lb_fallback_timer_); } @@ -907,31 +806,15 @@ void XdsLb::BalancerCallState::OnBalancerStatusReceivedLocked( // helper code for creating balancer channel // -grpc_lb_addresses* ExtractBalancerAddresses( - const grpc_lb_addresses* addresses) { - 
size_t num_grpclb_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; - } - // There must be at least one balancer address, or else the - // client_channel would not have chosen this LB policy. - GPR_ASSERT(num_grpclb_addrs > 0); - grpc_lb_addresses* lb_addresses = - grpc_lb_addresses_create(num_grpclb_addrs, nullptr); - size_t lb_addresses_idx = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (!addresses->addresses[i].is_balancer) continue; - if (GPR_UNLIKELY(addresses->addresses[i].user_data != nullptr)) { - gpr_log(GPR_ERROR, - "This LB policy doesn't support user data. It will be ignored"); +UniquePtr<ServerAddressList> ExtractBalancerAddresses( + const ServerAddressList& addresses) { + auto balancer_addresses = MakeUnique<ServerAddressList>(); + for (size_t i = 0; i < addresses.size(); ++i) { + if (addresses[i].IsBalancer()) { + balancer_addresses->emplace_back(addresses[i]); } - grpc_lb_addresses_set_address( - lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr, - addresses->addresses[i].address.len, false /* is balancer */, - addresses->addresses[i].balancer_name, nullptr /* user data */); } - GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx); - return lb_addresses; + return balancer_addresses; } /* Returns the channel args for the LB channel, used to create a bidirectional @@ -943,10 +826,11 @@ grpc_lb_addresses* ExtractBalancerAddresses( * above the grpclb policy. * - \a args: other args inherited from the xds policy. */ grpc_channel_args* BuildBalancerChannelArgs( - const grpc_lb_addresses* addresses, + const ServerAddressList& addresses, FakeResolverResponseGenerator* response_generator, const grpc_channel_args* args) { - grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses); + UniquePtr<ServerAddressList> balancer_addresses = + ExtractBalancerAddresses(addresses); // Channel args to remove. static const char* args_to_remove[] = { // LB policy name, since we want to use the default (pick_first) in @@ -964,7 +848,7 @@ grpc_channel_args* BuildBalancerChannelArgs( // is_balancer=true. We need the LB channel to return addresses with // is_balancer=false so that it does not wind up recursively using the // xds LB policy, as per the special case logic in client_channel.c. - GRPC_ARG_LB_ADDRESSES, + GRPC_ARG_SERVER_ADDRESS_LIST, // The fake resolver response generator, because we are replacing it // with the one from the xds policy, used to propagate updates to // the LB channel. @@ -980,10 +864,10 @@ grpc_channel_args* BuildBalancerChannelArgs( }; // Channel args to add. const grpc_arg args_to_add[] = { - // New LB addresses. + // New server address list. // Note that we pass these in both when creating the LB channel // and via the fake resolver. The latter is what actually gets used. - grpc_lb_addresses_create_channel_arg(lb_addresses), + CreateServerAddressListChannelArg(balancer_addresses.get()), // The fake resolver response generator, which we use to inject // address updates into the LB channel. grpc_core::FakeResolverResponseGenerator::MakeChannelArg( @@ -1001,10 +885,7 @@ grpc_channel_args* BuildBalancerChannelArgs( args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add, GPR_ARRAY_SIZE(args_to_add)); // Make any necessary modifications for security. - new_args = grpc_lb_policy_xds_modify_lb_channel_args(new_args); - // Clean up. 
- grpc_lb_addresses_destroy(lb_addresses); - return new_args; + return grpc_lb_policy_xds_modify_lb_channel_args(new_args); } // @@ -1012,8 +893,7 @@ grpc_channel_args* BuildBalancerChannelArgs( // // TODO(vishalpowar): Use lb_config in args to configure LB policy. -XdsLb::XdsLb(const grpc_lb_addresses* addresses, - const LoadBalancingPolicy::Args& args) +XdsLb::XdsLb(const LoadBalancingPolicy::Args& args) : LoadBalancingPolicy(args), response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()), lb_call_backoff_( @@ -1069,9 +949,6 @@ XdsLb::~XdsLb() { if (serverlist_ != nullptr) { xds_grpclb_destroy_serverlist(serverlist_); } - if (fallback_backend_addresses_ != nullptr) { - grpc_lb_addresses_destroy(fallback_backend_addresses_); - } grpc_subchannel_index_unref(); } @@ -1119,7 +996,6 @@ void XdsLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { while ((pp = pending_picks_) != nullptr) { pending_picks_ = pp->next; pp->pick->on_complete = pp->original_on_complete; - pp->pick->user_data = nullptr; grpc_error* error = GRPC_ERROR_NONE; if (new_policy->PickLocked(pp->pick, &error)) { // Synchronous return; schedule closure. @@ -1181,7 +1057,7 @@ void XdsLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, pending_picks_ = nullptr; while (pp != nullptr) { PendingPick* next = pp->next; - if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) == + if ((*pp->pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { // Note: pp is deleted in this callback. GRPC_CLOSURE_SCHED(&pp->on_complete, @@ -1272,21 +1148,16 @@ void XdsLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current, } void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { - const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); - if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) { + const ServerAddressList* addresses = FindServerAddressListChannelArg(&args); + if (addresses == nullptr) { // Ignore this update. gpr_log(GPR_ERROR, "[xdslb %p] No valid LB addresses channel arg in update, ignoring.", this); return; } - const grpc_lb_addresses* addresses = - static_cast<const grpc_lb_addresses*>(arg->value.pointer.p); // Update fallback address list. - if (fallback_backend_addresses_ != nullptr) { - grpc_lb_addresses_destroy(fallback_backend_addresses_); - } - fallback_backend_addresses_ = ExtractBackendAddresses(addresses); + fallback_backend_addresses_ = ExtractBackendAddresses(*addresses); // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, // since we use this to trigger the client_load_reporting filter. static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; @@ -1297,7 +1168,7 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); // Construct args for balancer channel. grpc_channel_args* lb_channel_args = - BuildBalancerChannelArgs(addresses, response_generator_.get(), &args); + BuildBalancerChannelArgs(*addresses, response_generator_.get(), &args); // Create balancer channel if needed. if (lb_channel_ == nullptr) { char* uri_str; @@ -1488,37 +1359,15 @@ void XdsLb::OnBalancerChannelConnectivityChangedLocked(void* arg, // PendingPick // -// Adds lb_token of selected subchannel (address) to the call's initial -// metadata. 
-grpc_error* AddLbTokenToInitialMetadata( - grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage, - grpc_metadata_batch* initial_metadata) { - GPR_ASSERT(lb_token_mdelem_storage != nullptr); - GPR_ASSERT(!GRPC_MDISNULL(lb_token)); - return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, - lb_token); -} - // Destroy function used when embedding client stats in call context. void DestroyClientStats(void* arg) { static_cast<XdsLbClientStats*>(arg)->Unref(); } -void XdsLb::PendingPickSetMetadataAndContext(PendingPick* pp) { - /* if connected_subchannel is nullptr, no pick has been made by the - * child policy (e.g., all addresses failed to connect). There won't be any - * user_data/token available */ +void XdsLb::PendingPickCleanup(PendingPick* pp) { + // If connected_subchannel is nullptr, no pick has been made by the + // child policy (e.g., all addresses failed to connect). if (pp->pick->connected_subchannel != nullptr) { - if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) { - AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token), - &pp->pick->lb_token_mdelem_storage, - pp->pick->initial_metadata); - } else { - gpr_log(GPR_ERROR, - "[xdslb %p] No LB token for connected subchannel pick %p", - pp->xdslb_policy, pp->pick); - abort(); - } // Pass on client stats via context. Passes ownership of the reference. if (pp->client_stats != nullptr) { pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value = @@ -1536,7 +1385,7 @@ void XdsLb::PendingPickSetMetadataAndContext(PendingPick* pp) { * order to unref the child policy instance upon its invocation */ void XdsLb::OnPendingPickComplete(void* arg, grpc_error* error) { PendingPick* pp = static_cast<PendingPick*>(arg); - PendingPickSetMetadataAndContext(pp); + PendingPickCleanup(pp); GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error)); Delete(pp); } @@ -1568,16 +1417,14 @@ void XdsLb::AddPendingPick(PendingPick* pp) { // completion callback even if the pick is available immediately. bool XdsLb::PickFromChildPolicyLocked(bool force_async, PendingPick* pp, grpc_error** error) { - // Set client_stats and user_data. + // Set client_stats. if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) { pp->client_stats = lb_calld_->client_stats()->Ref(); } - GPR_ASSERT(pp->pick->user_data == nullptr); - pp->pick->user_data = (void**)&pp->lb_token; // Pick via the child policy. bool pick_done = child_policy_->PickLocked(pp->pick, error); if (pick_done) { - PendingPickSetMetadataAndContext(pp); + PendingPickCleanup(pp); if (force_async) { GRPC_CLOSURE_SCHED(pp->original_on_complete, *error); *error = GRPC_ERROR_NONE; @@ -1639,20 +1486,19 @@ void XdsLb::CreateChildPolicyLocked(const Args& args) { } grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() { - grpc_lb_addresses* addresses; bool is_backend_from_grpclb_load_balancer = false; // This should never be invoked if we do not have serverlist_, as fallback // mode is disabled for xDS plugin. GPR_ASSERT(serverlist_ != nullptr); GPR_ASSERT(serverlist_->num_servers > 0); - addresses = ProcessServerlist(serverlist_); - is_backend_from_grpclb_load_balancer = true; + UniquePtr<ServerAddressList> addresses = ProcessServerlist(serverlist_); GPR_ASSERT(addresses != nullptr); - // Replace the LB addresses in the channel args that we pass down to + is_backend_from_grpclb_load_balancer = true; + // Replace the server address list in the channel args that we pass down to // the subchannel. 
- static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; + static const char* keys_to_remove[] = {GRPC_ARG_SERVER_ADDRESS_LIST}; const grpc_arg args_to_add[] = { - grpc_lb_addresses_create_channel_arg(addresses), + CreateServerAddressListChannelArg(addresses.get()), // A channel arg indicating if the target is a backend inferred from a // grpclb load balancer. grpc_channel_arg_integer_create( @@ -1662,7 +1508,6 @@ grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() { grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove( args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add, GPR_ARRAY_SIZE(args_to_add)); - grpc_lb_addresses_destroy(addresses); return args; } @@ -1796,22 +1641,21 @@ class XdsFactory : public LoadBalancingPolicyFactory { OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( const LoadBalancingPolicy::Args& args) const override { /* Count the number of gRPC-LB addresses. There must be at least one. */ - const grpc_arg* arg = - grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES); - if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { - return nullptr; - } - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(arg->value.pointer.p); - size_t num_grpclb_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; + const ServerAddressList* addresses = + FindServerAddressListChannelArg(args.args); + if (addresses == nullptr) return nullptr; + bool found_balancer_address = false; + for (size_t i = 0; i < addresses->size(); ++i) { + if ((*addresses)[i].IsBalancer()) { + found_balancer_address = true; + break; + } } - if (num_grpclb_addrs == 0) return nullptr; - return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(addresses, args)); + if (!found_balancer_address) return nullptr; + return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(args)); } - const char* name() const override { return "xds"; } + const char* name() const override { return kXds; } }; } // namespace diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h index 32c4acc8a3..f713b7f563 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h @@ -21,7 +21,7 @@ #include <grpc/support/port_platform.h> -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include <grpc/impl/codegen/grpc_types.h> /// Makes any necessary modifications to \a args for use in the xds /// balancer channel. 
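A pattern that recurs across these files is moving a ServerAddressList in and out of channel args under GRPC_ARG_SERVER_ADDRESS_LIST, replacing the old grpc_lb_addresses_create_channel_arg()/grpc_lb_addresses_find_channel_arg() pair. Assuming the new helpers mirror the pointer-arg (copy/destroy/cmp vtable) behavior of their predecessors, the round trip is roughly as follows; resolved_addr and base_args stand in for whatever the caller has:

    // Hedged sketch of the helper pair introduced by this patch.
    grpc_core::ServerAddressList addresses;
    addresses.emplace_back(resolved_addr, /*args=*/nullptr);
    grpc_arg arg = grpc_core::CreateServerAddressListChannelArg(&addresses);
    grpc_channel_args* args =
        grpc_channel_args_copy_and_add(base_args, &arg, 1);
    // ...later, e.g. in an LB policy consuming an update:
    const grpc_core::ServerAddressList* found =
        grpc_core::FindServerAddressListChannelArg(args);
    if (found == nullptr) {
      // No usable address list; the policies above treat this as an
      // update to ignore.
    }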
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc index 5ab72efce4..55c646e6ee 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc @@ -25,6 +25,7 @@ #include <string.h> #include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -41,22 +42,23 @@ int BalancerNameCmp(const grpc_core::UniquePtr<char>& a, } RefCountedPtr<TargetAuthorityTable> CreateTargetAuthorityTable( - grpc_lb_addresses* addresses) { + const ServerAddressList& addresses) { TargetAuthorityTable::Entry* target_authority_entries = - static_cast<TargetAuthorityTable::Entry*>(gpr_zalloc( - sizeof(*target_authority_entries) * addresses->num_addresses)); - for (size_t i = 0; i < addresses->num_addresses; ++i) { + static_cast<TargetAuthorityTable::Entry*>( + gpr_zalloc(sizeof(*target_authority_entries) * addresses.size())); + for (size_t i = 0; i < addresses.size(); ++i) { char* addr_str; - GPR_ASSERT(grpc_sockaddr_to_string( - &addr_str, &addresses->addresses[i].address, true) > 0); + GPR_ASSERT( + grpc_sockaddr_to_string(&addr_str, &addresses[i].address(), true) > 0); target_authority_entries[i].key = grpc_slice_from_copied_string(addr_str); - target_authority_entries[i].value.reset( - gpr_strdup(addresses->addresses[i].balancer_name)); gpr_free(addr_str); + char* balancer_name = grpc_channel_arg_get_string(grpc_channel_args_find( + addresses[i].args(), GRPC_ARG_ADDRESS_BALANCER_NAME)); + target_authority_entries[i].value.reset(gpr_strdup(balancer_name)); } RefCountedPtr<TargetAuthorityTable> target_authority_table = - TargetAuthorityTable::Create(addresses->num_addresses, - target_authority_entries, BalancerNameCmp); + TargetAuthorityTable::Create(addresses.size(), target_authority_entries, + BalancerNameCmp); gpr_free(target_authority_entries); return target_authority_table; } @@ -71,13 +73,12 @@ grpc_channel_args* grpc_lb_policy_xds_modify_lb_channel_args( grpc_arg args_to_add[2]; size_t num_args_to_add = 0; // Add arg for targets info table. - const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_LB_ADDRESSES); - GPR_ASSERT(arg != nullptr); - GPR_ASSERT(arg->type == GRPC_ARG_POINTER); - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(arg->value.pointer.p); + grpc_core::ServerAddressList* addresses = + grpc_core::FindServerAddressListChannelArg(args); + GPR_ASSERT(addresses != nullptr); grpc_core::RefCountedPtr<grpc_core::TargetAuthorityTable> - target_authority_table = grpc_core::CreateTargetAuthorityTable(addresses); + target_authority_table = + grpc_core::CreateTargetAuthorityTable(*addresses); args_to_add[num_args_to_add++] = grpc_core::CreateTargetAuthorityTableChannelArg( target_authority_table.get()); @@ -86,22 +87,18 @@ grpc_channel_args* grpc_lb_policy_xds_modify_lb_channel_args( // bearer token credentials. 
grpc_channel_credentials* channel_credentials = grpc_channel_credentials_find_in_args(args); - grpc_channel_credentials* creds_sans_call_creds = nullptr; + grpc_core::RefCountedPtr<grpc_channel_credentials> creds_sans_call_creds; if (channel_credentials != nullptr) { creds_sans_call_creds = - grpc_channel_credentials_duplicate_without_call_credentials( - channel_credentials); + channel_credentials->duplicate_without_call_credentials(); GPR_ASSERT(creds_sans_call_creds != nullptr); args_to_remove[num_args_to_remove++] = GRPC_ARG_CHANNEL_CREDENTIALS; args_to_add[num_args_to_add++] = - grpc_channel_credentials_to_arg(creds_sans_call_creds); + grpc_channel_credentials_to_arg(creds_sans_call_creds.get()); } grpc_channel_args* result = grpc_channel_args_copy_and_add_and_remove( args, args_to_remove, num_args_to_remove, args_to_add, num_args_to_add); // Clean up. grpc_channel_args_destroy(args); - if (creds_sans_call_creds != nullptr) { - grpc_channel_credentials_unref(creds_sans_call_creds); - } return result; } diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h index 9d08defa7e..6704995641 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h @@ -25,7 +25,7 @@ #include "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h" #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h" -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include "src/core/lib/iomgr/exec_ctx.h" #define XDS_SERVICE_NAME_MAX_LENGTH 128 diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.cc b/src/core/ext/filters/client_channel/lb_policy_factory.cc deleted file mode 100644 index 5c6363d295..0000000000 --- a/src/core/ext/filters/client_channel/lb_policy_factory.cc +++ /dev/null @@ -1,163 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -#include <grpc/support/port_platform.h> - -#include <string.h> - -#include <grpc/support/alloc.h> -#include <grpc/support/string_util.h> - -#include "src/core/lib/channel/channel_args.h" - -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" -#include "src/core/ext/filters/client_channel/parse_address.h" - -grpc_lb_addresses* grpc_lb_addresses_create( - size_t num_addresses, const grpc_lb_user_data_vtable* user_data_vtable) { - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(gpr_zalloc(sizeof(grpc_lb_addresses))); - addresses->num_addresses = num_addresses; - addresses->user_data_vtable = user_data_vtable; - const size_t addresses_size = sizeof(grpc_lb_address) * num_addresses; - addresses->addresses = - static_cast<grpc_lb_address*>(gpr_zalloc(addresses_size)); - return addresses; -} - -grpc_lb_addresses* grpc_lb_addresses_copy(const grpc_lb_addresses* addresses) { - grpc_lb_addresses* new_addresses = grpc_lb_addresses_create( - addresses->num_addresses, addresses->user_data_vtable); - memcpy(new_addresses->addresses, addresses->addresses, - sizeof(grpc_lb_address) * addresses->num_addresses); - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (new_addresses->addresses[i].balancer_name != nullptr) { - new_addresses->addresses[i].balancer_name = - gpr_strdup(new_addresses->addresses[i].balancer_name); - } - if (new_addresses->addresses[i].user_data != nullptr) { - new_addresses->addresses[i].user_data = addresses->user_data_vtable->copy( - new_addresses->addresses[i].user_data); - } - } - return new_addresses; -} - -void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index, - const void* address, size_t address_len, - bool is_balancer, const char* balancer_name, - void* user_data) { - GPR_ASSERT(index < addresses->num_addresses); - if (user_data != nullptr) GPR_ASSERT(addresses->user_data_vtable != nullptr); - grpc_lb_address* target = &addresses->addresses[index]; - memcpy(target->address.addr, address, address_len); - target->address.len = static_cast<socklen_t>(address_len); - target->is_balancer = is_balancer; - target->balancer_name = gpr_strdup(balancer_name); - target->user_data = user_data; -} - -bool grpc_lb_addresses_set_address_from_uri(grpc_lb_addresses* addresses, - size_t index, const grpc_uri* uri, - bool is_balancer, - const char* balancer_name, - void* user_data) { - grpc_resolved_address address; - if (!grpc_parse_uri(uri, &address)) return false; - grpc_lb_addresses_set_address(addresses, index, address.addr, address.len, - is_balancer, balancer_name, user_data); - return true; -} - -int grpc_lb_addresses_cmp(const grpc_lb_addresses* addresses1, - const grpc_lb_addresses* addresses2) { - if (addresses1->num_addresses > addresses2->num_addresses) return 1; - if (addresses1->num_addresses < addresses2->num_addresses) return -1; - if (addresses1->user_data_vtable > addresses2->user_data_vtable) return 1; - if (addresses1->user_data_vtable < addresses2->user_data_vtable) return -1; - for (size_t i = 0; i < addresses1->num_addresses; ++i) { - const grpc_lb_address* target1 = &addresses1->addresses[i]; - const grpc_lb_address* target2 = &addresses2->addresses[i]; - if (target1->address.len > target2->address.len) return 1; - if (target1->address.len < target2->address.len) return -1; - int retval = memcmp(target1->address.addr, target2->address.addr, - target1->address.len); - if (retval != 0) return retval; - if (target1->is_balancer > target2->is_balancer) return 1; - if (target1->is_balancer < 
target2->is_balancer) return -1; - const char* balancer_name1 = - target1->balancer_name != nullptr ? target1->balancer_name : ""; - const char* balancer_name2 = - target2->balancer_name != nullptr ? target2->balancer_name : ""; - retval = strcmp(balancer_name1, balancer_name2); - if (retval != 0) return retval; - if (addresses1->user_data_vtable != nullptr) { - retval = addresses1->user_data_vtable->cmp(target1->user_data, - target2->user_data); - if (retval != 0) return retval; - } - } - return 0; -} - -void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses) { - for (size_t i = 0; i < addresses->num_addresses; ++i) { - gpr_free(addresses->addresses[i].balancer_name); - if (addresses->addresses[i].user_data != nullptr) { - addresses->user_data_vtable->destroy(addresses->addresses[i].user_data); - } - } - gpr_free(addresses->addresses); - gpr_free(addresses); -} - -static void* lb_addresses_copy(void* addresses) { - return grpc_lb_addresses_copy(static_cast<grpc_lb_addresses*>(addresses)); -} -static void lb_addresses_destroy(void* addresses) { - grpc_lb_addresses_destroy(static_cast<grpc_lb_addresses*>(addresses)); -} -static int lb_addresses_cmp(void* addresses1, void* addresses2) { - return grpc_lb_addresses_cmp(static_cast<grpc_lb_addresses*>(addresses1), - static_cast<grpc_lb_addresses*>(addresses2)); -} -static const grpc_arg_pointer_vtable lb_addresses_arg_vtable = { - lb_addresses_copy, lb_addresses_destroy, lb_addresses_cmp}; - -grpc_arg grpc_lb_addresses_create_channel_arg( - const grpc_lb_addresses* addresses) { - return grpc_channel_arg_pointer_create( - (char*)GRPC_ARG_LB_ADDRESSES, (void*)addresses, &lb_addresses_arg_vtable); -} - -grpc_lb_addresses* grpc_lb_addresses_find_channel_arg( - const grpc_channel_args* channel_args) { - const grpc_arg* lb_addresses_arg = - grpc_channel_args_find(channel_args, GRPC_ARG_LB_ADDRESSES); - if (lb_addresses_arg == nullptr || lb_addresses_arg->type != GRPC_ARG_POINTER) - return nullptr; - return static_cast<grpc_lb_addresses*>(lb_addresses_arg->value.pointer.p); -} - -bool grpc_lb_addresses_contains_balancer_address( - const grpc_lb_addresses& addresses) { - for (size_t i = 0; i < addresses.num_addresses; ++i) { - if (addresses.addresses[i].is_balancer) return true; - } - return false; -} diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h index a59deadb26..a165ebafab 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.h +++ b/src/core/ext/filters/client_channel/lb_policy_factory.h @@ -21,91 +21,9 @@ #include <grpc/support/port_platform.h> -#include "src/core/lib/iomgr/resolve_address.h" - -#include "src/core/ext/filters/client_channel/client_channel_factory.h" #include "src/core/ext/filters/client_channel/lb_policy.h" -#include "src/core/lib/uri/uri_parser.h" - -// -// representation of an LB address -// - -// Channel arg key for grpc_lb_addresses. -#define GRPC_ARG_LB_ADDRESSES "grpc.lb_addresses" - -/** A resolved address alongside any LB related information associated with it. - * \a user_data, if not NULL, contains opaque data meant to be consumed by the - * gRPC LB policy. Note that no all LB policies support \a user_data as input. - * Those who don't will simply ignore it and will correspondingly return NULL in - * their namesake pick() output argument. */ -// TODO(roth): Once we figure out a better way of handling user_data in -// LB policies, convert these structs to C++ classes. 
-typedef struct grpc_lb_address { - grpc_resolved_address address; - bool is_balancer; - char* balancer_name; /* For secure naming. */ - void* user_data; -} grpc_lb_address; - -typedef struct grpc_lb_user_data_vtable { - void* (*copy)(void*); - void (*destroy)(void*); - int (*cmp)(void*, void*); -} grpc_lb_user_data_vtable; - -typedef struct grpc_lb_addresses { - size_t num_addresses; - grpc_lb_address* addresses; - const grpc_lb_user_data_vtable* user_data_vtable; -} grpc_lb_addresses; - -/** Returns a grpc_addresses struct with enough space for - \a num_addresses addresses. The \a user_data_vtable argument may be - NULL if no user data will be added. */ -grpc_lb_addresses* grpc_lb_addresses_create( - size_t num_addresses, const grpc_lb_user_data_vtable* user_data_vtable); - -/** Creates a copy of \a addresses. */ -grpc_lb_addresses* grpc_lb_addresses_copy(const grpc_lb_addresses* addresses); - -/** Sets the value of the address at index \a index of \a addresses. - * \a address is a socket address of length \a address_len. */ -void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index, - const void* address, size_t address_len, - bool is_balancer, const char* balancer_name, - void* user_data); - -/** Sets the value of the address at index \a index of \a addresses from \a uri. - * Returns true upon success, false otherwise. */ -bool grpc_lb_addresses_set_address_from_uri(grpc_lb_addresses* addresses, - size_t index, const grpc_uri* uri, - bool is_balancer, - const char* balancer_name, - void* user_data); - -/** Compares \a addresses1 and \a addresses2. */ -int grpc_lb_addresses_cmp(const grpc_lb_addresses* addresses1, - const grpc_lb_addresses* addresses2); - -/** Destroys \a addresses. */ -void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses); - -/** Returns a channel arg containing \a addresses. */ -grpc_arg grpc_lb_addresses_create_channel_arg( - const grpc_lb_addresses* addresses); - -/** Returns the \a grpc_lb_addresses instance in \a channel_args or NULL */ -grpc_lb_addresses* grpc_lb_addresses_find_channel_arg( - const grpc_channel_args* channel_args); - -// Returns true if addresses contains at least one balancer address. -bool grpc_lb_addresses_contains_balancer_address( - const grpc_lb_addresses& addresses); - -// -// LB policy factory -// +#include "src/core/lib/gprpp/abstract.h" +#include "src/core/lib/gprpp/orphanable.h" namespace grpc_core { diff --git a/src/core/ext/filters/client_channel/request_routing.cc b/src/core/ext/filters/client_channel/request_routing.cc new file mode 100644 index 0000000000..f9a7e164e7 --- /dev/null +++ b/src/core/ext/filters/client_channel/request_routing.cc @@ -0,0 +1,936 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/ext/filters/client_channel/request_routing.h" + +#include <inttypes.h> +#include <limits.h> +#include <stdbool.h> +#include <stdio.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/sync.h> + +#include "src/core/ext/filters/client_channel/backup_poller.h" +#include "src/core/ext/filters/client_channel/http_connect_handshaker.h" +#include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" +#include "src/core/ext/filters/client_channel/resolver_registry.h" +#include "src/core/ext/filters/client_channel/retry_throttle.h" +#include "src/core/ext/filters/client_channel/server_address.h" +#include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/ext/filters/deadline/deadline_filter.h" +#include "src/core/lib/backoff/backoff.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/channel/status_util.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/inlined_vector.h" +#include "src/core/lib/gprpp/manual_constructor.h" +#include "src/core/lib/iomgr/combiner.h" +#include "src/core/lib/iomgr/iomgr.h" +#include "src/core/lib/iomgr/polling_entity.h" +#include "src/core/lib/profiling/timers.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/transport/connectivity_state.h" +#include "src/core/lib/transport/error_utils.h" +#include "src/core/lib/transport/metadata.h" +#include "src/core/lib/transport/metadata_batch.h" +#include "src/core/lib/transport/service_config.h" +#include "src/core/lib/transport/static_metadata.h" +#include "src/core/lib/transport/status_metadata.h" + +namespace grpc_core { + +// +// RequestRouter::Request::ResolverResultWaiter +// + +// Handles waiting for a resolver result. +// Used only for the first call on an idle channel. +class RequestRouter::Request::ResolverResultWaiter { + public: + explicit ResolverResultWaiter(Request* request) + : request_router_(request->request_router_), + request_(request), + tracer_enabled_(request_router_->tracer_->enabled()) { + if (tracer_enabled_) { + gpr_log(GPR_INFO, + "request_router=%p request=%p: deferring pick pending resolver " + "result", + request_router_, request); + } + // Add closure to be run when a resolver result is available. + GRPC_CLOSURE_INIT(&done_closure_, &DoneLocked, this, + grpc_combiner_scheduler(request_router_->combiner_)); + AddToWaitingList(); + // Set cancellation closure, so that we abort if the call is cancelled. + GRPC_CLOSURE_INIT(&cancel_closure_, &CancelLocked, this, + grpc_combiner_scheduler(request_router_->combiner_)); + grpc_call_combiner_set_notify_on_cancel(request->call_combiner_, + &cancel_closure_); + } + + private: + // Adds done_closure_ to + // request_router_->waiting_for_resolver_result_closures_. + void AddToWaitingList() { + grpc_closure_list_append( + &request_router_->waiting_for_resolver_result_closures_, &done_closure_, + GRPC_ERROR_NONE); + } + + // Invoked when a resolver result is available. 
+ static void DoneLocked(void* arg, grpc_error* error) { + ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg); + RequestRouter* request_router = self->request_router_; + // If CancelLocked() has already run, delete ourselves without doing + // anything. Note that the call stack may have already been destroyed, + // so it's not safe to access anything in state_. + if (GPR_UNLIKELY(self->finished_)) { + if (self->tracer_enabled_) { + gpr_log(GPR_INFO, + "request_router=%p: call cancelled before resolver result", + request_router); + } + Delete(self); + return; + } + // Otherwise, process the resolver result. + Request* request = self->request_; + if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { + if (self->tracer_enabled_) { + gpr_log(GPR_INFO, + "request_router=%p request=%p: resolver failed to return data", + request_router, request); + } + GRPC_CLOSURE_RUN(request->on_route_done_, GRPC_ERROR_REF(error)); + } else if (GPR_UNLIKELY(request_router->resolver_ == nullptr)) { + // Shutting down. + if (self->tracer_enabled_) { + gpr_log(GPR_INFO, "request_router=%p request=%p: resolver disconnected", + request_router, request); + } + GRPC_CLOSURE_RUN(request->on_route_done_, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); + } else if (GPR_UNLIKELY(request_router->lb_policy_ == nullptr)) { + // Transient resolver failure. + // If call has wait_for_ready=true, try again; otherwise, fail. + if (*request->pick_.initial_metadata_flags & + GRPC_INITIAL_METADATA_WAIT_FOR_READY) { + if (self->tracer_enabled_) { + gpr_log(GPR_INFO, + "request_router=%p request=%p: resolver returned but no LB " + "policy; wait_for_ready=true; trying again", + request_router, request); + } + // Re-add ourselves to the waiting list. + self->AddToWaitingList(); + // Return early so that we don't set finished_ to true below. + return; + } else { + if (self->tracer_enabled_) { + gpr_log(GPR_INFO, + "request_router=%p request=%p: resolver returned but no LB " + "policy; wait_for_ready=false; failing", + request_router, request); + } + GRPC_CLOSURE_RUN( + request->on_route_done_, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + } + } else { + if (self->tracer_enabled_) { + gpr_log(GPR_INFO, + "request_router=%p request=%p: resolver returned, doing LB " + "pick", + request_router, request); + } + request->ProcessServiceConfigAndStartLbPickLocked(); + } + self->finished_ = true; + } + + // Invoked when the call is cancelled. + // Note: This runs under the client_channel combiner, but will NOT be + // holding the call combiner. + static void CancelLocked(void* arg, grpc_error* error) { + ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg); + RequestRouter* request_router = self->request_router_; + // If DoneLocked() has already run, delete ourselves without doing anything. + if (self->finished_) { + Delete(self); + return; + } + Request* request = self->request_; + // If we are being cancelled, immediately invoke on_route_done_ + // to propagate the error back to the caller. 
+ if (error != GRPC_ERROR_NONE) { + if (self->tracer_enabled_) { + gpr_log(GPR_INFO, + "request_router=%p request=%p: cancelling call waiting for " + "name resolution", + request_router, request); + } + // Note: Although we are not in the call combiner here, we are + // basically stealing the call combiner from the pending pick, so + // it's safe to run on_route_done_ here -- we are essentially + // calling it here instead of calling it in DoneLocked(). + GRPC_CLOSURE_RUN(request->on_route_done_, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Pick cancelled", &error, 1)); + } + self->finished_ = true; + } + + RequestRouter* request_router_; + Request* request_; + const bool tracer_enabled_; + grpc_closure done_closure_; + grpc_closure cancel_closure_; + bool finished_ = false; +}; + +// +// RequestRouter::Request::AsyncPickCanceller +// + +// Handles the call combiner cancellation callback for an async LB pick. +class RequestRouter::Request::AsyncPickCanceller { + public: + explicit AsyncPickCanceller(Request* request) + : request_router_(request->request_router_), + request_(request), + tracer_enabled_(request_router_->tracer_->enabled()) { + GRPC_CALL_STACK_REF(request->owning_call_, "pick_callback_cancel"); + // Set cancellation closure, so that we abort if the call is cancelled. + GRPC_CLOSURE_INIT(&cancel_closure_, &CancelLocked, this, + grpc_combiner_scheduler(request_router_->combiner_)); + grpc_call_combiner_set_notify_on_cancel(request->call_combiner_, + &cancel_closure_); + } + + void MarkFinishedLocked() { + finished_ = true; + GRPC_CALL_STACK_UNREF(request_->owning_call_, "pick_callback_cancel"); + } + + private: + // Invoked when the call is cancelled. + // Note: This runs under the client_channel combiner, but will NOT be + // holding the call combiner. + static void CancelLocked(void* arg, grpc_error* error) { + AsyncPickCanceller* self = static_cast<AsyncPickCanceller*>(arg); + Request* request = self->request_; + RequestRouter* request_router = self->request_router_; + if (!self->finished_) { + // Note: request_router->lb_policy_ may have changed since we started our + // pick, in which case we will be cancelling the pick on a policy other + // than the one we started it on. However, this will just be a no-op. 
+ if (error != GRPC_ERROR_NONE && request_router->lb_policy_ != nullptr) { + if (self->tracer_enabled_) { + gpr_log(GPR_INFO, + "request_router=%p request=%p: cancelling pick from LB " + "policy %p", + request_router, request, request_router->lb_policy_.get()); + } + request_router->lb_policy_->CancelPickLocked(&request->pick_, + GRPC_ERROR_REF(error)); + } + request->pick_canceller_ = nullptr; + GRPC_CALL_STACK_UNREF(request->owning_call_, "pick_callback_cancel"); + } + Delete(self); + } + + RequestRouter* request_router_; + Request* request_; + const bool tracer_enabled_; + grpc_closure cancel_closure_; + bool finished_ = false; +}; + +// +// RequestRouter::Request +// + +RequestRouter::Request::Request(grpc_call_stack* owning_call, + grpc_call_combiner* call_combiner, + grpc_polling_entity* pollent, + grpc_metadata_batch* send_initial_metadata, + uint32_t* send_initial_metadata_flags, + ApplyServiceConfigCallback apply_service_config, + void* apply_service_config_user_data, + grpc_closure* on_route_done) + : owning_call_(owning_call), + call_combiner_(call_combiner), + pollent_(pollent), + apply_service_config_(apply_service_config), + apply_service_config_user_data_(apply_service_config_user_data), + on_route_done_(on_route_done) { + pick_.initial_metadata = send_initial_metadata; + pick_.initial_metadata_flags = send_initial_metadata_flags; +} + +RequestRouter::Request::~Request() { + if (pick_.connected_subchannel != nullptr) { + pick_.connected_subchannel.reset(); + } + for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { + if (pick_.subchannel_call_context[i].destroy != nullptr) { + pick_.subchannel_call_context[i].destroy( + pick_.subchannel_call_context[i].value); + } + } +} + +// Invoked once resolver results are available. +void RequestRouter::Request::ProcessServiceConfigAndStartLbPickLocked() { + // Get service config data if needed. + if (!apply_service_config_(apply_service_config_user_data_)) return; + // Start LB pick. + StartLbPickLocked(); +} + +void RequestRouter::Request::MaybeAddCallToInterestedPartiesLocked() { + if (!pollent_added_to_interested_parties_) { + pollent_added_to_interested_parties_ = true; + grpc_polling_entity_add_to_pollset_set( + pollent_, request_router_->interested_parties_); + } +} + +void RequestRouter::Request::MaybeRemoveCallFromInterestedPartiesLocked() { + if (pollent_added_to_interested_parties_) { + pollent_added_to_interested_parties_ = false; + grpc_polling_entity_del_from_pollset_set( + pollent_, request_router_->interested_parties_); + } +} + +// Starts a pick on the LB policy. +void RequestRouter::Request::StartLbPickLocked() { + if (request_router_->tracer_->enabled()) { + gpr_log(GPR_INFO, + "request_router=%p request=%p: starting pick on lb_policy=%p", + request_router_, this, request_router_->lb_policy_.get()); + } + GRPC_CLOSURE_INIT(&on_pick_done_, &LbPickDoneLocked, this, + grpc_combiner_scheduler(request_router_->combiner_)); + pick_.on_complete = &on_pick_done_; + GRPC_CALL_STACK_REF(owning_call_, "pick_callback"); + grpc_error* error = GRPC_ERROR_NONE; + const bool pick_done = + request_router_->lb_policy_->PickLocked(&pick_, &error); + if (pick_done) { + // Pick completed synchronously. + if (request_router_->tracer_->enabled()) { + gpr_log(GPR_INFO, + "request_router=%p request=%p: pick completed synchronously", + request_router_, this); + } + GRPC_CLOSURE_RUN(on_route_done_, error); + GRPC_CALL_STACK_UNREF(owning_call_, "pick_callback"); + } else { + // Pick will be returned asynchronously. 
+ // Add the request's polling entity to the request_router's + // interested_parties, so that the I/O of the LB policy can be done + // under it. It will be removed in LbPickDoneLocked(). + MaybeAddCallToInterestedPartiesLocked(); + // Request notification on call cancellation. + // We allocate a separate object to track cancellation, since the + // cancellation closure might still be pending when we need to reuse + // the memory in which this Request object is stored for a subsequent + // retry attempt. + pick_canceller_ = New<AsyncPickCanceller>(this); + } +} + +// Callback invoked by LoadBalancingPolicy::PickLocked() for async picks. +// Unrefs the LB policy and invokes on_route_done_. +void RequestRouter::Request::LbPickDoneLocked(void* arg, grpc_error* error) { + Request* self = static_cast<Request*>(arg); + RequestRouter* request_router = self->request_router_; + if (request_router->tracer_->enabled()) { + gpr_log(GPR_INFO, + "request_router=%p request=%p: pick completed asynchronously", + request_router, self); + } + self->MaybeRemoveCallFromInterestedPartiesLocked(); + if (self->pick_canceller_ != nullptr) { + self->pick_canceller_->MarkFinishedLocked(); + } + GRPC_CLOSURE_RUN(self->on_route_done_, GRPC_ERROR_REF(error)); + GRPC_CALL_STACK_UNREF(self->owning_call_, "pick_callback"); +} + +// +// RequestRouter::LbConnectivityWatcher +// + +class RequestRouter::LbConnectivityWatcher { + public: + LbConnectivityWatcher(RequestRouter* request_router, + grpc_connectivity_state state, + LoadBalancingPolicy* lb_policy, + grpc_channel_stack* owning_stack, + grpc_combiner* combiner) + : request_router_(request_router), + state_(state), + lb_policy_(lb_policy), + owning_stack_(owning_stack) { + GRPC_CHANNEL_STACK_REF(owning_stack_, "LbConnectivityWatcher"); + GRPC_CLOSURE_INIT(&on_changed_, &OnLbPolicyStateChangedLocked, this, + grpc_combiner_scheduler(combiner)); + lb_policy_->NotifyOnStateChangeLocked(&state_, &on_changed_); + } + + ~LbConnectivityWatcher() { + GRPC_CHANNEL_STACK_UNREF(owning_stack_, "LbConnectivityWatcher"); + } + + private: + static void OnLbPolicyStateChangedLocked(void* arg, grpc_error* error) { + LbConnectivityWatcher* self = static_cast<LbConnectivityWatcher*>(arg); + // If the notification is not for the current policy, we're stale, + // so delete ourselves. + if (self->lb_policy_ != self->request_router_->lb_policy_.get()) { + Delete(self); + return; + } + // Otherwise, process notification. + if (self->request_router_->tracer_->enabled()) { + gpr_log(GPR_INFO, "request_router=%p: lb_policy=%p state changed to %s", + self->request_router_, self->lb_policy_, + grpc_connectivity_state_name(self->state_)); + } + self->request_router_->SetConnectivityStateLocked( + self->state_, GRPC_ERROR_REF(error), "lb_changed"); + // If shutting down, terminate watch. + if (self->state_ == GRPC_CHANNEL_SHUTDOWN) { + Delete(self); + return; + } + // Renew watch. + self->lb_policy_->NotifyOnStateChangeLocked(&self->state_, + &self->on_changed_); + } + + RequestRouter* request_router_; + grpc_connectivity_state state_; + // LB policy address. No ref held, so not safe to dereference unless + // it happens to match request_router->lb_policy_. 
+ LoadBalancingPolicy* lb_policy_; + grpc_channel_stack* owning_stack_; + grpc_closure on_changed_; +}; + +// +// RequestRouter::ReresolutionRequestHandler +// + +class RequestRouter::ReresolutionRequestHandler { + public: + ReresolutionRequestHandler(RequestRouter* request_router, + LoadBalancingPolicy* lb_policy, + grpc_channel_stack* owning_stack, + grpc_combiner* combiner) + : request_router_(request_router), + lb_policy_(lb_policy), + owning_stack_(owning_stack) { + GRPC_CHANNEL_STACK_REF(owning_stack_, "ReresolutionRequestHandler"); + GRPC_CLOSURE_INIT(&closure_, &OnRequestReresolutionLocked, this, + grpc_combiner_scheduler(combiner)); + lb_policy_->SetReresolutionClosureLocked(&closure_); + } + + private: + static void OnRequestReresolutionLocked(void* arg, grpc_error* error) { + ReresolutionRequestHandler* self = + static_cast<ReresolutionRequestHandler*>(arg); + RequestRouter* request_router = self->request_router_; + // If this invocation is for a stale LB policy, treat it as an LB shutdown + // signal. + if (self->lb_policy_ != request_router->lb_policy_.get() || + error != GRPC_ERROR_NONE || request_router->resolver_ == nullptr) { + GRPC_CHANNEL_STACK_UNREF(request_router->owning_stack_, + "ReresolutionRequestHandler"); + Delete(self); + return; + } + if (request_router->tracer_->enabled()) { + gpr_log(GPR_INFO, "request_router=%p: starting name re-resolution", + request_router); + } + request_router->resolver_->RequestReresolutionLocked(); + // Give back the closure to the LB policy. + self->lb_policy_->SetReresolutionClosureLocked(&self->closure_); + } + + RequestRouter* request_router_; + // LB policy address. No ref held, so not safe to dereference unless + // it happens to match request_router->lb_policy_. + LoadBalancingPolicy* lb_policy_; + grpc_channel_stack* owning_stack_; + grpc_closure closure_; +}; + +// +// RequestRouter +// + +RequestRouter::RequestRouter( + grpc_channel_stack* owning_stack, grpc_combiner* combiner, + grpc_client_channel_factory* client_channel_factory, + grpc_pollset_set* interested_parties, TraceFlag* tracer, + ProcessResolverResultCallback process_resolver_result, + void* process_resolver_result_user_data, const char* target_uri, + const grpc_channel_args* args, grpc_error** error) + : owning_stack_(owning_stack), + combiner_(combiner), + client_channel_factory_(client_channel_factory), + interested_parties_(interested_parties), + tracer_(tracer), + process_resolver_result_(process_resolver_result), + process_resolver_result_user_data_(process_resolver_result_user_data) { + GRPC_CLOSURE_INIT(&on_resolver_result_changed_, + &RequestRouter::OnResolverResultChangedLocked, this, + grpc_combiner_scheduler(combiner)); + grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, + "request_router"); + grpc_channel_args* new_args = nullptr; + if (process_resolver_result == nullptr) { + grpc_arg arg = grpc_channel_arg_integer_create( + const_cast<char*>(GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION), 0); + new_args = grpc_channel_args_copy_and_add(args, &arg, 1); + } + resolver_ = ResolverRegistry::CreateResolver( + target_uri, (new_args == nullptr ?
args : new_args), interested_parties_, + combiner_); + grpc_channel_args_destroy(new_args); + if (resolver_ == nullptr) { + *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed"); + } +} + +RequestRouter::~RequestRouter() { + if (resolver_ != nullptr) { + // The only way we can get here is if we never started resolving, + // because we take a ref to the channel stack when we start + // resolving and do not release it until the resolver callback is + // invoked after the resolver shuts down. + resolver_.reset(); + } + if (lb_policy_ != nullptr) { + grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(), + interested_parties_); + lb_policy_.reset(); + } + if (client_channel_factory_ != nullptr) { + grpc_client_channel_factory_unref(client_channel_factory_); + } + grpc_connectivity_state_destroy(&state_tracker_); +} + +namespace { + +const char* GetChannelConnectivityStateChangeString( + grpc_connectivity_state state) { + switch (state) { + case GRPC_CHANNEL_IDLE: + return "Channel state change to IDLE"; + case GRPC_CHANNEL_CONNECTING: + return "Channel state change to CONNECTING"; + case GRPC_CHANNEL_READY: + return "Channel state change to READY"; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + return "Channel state change to TRANSIENT_FAILURE"; + case GRPC_CHANNEL_SHUTDOWN: + return "Channel state change to SHUTDOWN"; + } + GPR_UNREACHABLE_CODE(return "UNKNOWN"); +} + +} // namespace + +void RequestRouter::SetConnectivityStateLocked(grpc_connectivity_state state, + grpc_error* error, + const char* reason) { + if (lb_policy_ != nullptr) { + if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + // Cancel picks with wait_for_ready=false. + lb_policy_->CancelMatchingPicksLocked( + /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY, + /* check= */ 0, GRPC_ERROR_REF(error)); + } else if (state == GRPC_CHANNEL_SHUTDOWN) { + // Cancel all picks. + lb_policy_->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0, + GRPC_ERROR_REF(error)); + } + } + if (tracer_->enabled()) { + gpr_log(GPR_INFO, "request_router=%p: setting connectivity state to %s", + this, grpc_connectivity_state_name(state)); + } + if (channelz_node_ != nullptr) { + channelz_node_->AddTraceEvent( + channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string( + GetChannelConnectivityStateChangeString(state))); + } + grpc_connectivity_state_set(&state_tracker_, state, error, reason); +} + +void RequestRouter::StartResolvingLocked() { + if (tracer_->enabled()) { + gpr_log(GPR_INFO, "request_router=%p: starting name resolution", this); + } + GPR_ASSERT(!started_resolving_); + started_resolving_ = true; + GRPC_CHANNEL_STACK_REF(owning_stack_, "resolver"); + resolver_->NextLocked(&resolver_result_, &on_resolver_result_changed_); +} + +// Invoked from the resolver NextLocked() callback when the resolver +// is shutting down. +void RequestRouter::OnResolverShutdownLocked(grpc_error* error) { + if (tracer_->enabled()) { + gpr_log(GPR_INFO, "request_router=%p: shutting down", this); + } + if (lb_policy_ != nullptr) { + if (tracer_->enabled()) { + gpr_log(GPR_INFO, "request_router=%p: shutting down lb_policy=%p", this, + lb_policy_.get()); + } + grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(), + interested_parties_); + lb_policy_.reset(); + } + if (resolver_ != nullptr) { + // This should never happen; it can only be triggered by a resolver + // implementation spontaneously deciding to report shutdown without + // being orphaned. This code is included just to be defensive.
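// (Resolvers are held via OrphanablePtr, so the expected shutdown path is
// resolver_.reset(), which orphans the resolver; the in-flight
// NextLocked() callback then fires with an error, which is how this
// function is normally reached.)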
+ if (tracer_->enabled()) { + gpr_log(GPR_INFO, + "request_router=%p: spontaneous shutdown from resolver %p", this, + resolver_.get()); + } + resolver_.reset(); + SetConnectivityStateLocked(GRPC_CHANNEL_SHUTDOWN, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Resolver spontaneous shutdown", &error, 1), + "resolver_spontaneous_shutdown"); + } + grpc_closure_list_fail_all(&waiting_for_resolver_result_closures_, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Channel disconnected", &error, 1)); + GRPC_CLOSURE_LIST_SCHED(&waiting_for_resolver_result_closures_); + GRPC_CHANNEL_STACK_UNREF(owning_stack_, "resolver"); + grpc_channel_args_destroy(resolver_result_); + resolver_result_ = nullptr; + GRPC_ERROR_UNREF(error); +} + +// Creates a new LB policy, replacing any previous one. +// If the new policy is created successfully, sets *connectivity_state and +// *connectivity_error to its initial connectivity state; otherwise, +// leaves them unchanged. +void RequestRouter::CreateNewLbPolicyLocked( + const char* lb_policy_name, grpc_json* lb_config, + grpc_connectivity_state* connectivity_state, + grpc_error** connectivity_error, TraceStringVector* trace_strings) { + LoadBalancingPolicy::Args lb_policy_args; + lb_policy_args.combiner = combiner_; + lb_policy_args.client_channel_factory = client_channel_factory_; + lb_policy_args.args = resolver_result_; + lb_policy_args.lb_config = lb_config; + OrphanablePtr<LoadBalancingPolicy> new_lb_policy = + LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(lb_policy_name, + lb_policy_args); + if (GPR_UNLIKELY(new_lb_policy == nullptr)) { + gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); + if (channelz_node_ != nullptr) { + char* str; + gpr_asprintf(&str, "Could not create LB policy \'%s\'", lb_policy_name); + trace_strings->push_back(str); + } + } else { + if (tracer_->enabled()) { + gpr_log(GPR_INFO, "request_router=%p: created new LB policy \"%s\" (%p)", + this, lb_policy_name, new_lb_policy.get()); + } + if (channelz_node_ != nullptr) { + char* str; + gpr_asprintf(&str, "Created new LB policy \'%s\'", lb_policy_name); + trace_strings->push_back(str); + } + // Swap out the LB policy and update the fds in interested_parties_. + if (lb_policy_ != nullptr) { + if (tracer_->enabled()) { + gpr_log(GPR_INFO, "request_router=%p: shutting down lb_policy=%p", this, + lb_policy_.get()); + } + grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(), + interested_parties_); + lb_policy_->HandOffPendingPicksLocked(new_lb_policy.get()); + } + lb_policy_ = std::move(new_lb_policy); + grpc_pollset_set_add_pollset_set(lb_policy_->interested_parties(), + interested_parties_); + // Create re-resolution request handler for the new LB policy. It + // will delete itself when no longer needed. + New<ReresolutionRequestHandler>(this, lb_policy_.get(), owning_stack_, + combiner_); + // Get the new LB policy's initial connectivity state and start a + // connectivity watch. + GRPC_ERROR_UNREF(*connectivity_error); + *connectivity_state = + lb_policy_->CheckConnectivityLocked(connectivity_error); + if (exit_idle_when_lb_policy_arrives_) { + lb_policy_->ExitIdleLocked(); + exit_idle_when_lb_policy_arrives_ = false; + } + // Create new watcher. It will delete itself when done. 
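// (As defined above, the watcher re-arms itself from its own callback via
// NotifyOnStateChangeLocked() until it either observes
// GRPC_CHANNEL_SHUTDOWN or finds that lb_policy_ has been replaced, at
// which point it deletes itself.)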
+ New<LbConnectivityWatcher>(this, *connectivity_state, lb_policy_.get(), + owning_stack_, combiner_); + } +} + +void RequestRouter::MaybeAddTraceMessagesForAddressChangesLocked( + TraceStringVector* trace_strings) { + const ServerAddressList* addresses = + FindServerAddressListChannelArg(resolver_result_); + const bool resolution_contains_addresses = + addresses != nullptr && addresses->size() > 0; + if (!resolution_contains_addresses && + previous_resolution_contained_addresses_) { + trace_strings->push_back(gpr_strdup("Address list became empty")); + } else if (resolution_contains_addresses && + !previous_resolution_contained_addresses_) { + trace_strings->push_back(gpr_strdup("Address list became non-empty")); + } + previous_resolution_contained_addresses_ = resolution_contains_addresses; +} + +void RequestRouter::ConcatenateAndAddChannelTraceLocked( + TraceStringVector* trace_strings) const { + if (!trace_strings->empty()) { + gpr_strvec v; + gpr_strvec_init(&v); + gpr_strvec_add(&v, gpr_strdup("Resolution event: ")); + bool is_first = true; + for (size_t i = 0; i < trace_strings->size(); ++i) { + if (!is_first) gpr_strvec_add(&v, gpr_strdup(", ")); + is_first = false; + gpr_strvec_add(&v, (*trace_strings)[i]); + } + char* flat; + size_t flat_len = 0; + flat = gpr_strvec_flatten(&v, &flat_len); + channelz_node_->AddTraceEvent( + channelz::ChannelTrace::Severity::Info, + grpc_slice_new(flat, flat_len, gpr_free)); + gpr_strvec_destroy(&v); + } +} + +// Callback invoked when a resolver result is available. +void RequestRouter::OnResolverResultChangedLocked(void* arg, + grpc_error* error) { + RequestRouter* self = static_cast<RequestRouter*>(arg); + if (self->tracer_->enabled()) { + const char* disposition = + self->resolver_result_ != nullptr + ? "" + : (error == GRPC_ERROR_NONE ? " (transient error)" + : " (resolver shutdown)"); + gpr_log(GPR_INFO, + "request_router=%p: got resolver result: resolver_result=%p " + "error=%s%s", + self, self->resolver_result_, grpc_error_string(error), + disposition); + } + // Handle shutdown. + if (error != GRPC_ERROR_NONE || self->resolver_ == nullptr) { + self->OnResolverShutdownLocked(GRPC_ERROR_REF(error)); + return; + } + // Data used to set the channel's connectivity state. + bool set_connectivity_state = true; + // We only want to trace the address resolution in the following cases: + // (a) Address resolution that causes a service config change. + // (b) Address resolution that causes number of backends to go from + // zero to non-zero. + // (c) Address resolution that causes number of backends to go from + // non-zero to zero. + // (d) Address resolution that causes a new LB policy to be created. + // + // We track a list of strings to eventually be concatenated and traced. + TraceStringVector trace_strings; + grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE; + grpc_error* connectivity_error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); + // resolver_result_ will be null in the case of a transient + // resolution error. In that case, we don't have any new result to + // process, which means that we keep using the previous result (if any). + if (self->resolver_result_ == nullptr) { + if (self->tracer_->enabled()) { + gpr_log(GPR_INFO, "request_router=%p: resolver transient failure", self); + } + // Don't override connectivity state if we already have an LB policy. + if (self->lb_policy_ != nullptr) set_connectivity_state = false; + } else { + // Parse the resolver result.
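// (The actual parsing is delegated to the process_resolver_result_
// callback installed at construction. A hedged sketch of such a callback,
// matching the ProcessResolverResultCallback signature from the header;
// the helpers are hypothetical:
//
//   static bool ProcessResolverResult(void* user_data,
//                                     const grpc_channel_args& args,
//                                     const char** lb_policy_name,
//                                     grpc_json** lb_policy_config) {
//     channel_data* chand = static_cast<channel_data*>(user_data);
//     // Inspect the service config and addresses carried in |args|.
//     *lb_policy_name = ChooseLbPolicyName(chand, args);  // hypothetical
//     *lb_policy_config = nullptr;  // or a policy-specific config JSON
//     return UpdateServiceConfig(chand, args);  // hypothetical; true iff
//                                               // the service config changed
//   }
// )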
+ const char* lb_policy_name = nullptr; + grpc_json* lb_policy_config = nullptr; + const bool service_config_changed = self->process_resolver_result_( + self->process_resolver_result_user_data_, *self->resolver_result_, + &lb_policy_name, &lb_policy_config); + GPR_ASSERT(lb_policy_name != nullptr); + // Check to see if we're already using the right LB policy. + const bool lb_policy_name_changed = + self->lb_policy_ == nullptr || + strcmp(self->lb_policy_->name(), lb_policy_name) != 0; + if (self->lb_policy_ != nullptr && !lb_policy_name_changed) { + // Continue using the same LB policy. Update with new addresses. + if (self->tracer_->enabled()) { + gpr_log(GPR_INFO, + "request_router=%p: updating existing LB policy \"%s\" (%p)", + self, lb_policy_name, self->lb_policy_.get()); + } + self->lb_policy_->UpdateLocked(*self->resolver_result_, lb_policy_config); + // No need to set the channel's connectivity state; the existing + // watch on the LB policy will take care of that. + set_connectivity_state = false; + } else { + // Instantiate new LB policy. + self->CreateNewLbPolicyLocked(lb_policy_name, lb_policy_config, + &connectivity_state, &connectivity_error, + &trace_strings); + } + // Add channel trace event. + if (self->channelz_node_ != nullptr) { + if (service_config_changed) { + // TODO(ncteisen): might be worth somehow including a snippet of the + // config in the trace, at the risk of bloating the trace logs. + trace_strings.push_back(gpr_strdup("Service config changed")); + } + self->MaybeAddTraceMessagesForAddressChangesLocked(&trace_strings); + self->ConcatenateAndAddChannelTraceLocked(&trace_strings); + } + // Clean up. + grpc_channel_args_destroy(self->resolver_result_); + self->resolver_result_ = nullptr; + } + // Set the channel's connectivity state if needed. + if (set_connectivity_state) { + self->SetConnectivityStateLocked(connectivity_state, connectivity_error, + "resolver_result"); + } else { + GRPC_ERROR_UNREF(connectivity_error); + } + // Invoke closures that were waiting for results and renew the watch. + GRPC_CLOSURE_LIST_SCHED(&self->waiting_for_resolver_result_closures_); + self->resolver_->NextLocked(&self->resolver_result_, + &self->on_resolver_result_changed_); +} + +void RequestRouter::RouteCallLocked(Request* request) { + GPR_ASSERT(request->pick_.connected_subchannel == nullptr); + request->request_router_ = this; + if (lb_policy_ != nullptr) { + // We already have resolver results, so process the service config + // and start an LB pick. + request->ProcessServiceConfigAndStartLbPickLocked(); + } else if (resolver_ == nullptr) { + GRPC_CLOSURE_RUN(request->on_route_done_, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); + } else { + // We do not yet have an LB policy, so wait for a resolver result. + if (!started_resolving_) { + StartResolvingLocked(); + } + // Create a new waiter, which will delete itself when done. + New<Request::ResolverResultWaiter>(request); + // Add the request's polling entity to the request_router's + // interested_parties, so that the I/O of the resolver can be done + // under it. It will be removed in LbPickDoneLocked(). 
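// (ResolverResultWaiter, declared in the header, follows the same
// self-deleting pattern as AsyncPickCanceller above. A hedged sketch of
// the shape of its completion callback, assuming the waiter queued itself
// on waiting_for_resolver_result_closures_ at construction:
//
//   static void DoneLocked(void* arg, grpc_error* error) {
//     ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
//     Request* request = self->request_;
//     if (error != GRPC_ERROR_NONE) {
//       GRPC_CLOSURE_RUN(request->on_route_done_, GRPC_ERROR_REF(error));
//     } else {
//       request->ProcessServiceConfigAndStartLbPickLocked();
//     }
//     Delete(self);
//   }
// )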
+ request->MaybeAddCallToInterestedPartiesLocked(); + } +} + +void RequestRouter::ShutdownLocked(grpc_error* error) { + if (resolver_ != nullptr) { + SetConnectivityStateLocked(GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), + "disconnect"); + resolver_.reset(); + if (!started_resolving_) { + grpc_closure_list_fail_all(&waiting_for_resolver_result_closures_, + GRPC_ERROR_REF(error)); + GRPC_CLOSURE_LIST_SCHED(&waiting_for_resolver_result_closures_); + } + if (lb_policy_ != nullptr) { + grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(), + interested_parties_); + lb_policy_.reset(); + } + } + GRPC_ERROR_UNREF(error); +} + +grpc_connectivity_state RequestRouter::GetConnectivityState() { + return grpc_connectivity_state_check(&state_tracker_); +} + +void RequestRouter::NotifyOnConnectivityStateChange( + grpc_connectivity_state* state, grpc_closure* closure) { + grpc_connectivity_state_notify_on_state_change(&state_tracker_, state, + closure); +} + +void RequestRouter::ExitIdleLocked() { + if (lb_policy_ != nullptr) { + lb_policy_->ExitIdleLocked(); + } else { + exit_idle_when_lb_policy_arrives_ = true; + if (!started_resolving_ && resolver_ != nullptr) { + StartResolvingLocked(); + } + } +} + +void RequestRouter::ResetConnectionBackoffLocked() { + if (resolver_ != nullptr) { + resolver_->ResetBackoffLocked(); + resolver_->RequestReresolutionLocked(); + } + if (lb_policy_ != nullptr) { + lb_policy_->ResetBackoffLocked(); + } +} + +} // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/request_routing.h b/src/core/ext/filters/client_channel/request_routing.h new file mode 100644 index 0000000000..0c671229c8 --- /dev/null +++ b/src/core/ext/filters/client_channel/request_routing.h @@ -0,0 +1,177 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_REQUEST_ROUTING_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_REQUEST_ROUTING_H + +#include <grpc/support/port_platform.h> + +#include "src/core/ext/filters/client_channel/client_channel_channelz.h" +#include "src/core/ext/filters/client_channel/client_channel_factory.h" +#include "src/core/ext/filters/client_channel/lb_policy.h" +#include "src/core/ext/filters/client_channel/resolver.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/gprpp/inlined_vector.h" +#include "src/core/lib/gprpp/orphanable.h" +#include "src/core/lib/iomgr/call_combiner.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/polling_entity.h" +#include "src/core/lib/iomgr/pollset_set.h" +#include "src/core/lib/transport/connectivity_state.h" +#include "src/core/lib/transport/metadata_batch.h" + +namespace grpc_core { + +class RequestRouter { + public: + class Request { + public: + // Synchronous callback that applies the service config to a call. + // Returns false if the call should be failed. 
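// A hedged example of an implementation; the filter-side names are
// hypothetical:
//
//   static bool ApplyServiceConfig(void* user_data) {
//     call_data* calld = static_cast<call_data*>(user_data);
//     // Look up and cache per-method parameters from the current
//     // service config; returning false fails the call.
//     return ResolveMethodParams(calld);  // hypothetical helper
//   }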
+ typedef bool (*ApplyServiceConfigCallback)(void* user_data); + + Request(grpc_call_stack* owning_call, grpc_call_combiner* call_combiner, + grpc_polling_entity* pollent, + grpc_metadata_batch* send_initial_metadata, + uint32_t* send_initial_metadata_flags, + ApplyServiceConfigCallback apply_service_config, + void* apply_service_config_user_data, grpc_closure* on_route_done); + + ~Request(); + + // TODO(roth): It seems a bit ugly to expose this member in a + // non-const way. Find a better API to avoid this. + LoadBalancingPolicy::PickState* pick() { return &pick_; } + + private: + friend class RequestRouter; + + class ResolverResultWaiter; + class AsyncPickCanceller; + + void ProcessServiceConfigAndStartLbPickLocked(); + void StartLbPickLocked(); + static void LbPickDoneLocked(void* arg, grpc_error* error); + + void MaybeAddCallToInterestedPartiesLocked(); + void MaybeRemoveCallFromInterestedPartiesLocked(); + + // Populated by caller. + grpc_call_stack* owning_call_; + grpc_call_combiner* call_combiner_; + grpc_polling_entity* pollent_; + ApplyServiceConfigCallback apply_service_config_; + void* apply_service_config_user_data_; + grpc_closure* on_route_done_; + LoadBalancingPolicy::PickState pick_; + + // Internal state. + RequestRouter* request_router_ = nullptr; + bool pollent_added_to_interested_parties_ = false; + grpc_closure on_pick_done_; + AsyncPickCanceller* pick_canceller_ = nullptr; + }; + + // Synchronous callback that processes the resolver result (the channel + // args), returning the LB policy name and config via the out parameters. + // Returns true if the service config has changed since the last result. + typedef bool (*ProcessResolverResultCallback)(void* user_data, + const grpc_channel_args& args, + const char** lb_policy_name, + grpc_json** lb_policy_config); + + RequestRouter(grpc_channel_stack* owning_stack, grpc_combiner* combiner, + grpc_client_channel_factory* client_channel_factory, + grpc_pollset_set* interested_parties, TraceFlag* tracer, + ProcessResolverResultCallback process_resolver_result, + void* process_resolver_result_user_data, const char* target_uri, + const grpc_channel_args* args, grpc_error** error); + + ~RequestRouter(); + + void set_channelz_node(channelz::ClientChannelNode* channelz_node) { + channelz_node_ = channelz_node; + } + + void RouteCallLocked(Request* request); + + // TODO(roth): Add methods to cancel picks. + + void ShutdownLocked(grpc_error* error); + + void ExitIdleLocked(); + void ResetConnectionBackoffLocked(); + + grpc_connectivity_state GetConnectivityState(); + void NotifyOnConnectivityStateChange(grpc_connectivity_state* state, + grpc_closure* closure); + + LoadBalancingPolicy* lb_policy() const { return lb_policy_.get(); } + + private: + using TraceStringVector = grpc_core::InlinedVector<char*, 3>; + + class ReresolutionRequestHandler; + class LbConnectivityWatcher; + + void StartResolvingLocked(); + void OnResolverShutdownLocked(grpc_error* error); + void CreateNewLbPolicyLocked(const char* lb_policy_name, grpc_json* lb_config, + grpc_connectivity_state* connectivity_state, + grpc_error** connectivity_error, + TraceStringVector* trace_strings); + void MaybeAddTraceMessagesForAddressChangesLocked( + TraceStringVector* trace_strings); + void ConcatenateAndAddChannelTraceLocked( + TraceStringVector* trace_strings) const; + static void OnResolverResultChangedLocked(void* arg, grpc_error* error); + + void SetConnectivityStateLocked(grpc_connectivity_state state, + grpc_error* error, const char* reason); + + // Passed in from caller at construction time.
+ grpc_channel_stack* owning_stack_; + grpc_combiner* combiner_; + grpc_client_channel_factory* client_channel_factory_; + grpc_pollset_set* interested_parties_; + TraceFlag* tracer_; + + channelz::ClientChannelNode* channelz_node_ = nullptr; + + // Resolver and associated state. + OrphanablePtr<Resolver> resolver_; + ProcessResolverResultCallback process_resolver_result_; + void* process_resolver_result_user_data_; + bool started_resolving_ = false; + grpc_channel_args* resolver_result_ = nullptr; + bool previous_resolution_contained_addresses_ = false; + grpc_closure_list waiting_for_resolver_result_closures_; + grpc_closure on_resolver_result_changed_; + + // LB policy and associated state. + OrphanablePtr<LoadBalancingPolicy> lb_policy_; + bool exit_idle_when_lb_policy_arrives_ = false; + + grpc_connectivity_state_tracker state_tracker_; +}; + +} // namespace grpc_core + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_REQUEST_ROUTING_H */ diff --git a/src/core/ext/filters/client_channel/resolver.cc b/src/core/ext/filters/client_channel/resolver.cc index cd11eeb9e4..601b08be24 100644 --- a/src/core/ext/filters/client_channel/resolver.cc +++ b/src/core/ext/filters/client_channel/resolver.cc @@ -27,7 +27,7 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_resolver_refcount(false, namespace grpc_core { Resolver::Resolver(grpc_combiner* combiner) - : InternallyRefCountedWithTracing(&grpc_trace_resolver_refcount), + : InternallyRefCounted(&grpc_trace_resolver_refcount), combiner_(GRPC_COMBINER_REF(combiner, "resolver")) {} Resolver::~Resolver() { GRPC_COMBINER_UNREF(combiner_, "resolver"); } diff --git a/src/core/ext/filters/client_channel/resolver.h b/src/core/ext/filters/client_channel/resolver.h index e9acbb7c41..9da849a101 100644 --- a/src/core/ext/filters/client_channel/resolver.h +++ b/src/core/ext/filters/client_channel/resolver.h @@ -44,7 +44,7 @@ namespace grpc_core { /// /// Note: All methods with a "Locked" suffix must be called from the /// combiner passed to the constructor. -class Resolver : public InternallyRefCountedWithTracing<Resolver> { +class Resolver : public InternallyRefCounted<Resolver> { public: // Not copyable nor movable. 
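// (Refcount tracing is now folded into InternallyRefCounted itself: the
// subclass passes its TraceFlag to the base constructor, as in
// InternallyRefCounted(&grpc_trace_resolver_refcount) in resolver.cc
// above, so the separate InternallyRefCountedWithTracing variant is no
// longer needed here.)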
Resolver(const Resolver&) = delete; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 90bc88961d..abacd0c960 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -33,6 +33,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/env.h" @@ -117,11 +118,13 @@ class AresDnsResolver : public Resolver { /// retry backoff state BackOff backoff_; /// currently resolving addresses - grpc_lb_addresses* lb_addresses_ = nullptr; + UniquePtr<ServerAddressList> addresses_; /// currently resolving service config char* service_config_json_ = nullptr; // has shutdown been initiated bool shutdown_initiated_ = false; + // timeout in milliseconds for active DNS queries + int query_timeout_ms_; }; AresDnsResolver::AresDnsResolver(const ResolverArgs& args) @@ -159,10 +162,15 @@ AresDnsResolver::AresDnsResolver(const ResolverArgs& args) grpc_combiner_scheduler(combiner())); GRPC_CLOSURE_INIT(&on_resolved_, OnResolvedLocked, this, grpc_combiner_scheduler(combiner())); + const grpc_arg* query_timeout_ms_arg = + grpc_channel_args_find(channel_args_, GRPC_ARG_DNS_ARES_QUERY_TIMEOUT_MS); + query_timeout_ms_ = grpc_channel_arg_get_integer( + query_timeout_ms_arg, + {GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS, 0, INT_MAX}); } AresDnsResolver::~AresDnsResolver() { - gpr_log(GPR_DEBUG, "destroying AresDnsResolver"); + GRPC_CARES_TRACE_LOG("resolver:%p destroying AresDnsResolver", this); if (resolved_result_ != nullptr) { grpc_channel_args_destroy(resolved_result_); } @@ -174,7 +182,8 @@ AresDnsResolver::~AresDnsResolver() { void AresDnsResolver::NextLocked(grpc_channel_args** target_result, grpc_closure* on_complete) { - gpr_log(GPR_DEBUG, "AresDnsResolver::NextLocked() is called."); + GRPC_CARES_TRACE_LOG("resolver:%p AresDnsResolver::NextLocked() is called.", + this); GPR_ASSERT(next_completion_ == nullptr); next_completion_ = on_complete; target_result_ = target_result; @@ -217,12 +226,14 @@ void AresDnsResolver::ShutdownLocked() { void AresDnsResolver::OnNextResolutionLocked(void* arg, grpc_error* error) { AresDnsResolver* r = static_cast<AresDnsResolver*>(arg); GRPC_CARES_TRACE_LOG( - "%p re-resolution timer fired. error: %s. shutdown_initiated_: %d", r, - grpc_error_string(error), r->shutdown_initiated_); + "resolver:%p re-resolution timer fired. error: %s. 
shutdown_initiated_: " + "%d", + r, grpc_error_string(error), r->shutdown_initiated_); r->have_next_resolution_timer_ = false; if (error == GRPC_ERROR_NONE && !r->shutdown_initiated_) { if (!r->resolving_) { - GRPC_CARES_TRACE_LOG("%p start resolving due to re-resolution timer", r); + GRPC_CARES_TRACE_LOG( + "resolver:%p start resolving due to re-resolution timer", r); r->StartResolvingLocked(); } } @@ -307,20 +318,20 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { r->resolving_ = false; gpr_free(r->pending_request_); r->pending_request_ = nullptr; - if (r->lb_addresses_ != nullptr) { + if (r->addresses_ != nullptr) { static const char* args_to_remove[1]; size_t num_args_to_remove = 0; grpc_arg args_to_add[2]; size_t num_args_to_add = 0; args_to_add[num_args_to_add++] = - grpc_lb_addresses_create_channel_arg(r->lb_addresses_); + CreateServerAddressListChannelArg(r->addresses_.get()); char* service_config_string = nullptr; if (r->service_config_json_ != nullptr) { service_config_string = ChooseServiceConfig(r->service_config_json_); gpr_free(r->service_config_json_); if (service_config_string != nullptr) { - gpr_log(GPR_INFO, "selected service config choice: %s", - service_config_string); + GRPC_CARES_TRACE_LOG("resolver:%p selected service config choice: %s", + r, service_config_string); args_to_remove[num_args_to_remove++] = GRPC_ARG_SERVICE_CONFIG; args_to_add[num_args_to_add++] = grpc_channel_arg_string_create( (char*)GRPC_ARG_SERVICE_CONFIG, service_config_string); @@ -330,17 +341,17 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { r->channel_args_, args_to_remove, num_args_to_remove, args_to_add, num_args_to_add); gpr_free(service_config_string); - grpc_lb_addresses_destroy(r->lb_addresses_); + r->addresses_.reset(); // Reset backoff state so that we start from the beginning when the // next request gets triggered. r->backoff_.Reset(); } else if (!r->shutdown_initiated_) { const char* msg = grpc_error_string(error); - gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg); + GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed: %s", r, msg); grpc_millis next_try = r->backoff_.NextAttemptTime(); grpc_millis timeout = next_try - ExecCtx::Get()->Now(); - gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", - grpc_error_string(error)); + GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed (will retry): %s", + r, grpc_error_string(error)); GPR_ASSERT(!r->have_next_resolution_timer_); r->have_next_resolution_timer_ = true; // TODO(roth): We currently deal with this ref manually. Once the @@ -349,9 +360,10 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { RefCountedPtr<Resolver> self = r->Ref(DEBUG_LOCATION, "retry-timer"); self.release(); if (timeout > 0) { - gpr_log(GPR_DEBUG, "retrying in %" PRId64 " milliseconds", timeout); + GRPC_CARES_TRACE_LOG("resolver:%p retrying in %" PRId64 " milliseconds", + r, timeout); } else { - gpr_log(GPR_DEBUG, "retrying immediately"); + GRPC_CARES_TRACE_LOG("resolver:%p retrying immediately", r); } grpc_timer_init(&r->next_resolution_timer_, next_try, &r->on_next_resolution_); @@ -377,10 +389,10 @@ void AresDnsResolver::MaybeStartResolvingLocked() { if (ms_until_next_resolution > 0) { const grpc_millis last_resolution_ago = grpc_core::ExecCtx::Get()->Now() - last_resolution_timestamp_; - gpr_log(GPR_DEBUG, - "In cooldown from last resolution (from %" PRId64 - " ms ago). 
Will resolve again in %" PRId64 " ms", - last_resolution_ago, ms_until_next_resolution); + GRPC_CARES_TRACE_LOG( + "resolver:%p In cooldown from last resolution (from %" PRId64 + " ms ago). Will resolve again in %" PRId64 " ms", + this, last_resolution_ago, ms_until_next_resolution); have_next_resolution_timer_ = true; // TODO(roth): We currently deal with this ref manually. Once the // new closure API is done, find a way to track this ref with the timer @@ -397,7 +409,6 @@ void AresDnsResolver::MaybeStartResolvingLocked() { } void AresDnsResolver::StartResolvingLocked() { - gpr_log(GPR_DEBUG, "Start resolving."); // TODO(roth): We currently deal with this ref manually. Once the // new closure API is done, find a way to track this ref with the timer // callback as part of the type system. @@ -405,13 +416,15 @@ void AresDnsResolver::StartResolvingLocked() { self.release(); GPR_ASSERT(!resolving_); resolving_ = true; - lb_addresses_ = nullptr; service_config_json_ = nullptr; pending_request_ = grpc_dns_lookup_ares_locked( dns_server_, name_to_resolve_, kDefaultPort, interested_parties_, - &on_resolved_, &lb_addresses_, true /* check_grpclb */, - request_service_config_ ? &service_config_json_ : nullptr, combiner()); + &on_resolved_, &addresses_, true /* check_grpclb */, + request_service_config_ ? &service_config_json_ : nullptr, + query_timeout_ms_, combiner()); last_resolution_timestamp_ = grpc_core::ExecCtx::Get()->Now(); + GRPC_CARES_TRACE_LOG("resolver:%p Started resolving. pending_request_:%p", + this, pending_request_); } void AresDnsResolver::MaybeFinishNextLocked() { @@ -419,7 +432,8 @@ void AresDnsResolver::MaybeFinishNextLocked() { *target_result_ = resolved_result_ == nullptr ? nullptr : grpc_channel_args_copy(resolved_result_); - gpr_log(GPR_DEBUG, "AresDnsResolver::MaybeFinishNextLocked()"); + GRPC_CARES_TRACE_LOG("resolver:%p AresDnsResolver::MaybeFinishNextLocked()", + this); GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_NONE); next_completion_ = nullptr; published_version_ = resolved_version_; @@ -457,11 +471,16 @@ static grpc_error* blocking_resolve_address_ares( static grpc_address_resolver_vtable ares_resolver = { grpc_resolve_address_ares, blocking_resolve_address_ares}; +static bool should_use_ares(const char* resolver_env) { + return resolver_env != nullptr && gpr_stricmp(resolver_env, "ares") == 0; +} + void grpc_resolver_dns_ares_init() { char* resolver_env = gpr_getenv("GRPC_DNS_RESOLVER"); /* TODO(zyc): Turn on c-ares based resolver by default after the address sorter and the CNAME support are added. 
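   Until then, the c-ares resolver is opt-in via the environment; an
   illustrative invocation (binary name hypothetical):

     GRPC_DNS_RESOLVER=ares ./my_grpc_app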
*/ - if (resolver_env != nullptr && gpr_stricmp(resolver_env, "ares") == 0) { + if (should_use_ares(resolver_env)) { + gpr_log(GPR_DEBUG, "Using ares dns resolver"); address_sorting_init(); grpc_error* error = grpc_ares_init(); if (error != GRPC_ERROR_NONE) { @@ -481,7 +500,7 @@ void grpc_resolver_dns_ares_init() { void grpc_resolver_dns_ares_shutdown() { char* resolver_env = gpr_getenv("GRPC_DNS_RESOLVER"); - if (resolver_env != nullptr && gpr_stricmp(resolver_env, "ares") == 0) { + if (should_use_ares(resolver_env)) { address_sorting_shutdown(); grpc_ares_cleanup(); } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc index fdbd07ebf5..d99c2e3004 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc @@ -31,8 +31,10 @@ #include <grpc/support/time.h> #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/timer.h" typedef struct fd_node { /** the owner of this fd node */ @@ -76,21 +78,30 @@ struct grpc_ares_ev_driver { grpc_ares_request* request; /** Owned by the ev_driver. Creates new GrpcPolledFd's */ grpc_core::UniquePtr<grpc_core::GrpcPolledFdFactory> polled_fd_factory; + /** query timeout in milliseconds */ + int query_timeout_ms; + /** alarm to cancel active queries */ + grpc_timer query_timeout; + /** cancels queries on a timeout */ + grpc_closure on_timeout_locked; }; static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver); static grpc_ares_ev_driver* grpc_ares_ev_driver_ref( grpc_ares_ev_driver* ev_driver) { - gpr_log(GPR_DEBUG, "Ref ev_driver %" PRIuPTR, (uintptr_t)ev_driver); + GRPC_CARES_TRACE_LOG("request:%p Ref ev_driver %p", ev_driver->request, + ev_driver); gpr_ref(&ev_driver->refs); return ev_driver; } static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) { - gpr_log(GPR_DEBUG, "Unref ev_driver %" PRIuPTR, (uintptr_t)ev_driver); + GRPC_CARES_TRACE_LOG("request:%p Unref ev_driver %p", ev_driver->request, + ev_driver); if (gpr_unref(&ev_driver->refs)) { - gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver); + GRPC_CARES_TRACE_LOG("request:%p destroy ev_driver %p", ev_driver->request, + ev_driver); GPR_ASSERT(ev_driver->fds == nullptr); GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver"); ares_destroy(ev_driver->channel); @@ -100,7 +111,8 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) { } static void fd_node_destroy_locked(fd_node* fdn) { - gpr_log(GPR_DEBUG, "delete fd: %s", fdn->grpc_polled_fd->GetName()); + GRPC_CARES_TRACE_LOG("request:%p delete fd: %s", fdn->ev_driver->request, + fdn->grpc_polled_fd->GetName()); GPR_ASSERT(!fdn->readable_registered); GPR_ASSERT(!fdn->writable_registered); GPR_ASSERT(fdn->already_shutdown); @@ -116,8 +128,11 @@ static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) { } } +static void on_timeout_locked(void* arg, grpc_error* error); + grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set, + int query_timeout_ms, grpc_combiner* combiner, grpc_ares_request* request) { *ev_driver = 
grpc_core::New<grpc_ares_ev_driver>(); @@ -125,7 +140,7 @@ grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, memset(&opts, 0, sizeof(opts)); opts.flags |= ARES_FLAG_STAYOPEN; int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS); - gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create_locked"); + GRPC_CARES_TRACE_LOG("request:%p grpc_ares_ev_driver_create_locked", request); if (status != ARES_SUCCESS) { char* err_msg; gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s", @@ -146,6 +161,9 @@ grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, grpc_core::NewGrpcPolledFdFactory((*ev_driver)->combiner); (*ev_driver) ->polled_fd_factory->ConfigureAresChannelLocked((*ev_driver)->channel); + GRPC_CLOSURE_INIT(&(*ev_driver)->on_timeout_locked, on_timeout_locked, + *ev_driver, grpc_combiner_scheduler(combiner)); + (*ev_driver)->query_timeout_ms = query_timeout_ms; return GRPC_ERROR_NONE; } @@ -155,6 +173,7 @@ void grpc_ares_ev_driver_on_queries_complete_locked( // is working, grpc_ares_notify_on_event_locked will shut down the // fds; if it's not working, there are no fds to shut down. ev_driver->shutting_down = true; + grpc_timer_cancel(&ev_driver->query_timeout); grpc_ares_ev_driver_unref(ev_driver); } @@ -185,12 +204,25 @@ static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) { return nullptr; } +static void on_timeout_locked(void* arg, grpc_error* error) { + grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg); + GRPC_CARES_TRACE_LOG( + "request:%p ev_driver=%p on_timeout_locked. driver->shutting_down=%d. " + "err=%s", + driver->request, driver, driver->shutting_down, grpc_error_string(error)); + if (!driver->shutting_down && error == GRPC_ERROR_NONE) { + grpc_ares_ev_driver_shutdown_locked(driver); + } + grpc_ares_ev_driver_unref(driver); +} + static void on_readable_locked(void* arg, grpc_error* error) { fd_node* fdn = static_cast<fd_node*>(arg); grpc_ares_ev_driver* ev_driver = fdn->ev_driver; const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); fdn->readable_registered = false; - gpr_log(GPR_DEBUG, "readable on %s", fdn->grpc_polled_fd->GetName()); + GRPC_CARES_TRACE_LOG("request:%p readable on %s", fdn->ev_driver->request, + fdn->grpc_polled_fd->GetName()); if (error == GRPC_ERROR_NONE) { do { ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD); @@ -213,7 +245,8 @@ static void on_writable_locked(void* arg, grpc_error* error) { grpc_ares_ev_driver* ev_driver = fdn->ev_driver; const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); fdn->writable_registered = false; - gpr_log(GPR_DEBUG, "writable on %s", fdn->grpc_polled_fd->GetName()); + GRPC_CARES_TRACE_LOG("request:%p writable on %s", ev_driver->request, + fdn->grpc_polled_fd->GetName()); if (error == GRPC_ERROR_NONE) { ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, as); } else { @@ -252,7 +285,8 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { fdn->grpc_polled_fd = ev_driver->polled_fd_factory->NewGrpcPolledFdLocked( socks[i], ev_driver->pollset_set, ev_driver->combiner); - gpr_log(GPR_DEBUG, "new fd: %s", fdn->grpc_polled_fd->GetName()); + GRPC_CARES_TRACE_LOG("request:%p new fd: %s", ev_driver->request, + fdn->grpc_polled_fd->GetName()); fdn->ev_driver = ev_driver; fdn->readable_registered = false; fdn->writable_registered = false; @@ -269,8 +303,9 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { if 
(ARES_GETSOCK_READABLE(socks_bitmask, i) && !fdn->readable_registered) { grpc_ares_ev_driver_ref(ev_driver); - gpr_log(GPR_DEBUG, "notify read on: %s", - fdn->grpc_polled_fd->GetName()); + GRPC_CARES_TRACE_LOG("request:%p notify read on: %s", + ev_driver->request, + fdn->grpc_polled_fd->GetName()); fdn->grpc_polled_fd->RegisterForOnReadableLocked(&fdn->read_closure); fdn->readable_registered = true; } @@ -278,8 +313,9 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { // has not been registered with this socket. if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && !fdn->writable_registered) { - gpr_log(GPR_DEBUG, "notify write on: %s", - fdn->grpc_polled_fd->GetName()); + GRPC_CARES_TRACE_LOG("request:%p notify write on: %s", + ev_driver->request, + fdn->grpc_polled_fd->GetName()); grpc_ares_ev_driver_ref(ev_driver); fdn->grpc_polled_fd->RegisterForOnWriteableLocked( &fdn->write_closure); @@ -306,7 +342,8 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { // If the ev driver has no working fd, all the tasks are done. if (new_list == nullptr) { ev_driver->working = false; - gpr_log(GPR_DEBUG, "ev driver stop working"); + GRPC_CARES_TRACE_LOG("request:%p ev driver stop working", + ev_driver->request); } } @@ -314,6 +351,17 @@ void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) { if (!ev_driver->working) { ev_driver->working = true; grpc_ares_notify_on_event_locked(ev_driver); + grpc_millis timeout = + ev_driver->query_timeout_ms == 0 + ? GRPC_MILLIS_INF_FUTURE + : ev_driver->query_timeout_ms + grpc_core::ExecCtx::Get()->Now(); + GRPC_CARES_TRACE_LOG( + "request:%p ev_driver=%p grpc_ares_ev_driver_start_locked. timeout in " + "%" PRId64 " ms", + ev_driver->request, ev_driver, timeout); + grpc_ares_ev_driver_ref(ev_driver); + grpc_timer_init(&ev_driver->query_timeout, timeout, + &ev_driver->on_timeout_locked); } } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h index 671c537fe7..b8cefd9470 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h @@ -43,6 +43,7 @@ ares_channel* grpc_ares_ev_driver_get_channel_locked( created successfully. 
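   query_timeout_ms is taken from the GRPC_ARG_DNS_ARES_QUERY_TIMEOUT_MS
   channel arg; 0 disables the timeout. A hedged illustration of a client
   setting it to 2 seconds:

     grpc_arg arg = grpc_channel_arg_integer_create(
         const_cast<char*>(GRPC_ARG_DNS_ARES_QUERY_TIMEOUT_MS), 2000);
     grpc_channel_args* args =
         grpc_channel_args_copy_and_add(nullptr, &arg, 1);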
*/ grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set, + int query_timeout_ms, grpc_combiner* combiner, grpc_ares_request* request); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index 582e2203fc..1a7e5d0626 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -37,12 +37,16 @@ #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/nameser.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +using grpc_core::ServerAddress; +using grpc_core::ServerAddressList; + static gpr_once g_basic_init = GPR_ONCE_INIT; static gpr_mu g_init_mu; @@ -58,7 +62,7 @@ struct grpc_ares_request { /** closure to call when the request completes */ grpc_closure* on_done; /** the pointer to receive the resolved addresses */ - grpc_lb_addresses** lb_addrs_out; + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses_out; /** the pointer to receive the service config in JSON */ char** service_config_json_out; /** the evernt driver used by this request */ @@ -87,46 +91,44 @@ typedef struct grpc_ares_hostbyname_request { static void do_basic_init(void) { gpr_mu_init(&g_init_mu); } -static void log_address_sorting_list(grpc_lb_addresses* lb_addrs, +static void log_address_sorting_list(const ServerAddressList& addresses, const char* input_output_str) { - for (size_t i = 0; i < lb_addrs->num_addresses; i++) { + for (size_t i = 0; i < addresses.size(); i++) { char* addr_str; - if (grpc_sockaddr_to_string(&addr_str, &lb_addrs->addresses[i].address, - true)) { - gpr_log(GPR_DEBUG, "c-ares address sorting: %s[%" PRIuPTR "]=%s", + if (grpc_sockaddr_to_string(&addr_str, &addresses[i].address(), true)) { + gpr_log(GPR_INFO, "c-ares address sorting: %s[%" PRIuPTR "]=%s", input_output_str, i, addr_str); gpr_free(addr_str); } else { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "c-ares address sorting: %s[%" PRIuPTR "]=<unprintable>", input_output_str, i); } } } -void grpc_cares_wrapper_address_sorting_sort(grpc_lb_addresses* lb_addrs) { +void grpc_cares_wrapper_address_sorting_sort(ServerAddressList* addresses) { if (grpc_trace_cares_address_sorting.enabled()) { - log_address_sorting_list(lb_addrs, "input"); + log_address_sorting_list(*addresses, "input"); } address_sorting_sortable* sortables = (address_sorting_sortable*)gpr_zalloc( - sizeof(address_sorting_sortable) * lb_addrs->num_addresses); - for (size_t i = 0; i < lb_addrs->num_addresses; i++) { - sortables[i].user_data = &lb_addrs->addresses[i]; - memcpy(&sortables[i].dest_addr.addr, &lb_addrs->addresses[i].address.addr, - lb_addrs->addresses[i].address.len); - sortables[i].dest_addr.len = lb_addrs->addresses[i].address.len; + sizeof(address_sorting_sortable) * addresses->size()); + for (size_t i = 0; i < addresses->size(); ++i) { + sortables[i].user_data = &(*addresses)[i]; + memcpy(&sortables[i].dest_addr.addr, &(*addresses)[i].address().addr, + (*addresses)[i].address().len); + sortables[i].dest_addr.len = (*addresses)[i].address().len; } - 
address_sorting_rfc_6724_sort(sortables, lb_addrs->num_addresses); - grpc_lb_address* sorted_lb_addrs = (grpc_lb_address*)gpr_zalloc( - sizeof(grpc_lb_address) * lb_addrs->num_addresses); - for (size_t i = 0; i < lb_addrs->num_addresses; i++) { - sorted_lb_addrs[i] = *(grpc_lb_address*)sortables[i].user_data; + address_sorting_rfc_6724_sort(sortables, addresses->size()); + ServerAddressList sorted; + sorted.reserve(addresses->size()); + for (size_t i = 0; i < addresses->size(); ++i) { + sorted.emplace_back(*static_cast<ServerAddress*>(sortables[i].user_data)); } gpr_free(sortables); - gpr_free(lb_addrs->addresses); - lb_addrs->addresses = sorted_lb_addrs; + *addresses = std::move(sorted); if (grpc_trace_cares_address_sorting.enabled()) { - log_address_sorting_list(lb_addrs, "output"); + log_address_sorting_list(*addresses, "output"); } } @@ -145,9 +147,9 @@ void grpc_ares_complete_request_locked(grpc_ares_request* r) { /* Invoke on_done callback and destroy the request */ r->ev_driver = nullptr; - grpc_lb_addresses* lb_addrs = *(r->lb_addrs_out); - if (lb_addrs != nullptr) { - grpc_cares_wrapper_address_sorting_sort(lb_addrs); + ServerAddressList* addresses = r->addresses_out->get(); + if (addresses != nullptr) { + grpc_cares_wrapper_address_sorting_sort(addresses); } GRPC_CLOSURE_SCHED(r->on_done, r->error); } @@ -181,60 +183,53 @@ static void on_hostbyname_done_locked(void* arg, int status, int timeouts, GRPC_ERROR_UNREF(r->error); r->error = GRPC_ERROR_NONE; r->success = true; - grpc_lb_addresses** lb_addresses = r->lb_addrs_out; - if (*lb_addresses == nullptr) { - *lb_addresses = grpc_lb_addresses_create(0, nullptr); - } - size_t prev_naddr = (*lb_addresses)->num_addresses; - size_t i; - for (i = 0; hostent->h_addr_list[i] != nullptr; i++) { + if (*r->addresses_out == nullptr) { + *r->addresses_out = grpc_core::MakeUnique<ServerAddressList>(); } - (*lb_addresses)->num_addresses += i; - (*lb_addresses)->addresses = static_cast<grpc_lb_address*>( - gpr_realloc((*lb_addresses)->addresses, - sizeof(grpc_lb_address) * (*lb_addresses)->num_addresses)); - for (i = prev_naddr; i < (*lb_addresses)->num_addresses; i++) { + ServerAddressList& addresses = **r->addresses_out; + for (size_t i = 0; hostent->h_addr_list[i] != nullptr; ++i) { + grpc_core::InlinedVector<grpc_arg, 2> args_to_add; + if (hr->is_balancer) { + args_to_add.emplace_back(grpc_channel_arg_integer_create( + const_cast<char*>(GRPC_ARG_ADDRESS_IS_BALANCER), 1)); + args_to_add.emplace_back(grpc_channel_arg_string_create( + const_cast<char*>(GRPC_ARG_ADDRESS_BALANCER_NAME), hr->host)); + } + grpc_channel_args* args = grpc_channel_args_copy_and_add( + nullptr, args_to_add.data(), args_to_add.size()); switch (hostent->h_addrtype) { case AF_INET6: { size_t addr_len = sizeof(struct sockaddr_in6); struct sockaddr_in6 addr; memset(&addr, 0, addr_len); - memcpy(&addr.sin6_addr, hostent->h_addr_list[i - prev_naddr], + memcpy(&addr.sin6_addr, hostent->h_addr_list[i], sizeof(struct in6_addr)); addr.sin6_family = static_cast<unsigned char>(hostent->h_addrtype); addr.sin6_port = hr->port; - grpc_lb_addresses_set_address( - *lb_addresses, i, &addr, addr_len, - hr->is_balancer /* is_balancer */, - hr->is_balancer ? 
hr->host : nullptr /* balancer_name */, - nullptr /* user_data */); + addresses.emplace_back(&addr, addr_len, args); char output[INET6_ADDRSTRLEN]; ares_inet_ntop(AF_INET6, &addr.sin6_addr, output, INET6_ADDRSTRLEN); - gpr_log(GPR_DEBUG, - "c-ares resolver gets a AF_INET6 result: \n" - " addr: %s\n port: %d\n sin6_scope_id: %d\n", - output, ntohs(hr->port), addr.sin6_scope_id); + GRPC_CARES_TRACE_LOG( + "request:%p c-ares resolver gets an AF_INET6 result: \n" + " addr: %s\n port: %d\n sin6_scope_id: %d\n", + r, output, ntohs(hr->port), addr.sin6_scope_id); break; } case AF_INET: { size_t addr_len = sizeof(struct sockaddr_in); struct sockaddr_in addr; memset(&addr, 0, addr_len); - memcpy(&addr.sin_addr, hostent->h_addr_list[i - prev_naddr], + memcpy(&addr.sin_addr, hostent->h_addr_list[i], + sizeof(struct in_addr)); addr.sin_family = static_cast<unsigned char>(hostent->h_addrtype); addr.sin_port = hr->port; - grpc_lb_addresses_set_address( - *lb_addresses, i, &addr, addr_len, - hr->is_balancer /* is_balancer */, - hr->is_balancer ? hr->host : nullptr /* balancer_name */, - nullptr /* user_data */); + addresses.emplace_back(&addr, addr_len, args); char output[INET_ADDRSTRLEN]; ares_inet_ntop(AF_INET, &addr.sin_addr, output, INET_ADDRSTRLEN); - gpr_log(GPR_DEBUG, - "c-ares resolver gets a AF_INET result: \n" - " addr: %s\n port: %d\n", - output, ntohs(hr->port)); + GRPC_CARES_TRACE_LOG( + "request:%p c-ares resolver gets an AF_INET result: \n" + " addr: %s\n port: %d\n", + r, output, ntohs(hr->port)); break; } } @@ -257,9 +252,9 @@ static void on_hostbyname_done_locked(void* arg, int status, int timeouts, static void on_srv_query_done_locked(void* arg, int status, int timeouts, unsigned char* abuf, int alen) { grpc_ares_request* r = static_cast<grpc_ares_request*>(arg); - gpr_log(GPR_DEBUG, "on_query_srv_done_locked"); + GRPC_CARES_TRACE_LOG("request:%p on_srv_query_done_locked", r); if (status == ARES_SUCCESS) { - gpr_log(GPR_DEBUG, "on_query_srv_done_locked ARES_SUCCESS"); + GRPC_CARES_TRACE_LOG("request:%p on_srv_query_done_locked ARES_SUCCESS", r); struct ares_srv_reply* reply; const int parse_status = ares_parse_srv_reply(abuf, alen, &reply); if (parse_status == ARES_SUCCESS) { @@ -302,9 +297,9 @@ static const char g_service_config_attribute_prefix[] = "grpc_config="; static void on_txt_done_locked(void* arg, int status, int timeouts, unsigned char* buf, int len) { - gpr_log(GPR_DEBUG, "on_txt_done_locked"); char* error_msg; grpc_ares_request* r = static_cast<grpc_ares_request*>(arg); + GRPC_CARES_TRACE_LOG("request:%p on_txt_done_locked", r); const size_t prefix_len = sizeof(g_service_config_attribute_prefix) - 1; struct ares_txt_ext* result = nullptr; struct ares_txt_ext* reply = nullptr; @@ -337,7 +332,8 @@ static void on_txt_done_locked(void* arg, int status, int timeouts, service_config_len += result->length; } (*r->service_config_json_out)[service_config_len] = '\0'; - gpr_log(GPR_INFO, "found service config: %s", *r->service_config_json_out); + GRPC_CARES_TRACE_LOG("request:%p found service config: %s", r, + *r->service_config_json_out); } // Clean up.
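// (For reference, a hedged example of the kind of TXT record this parses;
// the DNS name and JSON below are illustrative only:
//
//   _grpc_config.myservice.example.com. 300 IN TXT
//     "grpc_config=[{\"serviceConfig\":{\"loadBalancingPolicy\":\"round_robin\"}}]"
//
// The "grpc_config=" prefix is stripped and the remaining TXT chunks are
// concatenated into *r->service_config_json_out.)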
ares_free_data(reply); @@ -359,16 +355,10 @@ done: void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( grpc_ares_request* r, const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, - bool check_grpclb, grpc_combiner* combiner) { + bool check_grpclb, int query_timeout_ms, grpc_combiner* combiner) { grpc_error* error = GRPC_ERROR_NONE; grpc_ares_hostbyname_request* hr = nullptr; ares_channel* channel = nullptr; - /* TODO(zyc): Enable tracing after #9603 is checked in */ - /* if (grpc_dns_trace) { - gpr_log(GPR_DEBUG, "resolve_address (blocking): name=%s, default_port=%s", - name, default_port); - } */ - /* parse name, splitting it into host and port parts */ char* host; char* port; @@ -388,12 +378,12 @@ void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( port = gpr_strdup(default_port); } error = grpc_ares_ev_driver_create_locked(&r->ev_driver, interested_parties, - combiner, r); + query_timeout_ms, combiner, r); if (error != GRPC_ERROR_NONE) goto error_cleanup; channel = grpc_ares_ev_driver_get_channel_locked(r->ev_driver); // If dns_server is specified, use it. if (dns_server != nullptr) { - gpr_log(GPR_INFO, "Using DNS server %s", dns_server); + GRPC_CARES_TRACE_LOG("request:%p Using DNS server %s", r, dns_server); grpc_resolved_address addr; if (grpc_parse_ipv4_hostport(dns_server, &addr, false /* log_errors */)) { r->dns_server_addr.family = AF_INET; @@ -467,11 +457,10 @@ error_cleanup: gpr_free(port); } -static bool inner_resolve_as_ip_literal_locked(const char* name, - const char* default_port, - grpc_lb_addresses** addrs, - char** host, char** port, - char** hostport) { +static bool inner_resolve_as_ip_literal_locked( + const char* name, const char* default_port, + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs, char** host, + char** port, char** hostport) { gpr_split_host_port(name, host, port); if (*host == nullptr) { gpr_log(GPR_ERROR, @@ -495,18 +484,16 @@ static bool inner_resolve_as_ip_literal_locked(const char* name, if (grpc_parse_ipv4_hostport(*hostport, &addr, false /* log errors */) || grpc_parse_ipv6_hostport(*hostport, &addr, false /* log errors */)) { GPR_ASSERT(*addrs == nullptr); - *addrs = grpc_lb_addresses_create(1, nullptr); - grpc_lb_addresses_set_address( - *addrs, 0, addr.addr, addr.len, false /* is_balancer */, - nullptr /* balancer_name */, nullptr /* user_data */); + *addrs = grpc_core::MakeUnique<ServerAddressList>(); + (*addrs)->emplace_back(addr.addr, addr.len, nullptr /* args */); return true; } return false; } -static bool resolve_as_ip_literal_locked(const char* name, - const char* default_port, - grpc_lb_addresses** addrs) { +static bool resolve_as_ip_literal_locked( + const char* name, const char* default_port, + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs) { char* host = nullptr; char* port = nullptr; char* hostport = nullptr; @@ -518,20 +505,47 @@ static bool resolve_as_ip_literal_locked(const char* name, return out; } +static bool target_matches_localhost_inner(const char* name, char** host, + char** port) { + if (!gpr_split_host_port(name, host, port)) { + gpr_log(GPR_ERROR, "Unable to split host and port for name: %s", name); + return false; + } + if (gpr_stricmp(*host, "localhost") == 0) { + return true; + } else { + return false; + } +} + +static bool target_matches_localhost(const char* name) { + char* host = nullptr; + char* port = nullptr; + bool out = target_matches_localhost_inner(name, &host, &port); + 
gpr_free(host); + gpr_free(port); + return out; +} + static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs, + bool check_grpclb, char** service_config_json, int query_timeout_ms, grpc_combiner* combiner) { grpc_ares_request* r = static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request))); r->ev_driver = nullptr; r->on_done = on_done; - r->lb_addrs_out = addrs; + r->addresses_out = addrs; r->service_config_json_out = service_config_json; r->success = false; r->error = GRPC_ERROR_NONE; r->pending_queries = 0; + GRPC_CARES_TRACE_LOG( + "request:%p c-ares grpc_dns_lookup_ares_locked_impl name=%s, " + "default_port=%s", + r, name, default_port); // Early out if the target is an ipv4 or ipv6 literal. if (resolve_as_ip_literal_locked(name, default_port, addrs)) { GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE); @@ -543,17 +557,25 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE); return r; } + // Don't query for SRV and TXT records if the target is "localhost", so + // as to cut down on lookups over the network, especially in tests: + // https://github.com/grpc/proposal/pull/79 + if (target_matches_localhost(name)) { + check_grpclb = false; + r->service_config_json_out = nullptr; + } // Look up name using c-ares lib. grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( r, dns_server, name, default_port, interested_parties, check_grpclb, - combiner); + query_timeout_ms, combiner); return r; } grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs, + bool check_grpclb, char** service_config_json, int query_timeout_ms, grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl; static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) { @@ -598,8 +620,8 @@ typedef struct grpc_resolve_address_ares_request { grpc_combiner* combiner; /** the pointer to receive the resolved addresses */ grpc_resolved_addresses** addrs_out; - /** currently resolving lb addresses */ - grpc_lb_addresses* lb_addrs; + /** currently resolving addresses */ + grpc_core::UniquePtr<ServerAddressList> addresses; /** closure to call when the resolve_address_ares request completes */ grpc_closure* on_resolve_address_done; /** a closure wrapping on_resolve_address_done, which should be invoked when @@ -612,7 +634,7 @@ typedef struct grpc_resolve_address_ares_request { /* pollset_set to be driven by */ grpc_pollset_set* interested_parties; /* underlying ares_request that the query is performed on */ - grpc_ares_request* ares_request; + grpc_ares_request* ares_request = nullptr; } grpc_resolve_address_ares_request; static void on_dns_lookup_done_locked(void* arg, grpc_error* error) { @@ -620,25 +642,24 @@ static void on_dns_lookup_done_locked(void* arg, grpc_error* error) { static_cast<grpc_resolve_address_ares_request*>(arg); gpr_free(r->ares_request); grpc_resolved_addresses** resolved_addresses = r->addrs_out; - if (r->lb_addrs == nullptr || r->lb_addrs->num_addresses == 0) { + if (r->addresses == nullptr || 
r->addresses->empty()) { *resolved_addresses = nullptr; } else { *resolved_addresses = static_cast<grpc_resolved_addresses*>( gpr_zalloc(sizeof(grpc_resolved_addresses))); - (*resolved_addresses)->naddrs = r->lb_addrs->num_addresses; + (*resolved_addresses)->naddrs = r->addresses->size(); (*resolved_addresses)->addrs = static_cast<grpc_resolved_address*>(gpr_zalloc( sizeof(grpc_resolved_address) * (*resolved_addresses)->naddrs)); - for (size_t i = 0; i < (*resolved_addresses)->naddrs; i++) { - GPR_ASSERT(!r->lb_addrs->addresses[i].is_balancer); - memcpy(&(*resolved_addresses)->addrs[i], - &r->lb_addrs->addresses[i].address, sizeof(grpc_resolved_address)); + for (size_t i = 0; i < (*resolved_addresses)->naddrs; ++i) { + GPR_ASSERT(!(*r->addresses)[i].IsBalancer()); + memcpy(&(*resolved_addresses)->addrs[i], &(*r->addresses)[i].address(), + sizeof(grpc_resolved_address)); } } GRPC_CLOSURE_SCHED(r->on_resolve_address_done, GRPC_ERROR_REF(error)); - if (r->lb_addrs != nullptr) grpc_lb_addresses_destroy(r->lb_addrs); GRPC_COMBINER_UNREF(r->combiner, "on_dns_lookup_done_cb"); - gpr_free(r); + grpc_core::Delete(r); } static void grpc_resolve_address_invoke_dns_lookup_ares_locked( @@ -647,8 +668,9 @@ static void grpc_resolve_address_invoke_dns_lookup_ares_locked( static_cast<grpc_resolve_address_ares_request*>(arg); r->ares_request = grpc_dns_lookup_ares_locked( nullptr /* dns_server */, r->name, r->default_port, r->interested_parties, - &r->on_dns_lookup_done_locked, &r->lb_addrs, false /* check_grpclb */, - nullptr /* service_config_json */, r->combiner); + &r->on_dns_lookup_done_locked, &r->addresses, false /* check_grpclb */, + nullptr /* service_config_json */, GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS, + r->combiner); } static void grpc_resolve_address_ares_impl(const char* name, @@ -657,8 +679,7 @@ static void grpc_resolve_address_ares_impl(const char* name, grpc_closure* on_done, grpc_resolved_addresses** addrs) { grpc_resolve_address_ares_request* r = - static_cast<grpc_resolve_address_ares_request*>( - gpr_zalloc(sizeof(grpc_resolve_address_ares_request))); + grpc_core::New<grpc_resolve_address_ares_request>(); r->combiner = grpc_combiner_create(); r->addrs_out = addrs; r->on_resolve_address_done = on_done; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index a1231cc4e0..2808250456 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -21,11 +21,13 @@ #include <grpc/support/port_platform.h> -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/resolve_address.h" +#define GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS 10000 + extern grpc_core::TraceFlag grpc_trace_cares_address_sorting; extern grpc_core::TraceFlag grpc_trace_cares_resolver; @@ -59,8 +61,9 @@ extern void (*grpc_resolve_address_ares)(const char* name, extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addresses, bool check_grpclb, - char** service_config_json, grpc_combiner* combiner); + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses, + 
bool check_grpclb, char** service_config_json, int query_timeout_ms, + grpc_combiner* combiner); /* Cancel the pending grpc_ares_request \a request */ extern void (*grpc_cancel_ares_request_locked)(grpc_ares_request* request); @@ -87,10 +90,12 @@ bool grpc_ares_query_ipv6(); * Returns a bool indicating whether or not such an action was performed. * See https://github.com/grpc/grpc/issues/15158. */ bool grpc_ares_maybe_resolve_localhost_manually_locked( - const char* name, const char* default_port, grpc_lb_addresses** addrs); + const char* name, const char* default_port, + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs); /* Sorts destinations in lb_addrs according to RFC 6724. */ -void grpc_cares_wrapper_address_sorting_sort(grpc_lb_addresses* lb_addrs); +void grpc_cares_wrapper_address_sorting_sort( + grpc_core::ServerAddressList* addresses); #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H \ */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc index 9f293c1ac0..1f4701c999 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc @@ -29,7 +29,8 @@ struct grpc_ares_request { static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs, + bool check_grpclb, char** service_config_json, int query_timeout_ms, grpc_combiner* combiner) { return NULL; } @@ -37,7 +38,8 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( grpc_ares_request* (*grpc_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, - grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json, + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs, + bool check_grpclb, char** service_config_json, int query_timeout_ms, grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl; static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {} diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc index 639eec2323..028d844216 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_posix.cc @@ -27,7 +27,8 @@ bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); } bool grpc_ares_maybe_resolve_localhost_manually_locked( - const char* name, const char* default_port, grpc_lb_addresses** addrs) { + const char* name, const char* default_port, + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs) { return false; } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc index 7e34784691..202452f1b2 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc +++ 
b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_windows.cc @@ -23,9 +23,9 @@ #include <grpc/support/string_util.h> -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/socket_windows.h" @@ -33,8 +33,9 @@ bool grpc_ares_query_ipv6() { return grpc_ipv6_loopback_available(); } static bool inner_maybe_resolve_localhost_manually_locked( - const char* name, const char* default_port, grpc_lb_addresses** addrs, - char** host, char** port) { + const char* name, const char* default_port, + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs, char** host, + char** port) { gpr_split_host_port(name, host, port); if (*host == nullptr) { gpr_log(GPR_ERROR, @@ -55,7 +56,7 @@ static bool inner_maybe_resolve_localhost_manually_locked( } if (gpr_stricmp(*host, "localhost") == 0) { GPR_ASSERT(*addrs == nullptr); - *addrs = grpc_lb_addresses_create(2, nullptr); + *addrs = grpc_core::MakeUnique<grpc_core::ServerAddressList>(); uint16_t numeric_port = grpc_strhtons(*port); // Append the ipv6 loopback address. struct sockaddr_in6 ipv6_loopback_addr; @@ -63,10 +64,8 @@ static bool inner_maybe_resolve_localhost_manually_locked( ((char*)&ipv6_loopback_addr.sin6_addr)[15] = 1; ipv6_loopback_addr.sin6_family = AF_INET6; ipv6_loopback_addr.sin6_port = numeric_port; - grpc_lb_addresses_set_address( - *addrs, 0, &ipv6_loopback_addr, sizeof(ipv6_loopback_addr), - false /* is_balancer */, nullptr /* balancer_name */, - nullptr /* user_data */); + (*addrs)->emplace_back(&ipv6_loopback_addr, sizeof(ipv6_loopback_addr), + nullptr /* args */); // Append the ipv4 loopback address. struct sockaddr_in ipv4_loopback_addr; memset(&ipv4_loopback_addr, 0, sizeof(ipv4_loopback_addr)); @@ -74,19 +73,18 @@ static bool inner_maybe_resolve_localhost_manually_locked( ((char*)&ipv4_loopback_addr.sin_addr)[3] = 0x01; ipv4_loopback_addr.sin_family = AF_INET; ipv4_loopback_addr.sin_port = numeric_port; - grpc_lb_addresses_set_address( - *addrs, 1, &ipv4_loopback_addr, sizeof(ipv4_loopback_addr), - false /* is_balancer */, nullptr /* balancer_name */, - nullptr /* user_data */); + (*addrs)->emplace_back(&ipv4_loopback_addr, sizeof(ipv4_loopback_addr), + nullptr /* args */); // Let the address sorter figure out which one should be tried first. 
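This localhost special-casing works around c-ares' handling of "localhost" on Windows (grpc/grpc#15158, cited in the header above): the code appends both loopback addresses itself and lets the RFC 6724 destination-address sorter, invoked just below, decide which one is dialed first. A hedged usage sketch of the public wrapper; the target string and port are illustrative:

    grpc_core::UniquePtr<grpc_core::ServerAddressList> addrs;
    // True only when the host part equals "localhost" (case-insensitively);
    // addrs then holds [::1]:50051 and 127.0.0.1:50051 in sorted order.
    // (Must run under the resolver combiner, hence the _locked suffix.)
    if (grpc_ares_maybe_resolve_localhost_manually_locked("localhost:50051",
                                                          "443", &addrs)) {
      GPR_ASSERT(addrs != nullptr && addrs->size() == 2);
    }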
- grpc_cares_wrapper_address_sorting_sort(*addrs); + grpc_cares_wrapper_address_sorting_sort(addrs->get()); return true; } return false; } bool grpc_ares_maybe_resolve_localhost_manually_locked( - const char* name, const char* default_port, grpc_lb_addresses** addrs) { + const char* name, const char* default_port, + grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs) { char* host = nullptr; char* port = nullptr; bool out = inner_maybe_resolve_localhost_manually_locked(name, default_port, diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index 65ff1ec1a5..c365f1abfd 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -26,8 +26,8 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> -#include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/env.h" @@ -198,18 +198,14 @@ void NativeDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(r->name_to_resolve_)); if (r->addresses_ != nullptr) { - grpc_lb_addresses* addresses = grpc_lb_addresses_create( - r->addresses_->naddrs, nullptr /* user_data_vtable */); + ServerAddressList addresses; for (size_t i = 0; i < r->addresses_->naddrs; ++i) { - grpc_lb_addresses_set_address( - addresses, i, &r->addresses_->addrs[i].addr, - r->addresses_->addrs[i].len, false /* is_balancer */, - nullptr /* balancer_name */, nullptr /* user_data */); + addresses.emplace_back(&r->addresses_->addrs[i].addr, + r->addresses_->addrs[i].len, nullptr /* args */); } - grpc_arg new_arg = grpc_lb_addresses_create_channel_arg(addresses); + grpc_arg new_arg = CreateServerAddressListChannelArg(&addresses); result = grpc_channel_args_copy_and_add(r->channel_args_, &new_arg, 1); grpc_resolved_addresses_destroy(r->addresses_); - grpc_lb_addresses_destroy(addresses); // Reset backoff state so that we start from the beginning when the // next request gets triggered. 
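The hunk above shows the conversion pattern this change repeats in every resolver: build a ServerAddressList, wrap it via CreateServerAddressListChannelArg(), and let grpc_channel_args_copy_and_add() deep-copy it through the arg vtable. Condensed to its essentials (resolved and existing_args are illustrative variables):

    grpc_core::ServerAddressList addresses;
    for (size_t i = 0; i < resolved->naddrs; ++i) {
      addresses.emplace_back(&resolved->addrs[i].addr,
                             resolved->addrs[i].len, nullptr /* args */);
    }
    grpc_arg new_arg = grpc_core::CreateServerAddressListChannelArg(&addresses);
    // copy_and_add() invokes the arg vtable's copy, so the stack-allocated
    // list can safely go out of scope afterwards.
    grpc_channel_args* result =
        grpc_channel_args_copy_and_add(existing_args, &new_arg, 1);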
r->backoff_.Reset(); diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc index 3aa690bea4..258339491c 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc @@ -28,12 +28,13 @@ #include <grpc/support/alloc.h> #include <grpc/support/string_util.h> -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gpr/useful.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h index 7f69059351..d86111c382 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h @@ -19,10 +19,9 @@ #include <grpc/support/port_platform.h> -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/ref_counted.h" -#include "src/core/lib/uri/uri_parser.h" +#include "src/core/lib/iomgr/error.h" #define GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR \ "grpc.fake_resolver.response_generator" diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc index 801734764b..1654747a79 100644 --- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc @@ -26,9 +26,9 @@ #include <grpc/support/alloc.h> #include <grpc/support/string_util.h> -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" @@ -45,7 +45,8 @@ namespace { class SockaddrResolver : public Resolver { public: /// Takes ownership of \a addresses. - SockaddrResolver(const ResolverArgs& args, grpc_lb_addresses* addresses); + SockaddrResolver(const ResolverArgs& args, + UniquePtr<ServerAddressList> addresses); void NextLocked(grpc_channel_args** result, grpc_closure* on_complete) override; @@ -58,7 +59,7 @@ class SockaddrResolver : public Resolver { void MaybeFinishNextLocked(); /// the addresses that we've "resolved" - grpc_lb_addresses* addresses_ = nullptr; + UniquePtr<ServerAddressList> addresses_; /// channel args grpc_channel_args* channel_args_ = nullptr; /// have we published? 
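In the sockaddr resolver below, passing the list as UniquePtr<ServerAddressList> (see the constructor change in the next hunk) turns the old manual grpc_lb_addresses_destroy() call into RAII. A sketch of the handoff under that API; the parsed target is illustrative, args stands in for an already-populated ResolverArgs, and SockaddrResolver is the file-local class:

    auto addresses = grpc_core::MakeUnique<grpc_core::ServerAddressList>();
    grpc_resolved_address addr;
    GPR_ASSERT(grpc_parse_ipv4_hostport("127.0.0.1:1234", &addr,
                                        true /* log_errors */));
    addresses->emplace_back(addr, nullptr /* args */);
    // Ownership moves into the resolver; nothing to free by hand afterwards.
    grpc_core::OrphanablePtr<grpc_core::Resolver> resolver(
        grpc_core::New<SockaddrResolver>(args, std::move(addresses)));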
@@ -70,13 +71,12 @@ class SockaddrResolver : public Resolver { }; SockaddrResolver::SockaddrResolver(const ResolverArgs& args, - grpc_lb_addresses* addresses) + UniquePtr<ServerAddressList> addresses) : Resolver(args.combiner), - addresses_(addresses), + addresses_(std::move(addresses)), channel_args_(grpc_channel_args_copy(args.args)) {} SockaddrResolver::~SockaddrResolver() { - grpc_lb_addresses_destroy(addresses_); grpc_channel_args_destroy(channel_args_); } @@ -100,7 +100,7 @@ void SockaddrResolver::ShutdownLocked() { void SockaddrResolver::MaybeFinishNextLocked() { if (next_completion_ != nullptr && !published_) { published_ = true; - grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses_); + grpc_arg arg = CreateServerAddressListChannelArg(addresses_.get()); *target_result_ = grpc_channel_args_copy_and_add(channel_args_, &arg, 1); GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_NONE); next_completion_ = nullptr; @@ -127,27 +127,27 @@ OrphanablePtr<Resolver> CreateSockaddrResolver( grpc_slice_buffer path_parts; grpc_slice_buffer_init(&path_parts); grpc_slice_split(path_slice, ",", &path_parts); - grpc_lb_addresses* addresses = grpc_lb_addresses_create( - path_parts.count, nullptr /* user_data_vtable */); + auto addresses = MakeUnique<ServerAddressList>(); bool errors_found = false; - for (size_t i = 0; i < addresses->num_addresses; i++) { + for (size_t i = 0; i < path_parts.count; i++) { grpc_uri ith_uri = *args.uri; - char* part_str = grpc_slice_to_c_string(path_parts.slices[i]); - ith_uri.path = part_str; - if (!parse(&ith_uri, &addresses->addresses[i].address)) { + UniquePtr<char> part_str(grpc_slice_to_c_string(path_parts.slices[i])); + ith_uri.path = part_str.get(); + grpc_resolved_address addr; + if (!parse(&ith_uri, &addr)) { errors_found = true; /* GPR_TRUE */ + break; } - gpr_free(part_str); - if (errors_found) break; + addresses->emplace_back(addr, nullptr /* args */); } grpc_slice_buffer_destroy_internal(&path_parts); grpc_slice_unref_internal(path_slice); if (errors_found) { - grpc_lb_addresses_destroy(addresses); return OrphanablePtr<Resolver>(nullptr); } // Instantiate resolver. - return OrphanablePtr<Resolver>(New<SockaddrResolver>(args, addresses)); + return OrphanablePtr<Resolver>( + New<SockaddrResolver>(args, std::move(addresses))); } class IPv4ResolverFactory : public ResolverFactory { diff --git a/src/core/ext/filters/client_channel/resolver_result_parsing.cc b/src/core/ext/filters/client_channel/resolver_result_parsing.cc index 82a26ace63..9a0122e8ec 100644 --- a/src/core/ext/filters/client_channel/resolver_result_parsing.cc +++ b/src/core/ext/filters/client_channel/resolver_result_parsing.cc @@ -30,9 +30,11 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/lib/channel/status_util.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/uri/uri_parser.h" // As per the retry design, we do not allow more than 5 retry attempts. #define MAX_MAX_RETRY_ATTEMPTS 5 @@ -40,38 +42,17 @@ namespace grpc_core { namespace internal { -namespace { - -// Converts string format from JSON to proto. 
-grpc_core::UniquePtr<char> ConvertCamelToSnake(const char* camel) { - const size_t size = strlen(camel); - char* snake = static_cast<char*>(gpr_malloc(size * 2)); - size_t j = 0; - for (size_t i = 0; i < size; ++i) { - if (isupper(camel[i])) { - snake[j++] = '_'; - snake[j++] = tolower(camel[i]); - } else { - snake[j++] = camel[i]; - } - } - snake[j] = '\0'; - return grpc_core::UniquePtr<char>(snake); -} - -} // namespace - ProcessedResolverResult::ProcessedResolverResult( - const grpc_channel_args* resolver_result, bool parse_retry) { + const grpc_channel_args& resolver_result, bool parse_retry) { ProcessServiceConfig(resolver_result, parse_retry); // If no LB config was found above, just find the LB policy name then. - if (lb_policy_config_ == nullptr) ProcessLbPolicyName(resolver_result); + if (lb_policy_name_ == nullptr) ProcessLbPolicyName(resolver_result); } void ProcessedResolverResult::ProcessServiceConfig( - const grpc_channel_args* resolver_result, bool parse_retry) { + const grpc_channel_args& resolver_result, bool parse_retry) { const grpc_arg* channel_arg = - grpc_channel_args_find(resolver_result, GRPC_ARG_SERVICE_CONFIG); + grpc_channel_args_find(&resolver_result, GRPC_ARG_SERVICE_CONFIG); const char* service_config_json = grpc_channel_arg_get_string(channel_arg); if (service_config_json != nullptr) { service_config_json_.reset(gpr_strdup(service_config_json)); @@ -79,7 +60,7 @@ void ProcessedResolverResult::ProcessServiceConfig( if (service_config_ != nullptr) { if (parse_retry) { channel_arg = - grpc_channel_args_find(resolver_result, GRPC_ARG_SERVER_URI); + grpc_channel_args_find(&resolver_result, GRPC_ARG_SERVER_URI); const char* server_uri = grpc_channel_arg_get_string(channel_arg); GPR_ASSERT(server_uri != nullptr); grpc_uri* uri = grpc_uri_parse(server_uri, true); @@ -97,42 +78,56 @@ void ProcessedResolverResult::ProcessServiceConfig( } void ProcessedResolverResult::ProcessLbPolicyName( - const grpc_channel_args* resolver_result) { - const char* lb_policy_name = nullptr; + const grpc_channel_args& resolver_result) { // Prefer the LB policy name found in the service config. Note that this is // checking the deprecated loadBalancingPolicy field, rather than the new // loadBalancingConfig field. if (service_config_ != nullptr) { - lb_policy_name = service_config_->GetLoadBalancingPolicyName(); + lb_policy_name_.reset( + gpr_strdup(service_config_->GetLoadBalancingPolicyName())); + // Convert to lower-case. + if (lb_policy_name_ != nullptr) { + char* lb_policy_name = lb_policy_name_.get(); + for (size_t i = 0; i < strlen(lb_policy_name); ++i) { + lb_policy_name[i] = tolower(lb_policy_name[i]); + } + } } // Otherwise, find the LB policy name set by the client API. - if (lb_policy_name == nullptr) { + if (lb_policy_name_ == nullptr) { const grpc_arg* channel_arg = - grpc_channel_args_find(resolver_result, GRPC_ARG_LB_POLICY_NAME); - lb_policy_name = grpc_channel_arg_get_string(channel_arg); + grpc_channel_args_find(&resolver_result, GRPC_ARG_LB_POLICY_NAME); + lb_policy_name_.reset(gpr_strdup(grpc_channel_arg_get_string(channel_arg))); } // Special case: If at least one balancer address is present, we use // the grpclb policy, regardless of what the resolver has returned. 
- const grpc_arg* channel_arg = - grpc_channel_args_find(resolver_result, GRPC_ARG_LB_ADDRESSES); - if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) { - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p); - if (grpc_lb_addresses_contains_balancer_address(*addresses)) { - if (lb_policy_name != nullptr && - gpr_stricmp(lb_policy_name, "grpclb") != 0) { + const ServerAddressList* addresses = + FindServerAddressListChannelArg(&resolver_result); + if (addresses != nullptr) { + bool found_balancer_address = false; + for (size_t i = 0; i < addresses->size(); ++i) { + const ServerAddress& address = (*addresses)[i]; + if (address.IsBalancer()) { + found_balancer_address = true; + break; + } + } + if (found_balancer_address) { + if (lb_policy_name_ != nullptr && + strcmp(lb_policy_name_.get(), "grpclb") != 0) { gpr_log(GPR_INFO, "resolver requested LB policy %s but provided at least one " "balancer address -- forcing use of grpclb LB policy", - lb_policy_name); + lb_policy_name_.get()); } - lb_policy_name = "grpclb"; + lb_policy_name_.reset(gpr_strdup("grpclb")); } } // Use pick_first if nothing was specified and we didn't select grpclb // above. - if (lb_policy_name == nullptr) lb_policy_name = "pick_first"; - lb_policy_name_.reset(gpr_strdup(lb_policy_name)); + if (lb_policy_name_ == nullptr) { + lb_policy_name_.reset(gpr_strdup("pick_first")); + } } void ProcessedResolverResult::ParseServiceConfig( @@ -175,15 +170,13 @@ void ProcessedResolverResult::ParseLbConfigFromServiceConfig( if (policy_content != nullptr) return; // Violate "oneof" type. policy_content = field; } - grpc_core::UniquePtr<char> lb_policy_name = - ConvertCamelToSnake(policy_content->key); - if (!grpc_core::LoadBalancingPolicyRegistry::LoadBalancingPolicyExists( - lb_policy_name.get())) { - continue; + // If we support this policy, then select it. + if (grpc_core::LoadBalancingPolicyRegistry::LoadBalancingPolicyExists( + policy_content->key)) { + lb_policy_name_.reset(gpr_strdup(policy_content->key)); + lb_policy_config_ = policy_content->child; + return; } - lb_policy_name_ = std::move(lb_policy_name); - lb_policy_config_ = policy_content->child; - return; } } diff --git a/src/core/ext/filters/client_channel/resolver_result_parsing.h b/src/core/ext/filters/client_channel/resolver_result_parsing.h index f1fb7406bc..98a9d26c46 100644 --- a/src/core/ext/filters/client_channel/resolver_result_parsing.h +++ b/src/core/ext/filters/client_channel/resolver_result_parsing.h @@ -36,8 +36,7 @@ namespace internal { class ClientChannelMethodParams; // A table mapping from a method name to its method parameters. -typedef grpc_core::SliceHashTable< - grpc_core::RefCountedPtr<ClientChannelMethodParams>> +typedef SliceHashTable<RefCountedPtr<ClientChannelMethodParams>> ClientChannelMethodParamsTable; // A container of processed fields from the resolver result. Simplifies the @@ -47,33 +46,30 @@ class ProcessedResolverResult { // Processes the resolver result and populates the relative members // for later consumption. Tries to parse retry parameters only if parse_retry // is true. - ProcessedResolverResult(const grpc_channel_args* resolver_result, + ProcessedResolverResult(const grpc_channel_args& resolver_result, bool parse_retry); // Getters. Any managed object's ownership is transferred. 
- grpc_core::UniquePtr<char> service_config_json() { + UniquePtr<char> service_config_json() { return std::move(service_config_json_); } - grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() { + RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() { return std::move(retry_throttle_data_); } - grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> - method_params_table() { + RefCountedPtr<ClientChannelMethodParamsTable> method_params_table() { return std::move(method_params_table_); } - grpc_core::UniquePtr<char> lb_policy_name() { - return std::move(lb_policy_name_); - } + UniquePtr<char> lb_policy_name() { return std::move(lb_policy_name_); } grpc_json* lb_policy_config() { return lb_policy_config_; } private: // Finds the service config; extracts LB config and (maybe) retry throttle // params from it. - void ProcessServiceConfig(const grpc_channel_args* resolver_result, + void ProcessServiceConfig(const grpc_channel_args& resolver_result, bool parse_retry); // Finds the LB policy name (when no LB config was found). - void ProcessLbPolicyName(const grpc_channel_args* resolver_result); + void ProcessLbPolicyName(const grpc_channel_args& resolver_result); // Parses the service config. Intended to be used by // ServiceConfig::ParseGlobalParams. @@ -85,16 +81,16 @@ class ProcessedResolverResult { void ParseRetryThrottleParamsFromServiceConfig(const grpc_json* field); // Service config. - grpc_core::UniquePtr<char> service_config_json_; - grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config_; + UniquePtr<char> service_config_json_; + UniquePtr<grpc_core::ServiceConfig> service_config_; // LB policy. grpc_json* lb_policy_config_ = nullptr; - grpc_core::UniquePtr<char> lb_policy_name_; + UniquePtr<char> lb_policy_name_; // Retry throttle data. char* server_name_ = nullptr; - grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_; + RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_; // Method params table. - grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table_; + RefCountedPtr<ClientChannelMethodParamsTable> method_params_table_; }; // The parameters of a method. diff --git a/src/core/ext/filters/client_channel/server_address.cc b/src/core/ext/filters/client_channel/server_address.cc new file mode 100644 index 0000000000..ec33cbbd95 --- /dev/null +++ b/src/core/ext/filters/client_channel/server_address.cc @@ -0,0 +1,103 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/ext/filters/client_channel/server_address.h" + +#include <string.h> + +namespace grpc_core { + +// +// ServerAddress +// + +ServerAddress::ServerAddress(const grpc_resolved_address& address, + grpc_channel_args* args) + : address_(address), args_(args) {} + +ServerAddress::ServerAddress(const void* address, size_t address_len, + grpc_channel_args* args) + : args_(args) { + memcpy(address_.addr, address, address_len); + address_.len = static_cast<socklen_t>(address_len); +} + +int ServerAddress::Cmp(const ServerAddress& other) const { + if (address_.len > other.address_.len) return 1; + if (address_.len < other.address_.len) return -1; + int retval = memcmp(address_.addr, other.address_.addr, address_.len); + if (retval != 0) return retval; + return grpc_channel_args_compare(args_, other.args_); +} + +bool ServerAddress::IsBalancer() const { + return grpc_channel_arg_get_bool( + grpc_channel_args_find(args_, GRPC_ARG_ADDRESS_IS_BALANCER), false); +} + +// +// ServerAddressList +// + +namespace { + +void* ServerAddressListCopy(void* addresses) { + ServerAddressList* a = static_cast<ServerAddressList*>(addresses); + return New<ServerAddressList>(*a); +} + +void ServerAddressListDestroy(void* addresses) { + ServerAddressList* a = static_cast<ServerAddressList*>(addresses); + Delete(a); +} + +int ServerAddressListCompare(void* addresses1, void* addresses2) { + ServerAddressList* a1 = static_cast<ServerAddressList*>(addresses1); + ServerAddressList* a2 = static_cast<ServerAddressList*>(addresses2); + if (a1->size() > a2->size()) return 1; + if (a1->size() < a2->size()) return -1; + for (size_t i = 0; i < a1->size(); ++i) { + int retval = (*a1)[i].Cmp((*a2)[i]); + if (retval != 0) return retval; + } + return 0; +} + +const grpc_arg_pointer_vtable server_addresses_arg_vtable = { + ServerAddressListCopy, ServerAddressListDestroy, ServerAddressListCompare}; + +} // namespace + +grpc_arg CreateServerAddressListChannelArg(const ServerAddressList* addresses) { + return grpc_channel_arg_pointer_create( + const_cast<char*>(GRPC_ARG_SERVER_ADDRESS_LIST), + const_cast<ServerAddressList*>(addresses), &server_addresses_arg_vtable); +} + +ServerAddressList* FindServerAddressListChannelArg( + const grpc_channel_args* channel_args) { + const grpc_arg* lb_addresses_arg = + grpc_channel_args_find(channel_args, GRPC_ARG_SERVER_ADDRESS_LIST); + if (lb_addresses_arg == nullptr || lb_addresses_arg->type != GRPC_ARG_POINTER) + return nullptr; + return static_cast<ServerAddressList*>(lb_addresses_arg->value.pointer.p); +} + +} // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/server_address.h b/src/core/ext/filters/client_channel/server_address.h new file mode 100644 index 0000000000..3a1bf1df67 --- /dev/null +++ b/src/core/ext/filters/client_channel/server_address.h @@ -0,0 +1,108 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SERVER_ADDRESS_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SERVER_ADDRESS_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/uri/uri_parser.h"
+
+// Channel arg key for ServerAddressList.
+#define GRPC_ARG_SERVER_ADDRESS_LIST "grpc.server_address_list"
+
+// Channel arg key for a bool indicating whether an address is a grpclb
+// load balancer (as opposed to a backend).
+#define GRPC_ARG_ADDRESS_IS_BALANCER "grpc.address_is_balancer"
+
+// Channel arg key for a string indicating an address's balancer name.
+#define GRPC_ARG_ADDRESS_BALANCER_NAME "grpc.address_balancer_name"
+
+namespace grpc_core {
+
+//
+// ServerAddress
+//
+
+// A server address is a grpc_resolved_address with an associated set of
+// channel args. Any args present here will be merged into the channel
+// args when a subchannel is created for this address.
+class ServerAddress {
+ public:
+  // Takes ownership of args.
+  ServerAddress(const grpc_resolved_address& address, grpc_channel_args* args);
+  ServerAddress(const void* address, size_t address_len,
+                grpc_channel_args* args);
+
+  ~ServerAddress() { grpc_channel_args_destroy(args_); }
+
+  // Copyable.
+  ServerAddress(const ServerAddress& other)
+      : address_(other.address_), args_(grpc_channel_args_copy(other.args_)) {}
+  ServerAddress& operator=(const ServerAddress& other) {
+    address_ = other.address_;
+    grpc_channel_args_destroy(args_);
+    args_ = grpc_channel_args_copy(other.args_);
+    return *this;
+  }
+
+  // Movable.
+  ServerAddress(ServerAddress&& other)
+      : address_(other.address_), args_(other.args_) {
+    other.args_ = nullptr;
+  }
+  ServerAddress& operator=(ServerAddress&& other) {
+    address_ = other.address_;
+    args_ = other.args_;
+    other.args_ = nullptr;
+    return *this;
+  }
+
+  bool operator==(const ServerAddress& other) const { return Cmp(other) == 0; }
+
+  int Cmp(const ServerAddress& other) const;
+
+  const grpc_resolved_address& address() const { return address_; }
+  const grpc_channel_args* args() const { return args_; }
+
+  bool IsBalancer() const;
+
+ private:
+  grpc_resolved_address address_;
+  grpc_channel_args* args_;
+};
+
+//
+// ServerAddressList
+//
+
+typedef InlinedVector<ServerAddress, 1> ServerAddressList;
+
+// Returns a channel arg containing \a addresses.
+grpc_arg CreateServerAddressListChannelArg(const ServerAddressList* addresses);
+
+// Returns the ServerAddressList instance in channel_args or NULL.
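Together with FindServerAddressListChannelArg() declared next, this is the replacement for grpc_lb_addresses_create_channel_arg() and its lookup counterpart. A self-contained round-trip sketch against this header's API (the zeroed address is a placeholder for a real resolved address):

    grpc_core::ServerAddressList addresses;
    grpc_resolved_address addr;
    memset(&addr, 0, sizeof(addr));  // placeholder; normally resolver-made
    addresses.emplace_back(addr, nullptr /* args */);
    grpc_arg arg = grpc_core::CreateServerAddressListChannelArg(&addresses);
    // copy_and_add() deep-copies the list through the vtable defined in
    // server_address.cc, so the local list need not outlive the args.
    grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
    grpc_core::ServerAddressList* found =
        grpc_core::FindServerAddressListChannelArg(args);
    GPR_ASSERT(found != nullptr && found->size() == 1);
    grpc_channel_args_destroy(args);  // frees the copied list via the vtable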
+ServerAddressList* FindServerAddressListChannelArg( + const grpc_channel_args* channel_args); + +} // namespace grpc_core + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SERVER_ADDRESS_H */ diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index a56db0201b..9077aa9753 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -153,7 +153,7 @@ struct grpc_subchannel { /** have we started the backoff loop */ bool backoff_begun; // reset_backoff() was called while alarm was pending - bool deferred_reset_backoff; + bool retry_immediately; /** our alarm */ grpc_timer alarm; @@ -709,8 +709,8 @@ static void on_alarm(void* arg, grpc_error* error) { if (c->disconnected) { error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected", &error, 1); - } else if (c->deferred_reset_backoff) { - c->deferred_reset_backoff = false; + } else if (c->retry_immediately) { + c->retry_immediately = false; error = GRPC_ERROR_NONE; } else { GRPC_ERROR_REF(error); @@ -837,7 +837,7 @@ static bool publish_transport_locked(grpc_subchannel* c) { /* publish */ c->connected_subchannel.reset(grpc_core::New<grpc_core::ConnectedSubchannel>( - stk, c->channelz_subchannel, socket_uuid)); + stk, c->args, c->channelz_subchannel, socket_uuid)); gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", c->connected_subchannel.get(), c); @@ -887,12 +887,12 @@ static void on_subchannel_connected(void* arg, grpc_error* error) { void grpc_subchannel_reset_backoff(grpc_subchannel* subchannel) { gpr_mu_lock(&subchannel->mu); + subchannel->backoff->Reset(); if (subchannel->have_alarm) { - subchannel->deferred_reset_backoff = true; + subchannel->retry_immediately = true; grpc_timer_cancel(&subchannel->alarm); } else { subchannel->backoff_begun = false; - subchannel->backoff->Reset(); maybe_start_connecting_locked(subchannel); } gpr_mu_unlock(&subchannel->mu); @@ -1068,16 +1068,18 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { namespace grpc_core { ConnectedSubchannel::ConnectedSubchannel( - grpc_channel_stack* channel_stack, + grpc_channel_stack* channel_stack, const grpc_channel_args* args, grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode> channelz_subchannel, intptr_t socket_uuid) - : RefCountedWithTracing<ConnectedSubchannel>(&grpc_trace_stream_refcount), + : RefCounted<ConnectedSubchannel>(&grpc_trace_stream_refcount), channel_stack_(channel_stack), + args_(grpc_channel_args_copy(args)), channelz_subchannel_(std::move(channelz_subchannel)), socket_uuid_(socket_uuid) {} ConnectedSubchannel::~ConnectedSubchannel() { + grpc_channel_args_destroy(args_); GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); } diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index ec3b4d86e4..14f87f2c68 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -72,7 +72,7 @@ typedef struct grpc_subchannel_key grpc_subchannel_key; namespace grpc_core { -class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> { +class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> { public: struct CallArgs { grpc_polling_entity* pollent; @@ -85,28 +85,31 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> { size_t parent_data_size; }; - explicit ConnectedSubchannel( - 
grpc_channel_stack* channel_stack, + ConnectedSubchannel( + grpc_channel_stack* channel_stack, const grpc_channel_args* args, grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode> channelz_subchannel, intptr_t socket_uuid); ~ConnectedSubchannel(); - grpc_channel_stack* channel_stack() { return channel_stack_; } void NotifyOnStateChange(grpc_pollset_set* interested_parties, grpc_connectivity_state* state, grpc_closure* closure); void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call); - channelz::SubchannelNode* channelz_subchannel() { + + grpc_channel_stack* channel_stack() const { return channel_stack_; } + const grpc_channel_args* args() const { return args_; } + channelz::SubchannelNode* channelz_subchannel() const { return channelz_subchannel_.get(); } - intptr_t socket_uuid() { return socket_uuid_; } + intptr_t socket_uuid() const { return socket_uuid_; } size_t GetInitialCallSizeEstimate(size_t parent_data_size) const; private: grpc_channel_stack* channel_stack_; + grpc_channel_args* args_; // ref counted pointer to the channelz node in this connected subchannel's // owning subchannel. grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode> diff --git a/src/core/ext/filters/client_channel/subchannel_index.cc b/src/core/ext/filters/client_channel/subchannel_index.cc index 1c23a6c4be..aa8441f17b 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.cc +++ b/src/core/ext/filters/client_channel/subchannel_index.cc @@ -91,7 +91,7 @@ void grpc_subchannel_key_destroy(grpc_subchannel_key* k) { gpr_free(k); } -static void sck_avl_destroy(void* p, void* user_data) { +static void sck_avl_destroy(void* p, void* unused) { grpc_subchannel_key_destroy(static_cast<grpc_subchannel_key*>(p)); } @@ -104,7 +104,7 @@ static long sck_avl_compare(void* a, void* b, void* unused) { static_cast<grpc_subchannel_key*>(b)); } -static void scv_avl_destroy(void* p, void* user_data) { +static void scv_avl_destroy(void* p, void* unused) { GRPC_SUBCHANNEL_WEAK_UNREF((grpc_subchannel*)p, "subchannel_index"); } @@ -137,7 +137,7 @@ void grpc_subchannel_index_shutdown(void) { void grpc_subchannel_index_unref(void) { if (gpr_unref(&g_refcount)) { gpr_mu_destroy(&g_mu); - grpc_avl_unref(g_subchannel_index, grpc_core::ExecCtx::Get()); + grpc_avl_unref(g_subchannel_index, nullptr); } } @@ -147,13 +147,12 @@ grpc_subchannel* grpc_subchannel_index_find(grpc_subchannel_key* key) { // Lock, and take a reference to the subchannel index. // We don't need to do the search under a lock as avl's are immutable. 
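The nullptr now threaded through every grpc_avl call is the avl's opaque user_data argument; the callbacks in this file never used it (note the parameters renamed to `unused` above), so passing ExecCtx::Get() was needless plumbing. The lock-free read pattern this comment describes, in miniature (key is illustrative):

    gpr_mu_lock(&g_mu);
    grpc_avl index = grpc_avl_ref(g_subchannel_index, nullptr);  // snapshot
    gpr_mu_unlock(&g_mu);
    // Search outside the lock: writers never mutate this root, they publish
    // a brand-new one, so the snapshot stays valid until unref'd.
    grpc_subchannel* c =
        static_cast<grpc_subchannel*>(grpc_avl_get(index, key, nullptr));
    grpc_avl_unref(index, nullptr);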
gpr_mu_lock(&g_mu); - grpc_avl index = grpc_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get()); + grpc_avl index = grpc_avl_ref(g_subchannel_index, nullptr); gpr_mu_unlock(&g_mu); grpc_subchannel* c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF( - (grpc_subchannel*)grpc_avl_get(index, key, grpc_core::ExecCtx::Get()), - "index_find"); - grpc_avl_unref(index, grpc_core::ExecCtx::Get()); + (grpc_subchannel*)grpc_avl_get(index, key, nullptr), "index_find"); + grpc_avl_unref(index, nullptr); return c; } @@ -169,13 +168,11 @@ grpc_subchannel* grpc_subchannel_index_register(grpc_subchannel_key* key, // Compare and swap loop: // - take a reference to the current index gpr_mu_lock(&g_mu); - grpc_avl index = - grpc_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get()); + grpc_avl index = grpc_avl_ref(g_subchannel_index, nullptr); gpr_mu_unlock(&g_mu); // - Check to see if a subchannel already exists - c = static_cast<grpc_subchannel*>( - grpc_avl_get(index, key, grpc_core::ExecCtx::Get())); + c = static_cast<grpc_subchannel*>(grpc_avl_get(index, key, nullptr)); if (c != nullptr) { c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "index_register"); } @@ -184,11 +181,9 @@ grpc_subchannel* grpc_subchannel_index_register(grpc_subchannel_key* key, need_to_unref_constructed = true; } else { // no -> update the avl and compare/swap - grpc_avl updated = - grpc_avl_add(grpc_avl_ref(index, grpc_core::ExecCtx::Get()), - subchannel_key_copy(key), - GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"), - grpc_core::ExecCtx::Get()); + grpc_avl updated = grpc_avl_add( + grpc_avl_ref(index, nullptr), subchannel_key_copy(key), + GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"), nullptr); // it may happen (but it's expected to be unlikely) // that some other thread has changed the index: @@ -200,9 +195,9 @@ grpc_subchannel* grpc_subchannel_index_register(grpc_subchannel_key* key, } gpr_mu_unlock(&g_mu); - grpc_avl_unref(updated, grpc_core::ExecCtx::Get()); + grpc_avl_unref(updated, nullptr); } - grpc_avl_unref(index, grpc_core::ExecCtx::Get()); + grpc_avl_unref(index, nullptr); } if (need_to_unref_constructed) { @@ -219,24 +214,22 @@ void grpc_subchannel_index_unregister(grpc_subchannel_key* key, // Compare and swap loop: // - take a reference to the current index gpr_mu_lock(&g_mu); - grpc_avl index = - grpc_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get()); + grpc_avl index = grpc_avl_ref(g_subchannel_index, nullptr); gpr_mu_unlock(&g_mu); // Check to see if this key still refers to the previously // registered subchannel - grpc_subchannel* c = static_cast<grpc_subchannel*>( - grpc_avl_get(index, key, grpc_core::ExecCtx::Get())); + grpc_subchannel* c = + static_cast<grpc_subchannel*>(grpc_avl_get(index, key, nullptr)); if (c != constructed) { - grpc_avl_unref(index, grpc_core::ExecCtx::Get()); + grpc_avl_unref(index, nullptr); break; } // compare and swap the update (some other thread may have // mutated the index behind us) grpc_avl updated = - grpc_avl_remove(grpc_avl_ref(index, grpc_core::ExecCtx::Get()), key, - grpc_core::ExecCtx::Get()); + grpc_avl_remove(grpc_avl_ref(index, nullptr), key, nullptr); gpr_mu_lock(&g_mu); if (index.root == g_subchannel_index.root) { @@ -245,8 +238,8 @@ void grpc_subchannel_index_unregister(grpc_subchannel_key* key, } gpr_mu_unlock(&g_mu); - grpc_avl_unref(updated, grpc_core::ExecCtx::Get()); - grpc_avl_unref(index, grpc_core::ExecCtx::Get()); + grpc_avl_unref(updated, nullptr); + grpc_avl_unref(index, nullptr); } } |
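Both grpc_subchannel_index_register() and grpc_subchannel_index_unregister() above follow the same optimistic pattern over the immutable AVL: snapshot the root, build an updated tree off to the side, and publish it only if the root is still the one the snapshot came from, retrying otherwise. Schematically (take_index_snapshot(), key, and value are hypothetical; GPR_SWAP comes from src/core/lib/gpr/useful.h):

    bool done = false;
    while (!done) {
      grpc_avl index = take_index_snapshot();  // grpc_avl_ref under g_mu
      grpc_avl updated = grpc_avl_add(grpc_avl_ref(index, nullptr),
                                      subchannel_key_copy(key), value,
                                      nullptr);
      gpr_mu_lock(&g_mu);
      if (index.root == g_subchannel_index.root) {  // no concurrent update
        GPR_SWAP(grpc_avl, updated, g_subchannel_index);
        done = true;
      }
      gpr_mu_unlock(&g_mu);
      grpc_avl_unref(updated, nullptr);  // drops a failed try or the old root
      grpc_avl_unref(index, nullptr);
    }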