diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/client_channel.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.cc | 1035 |
1 files changed, 163 insertions, 872 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 70aac47231..dd741f1e2d 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -35,10 +35,10 @@ #include "src/core/ext/filters/client_channel/http_connect_handshaker.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" +#include "src/core/ext/filters/client_channel/request_routing.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/ext/filters/client_channel/resolver_result_parsing.h" #include "src/core/ext/filters/client_channel/retry_throttle.h" -#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/deadline/deadline_filter.h" #include "src/core/lib/backoff/backoff.h" @@ -63,7 +63,6 @@ #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/status_metadata.h" -using grpc_core::ServerAddressList; using grpc_core::internal::ClientChannelMethodParams; using grpc_core::internal::ClientChannelMethodParamsTable; using grpc_core::internal::ProcessedResolverResult; @@ -88,31 +87,18 @@ grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel"); struct external_connectivity_watcher; typedef struct client_channel_channel_data { - grpc_core::OrphanablePtr<grpc_core::Resolver> resolver; - bool started_resolving; + grpc_core::ManualConstructor<grpc_core::RequestRouter> request_router; + bool deadline_checking_enabled; - grpc_client_channel_factory* client_channel_factory; bool enable_retries; size_t per_rpc_retry_buffer_size; /** combiner protecting all variables below in this data structure */ grpc_combiner* combiner; - /** currently active load balancer */ - grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy; /** retry throttle data */ grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data; /** maps method names to method_parameters structs */ grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table; - /** incoming resolver result - set by resolver.next() */ - grpc_channel_args* resolver_result; - /** a list of closures that are all waiting for resolver result to come in */ - grpc_closure_list waiting_for_resolver_result_closures; - /** resolver callback */ - grpc_closure on_resolver_result_changed; - /** connectivity state being tracked */ - grpc_connectivity_state_tracker state_tracker; - /** when an lb_policy arrives, should we try to exit idle */ - bool exit_idle_when_lb_policy_arrives; /** owning stack */ grpc_channel_stack* owning_stack; /** interested parties (owned) */ @@ -129,418 +115,40 @@ typedef struct client_channel_channel_data { grpc_core::UniquePtr<char> info_lb_policy_name; /** service config in JSON form */ grpc_core::UniquePtr<char> info_service_config_json; - /* backpointer to grpc_channel's channelz node */ - grpc_core::channelz::ClientChannelNode* channelz_channel; - /* caches if the last resolution event contained addresses */ - bool previous_resolution_contained_addresses; } channel_data; -typedef struct { - channel_data* chand; - /** used as an identifier, don't dereference it because the LB policy may be - * non-existing when the callback is run */ - grpc_core::LoadBalancingPolicy* lb_policy; - grpc_closure closure; -} reresolution_request_args; - -/** We create one watcher for each new lb_policy that is returned from a - resolver, to watch for state changes from the lb_policy. When a state - change is seen, we update the channel, and create a new watcher. */ -typedef struct { - channel_data* chand; - grpc_closure on_changed; - grpc_connectivity_state state; - grpc_core::LoadBalancingPolicy* lb_policy; -} lb_policy_connectivity_watcher; - -static void watch_lb_policy_locked(channel_data* chand, - grpc_core::LoadBalancingPolicy* lb_policy, - grpc_connectivity_state current_state); - -static const char* channel_connectivity_state_change_string( - grpc_connectivity_state state) { - switch (state) { - case GRPC_CHANNEL_IDLE: - return "Channel state change to IDLE"; - case GRPC_CHANNEL_CONNECTING: - return "Channel state change to CONNECTING"; - case GRPC_CHANNEL_READY: - return "Channel state change to READY"; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - return "Channel state change to TRANSIENT_FAILURE"; - case GRPC_CHANNEL_SHUTDOWN: - return "Channel state change to SHUTDOWN"; - } - GPR_UNREACHABLE_CODE(return "UNKNOWN"); -} - -static void set_channel_connectivity_state_locked(channel_data* chand, - grpc_connectivity_state state, - grpc_error* error, - const char* reason) { - /* TODO: Improve failure handling: - * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE. - * - Hand over pending picks from old policies during the switch that happens - * when resolver provides an update. */ - if (chand->lb_policy != nullptr) { - if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - /* cancel picks with wait_for_ready=false */ - chand->lb_policy->CancelMatchingPicksLocked( - /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY, - /* check= */ 0, GRPC_ERROR_REF(error)); - } else if (state == GRPC_CHANNEL_SHUTDOWN) { - /* cancel all picks */ - chand->lb_policy->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0, - GRPC_ERROR_REF(error)); - } - } - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: setting connectivity state to %s", chand, - grpc_connectivity_state_name(state)); - } - if (chand->channelz_channel != nullptr) { - chand->channelz_channel->AddTraceEvent( - grpc_core::channelz::ChannelTrace::Severity::Info, - grpc_slice_from_static_string( - channel_connectivity_state_change_string(state))); - } - grpc_connectivity_state_set(&chand->state_tracker, state, error, reason); -} - -static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) { - lb_policy_connectivity_watcher* w = - static_cast<lb_policy_connectivity_watcher*>(arg); - /* check if the notification is for the latest policy */ - if (w->lb_policy == w->chand->lb_policy.get()) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: lb_policy=%p state changed to %s", w->chand, - w->lb_policy, grpc_connectivity_state_name(w->state)); - } - set_channel_connectivity_state_locked(w->chand, w->state, - GRPC_ERROR_REF(error), "lb_changed"); - if (w->state != GRPC_CHANNEL_SHUTDOWN) { - watch_lb_policy_locked(w->chand, w->lb_policy, w->state); - } - } - GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "watch_lb_policy"); - gpr_free(w); -} - -static void watch_lb_policy_locked(channel_data* chand, - grpc_core::LoadBalancingPolicy* lb_policy, - grpc_connectivity_state current_state) { - lb_policy_connectivity_watcher* w = - static_cast<lb_policy_connectivity_watcher*>(gpr_malloc(sizeof(*w))); - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); - w->chand = chand; - GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w, - grpc_combiner_scheduler(chand->combiner)); - w->state = current_state; - w->lb_policy = lb_policy; - lb_policy->NotifyOnStateChangeLocked(&w->state, &w->on_changed); -} - -static void start_resolving_locked(channel_data* chand) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: starting name resolution", chand); - } - GPR_ASSERT(!chand->started_resolving); - chand->started_resolving = true; - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - chand->resolver->NextLocked(&chand->resolver_result, - &chand->on_resolver_result_changed); -} - -// Invoked from the resolver NextLocked() callback when the resolver -// is shutting down. -static void on_resolver_shutdown_locked(channel_data* chand, - grpc_error* error) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: shutting down", chand); - } - if (chand->lb_policy != nullptr) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand, - chand->lb_policy.get()); - } - grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), - chand->interested_parties); - chand->lb_policy.reset(); - } - if (chand->resolver != nullptr) { - // This should never happen; it can only be triggered by a resolver - // implementation spotaneously deciding to report shutdown without - // being orphaned. This code is included just to be defensive. - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: spontaneous shutdown from resolver %p", - chand, chand->resolver.get()); - } - chand->resolver.reset(); - set_channel_connectivity_state_locked( - chand, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Resolver spontaneous shutdown", &error, 1), - "resolver_spontaneous_shutdown"); - } - grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Channel disconnected", &error, 1)); - GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); - GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver"); - grpc_channel_args_destroy(chand->resolver_result); - chand->resolver_result = nullptr; - GRPC_ERROR_UNREF(error); -} - -static void request_reresolution_locked(void* arg, grpc_error* error) { - reresolution_request_args* args = - static_cast<reresolution_request_args*>(arg); - channel_data* chand = args->chand; - // If this invocation is for a stale LB policy, treat it as an LB shutdown - // signal. - if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE || - chand->resolver == nullptr) { - GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution"); - gpr_free(args); - return; - } - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand); - } - chand->resolver->RequestReresolutionLocked(); - // Give back the closure to the LB policy. - chand->lb_policy->SetReresolutionClosureLocked(&args->closure); -} - -using TraceStringVector = grpc_core::InlinedVector<char*, 3>; - -// Creates a new LB policy, replacing any previous one. -// If the new policy is created successfully, sets *connectivity_state and -// *connectivity_error to its initial connectivity state; otherwise, -// leaves them unchanged. -static void create_new_lb_policy_locked( - channel_data* chand, char* lb_policy_name, grpc_json* lb_config, - grpc_connectivity_state* connectivity_state, - grpc_error** connectivity_error, TraceStringVector* trace_strings) { - grpc_core::LoadBalancingPolicy::Args lb_policy_args; - lb_policy_args.combiner = chand->combiner; - lb_policy_args.client_channel_factory = chand->client_channel_factory; - lb_policy_args.args = chand->resolver_result; - lb_policy_args.lb_config = lb_config; - grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy = - grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( - lb_policy_name, lb_policy_args); - if (GPR_UNLIKELY(new_lb_policy == nullptr)) { - gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); - if (chand->channelz_channel != nullptr) { - char* str; - gpr_asprintf(&str, "Could not create LB policy \'%s\'", lb_policy_name); - trace_strings->push_back(str); - } - } else { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: created new LB policy \"%s\" (%p)", chand, - lb_policy_name, new_lb_policy.get()); - } - if (chand->channelz_channel != nullptr) { - char* str; - gpr_asprintf(&str, "Created new LB policy \'%s\'", lb_policy_name); - trace_strings->push_back(str); - } - // Swap out the LB policy and update the fds in - // chand->interested_parties. - if (chand->lb_policy != nullptr) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand, - chand->lb_policy.get()); - } - grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), - chand->interested_parties); - chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get()); - } - chand->lb_policy = std::move(new_lb_policy); - grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(), - chand->interested_parties); - // Set up re-resolution callback. - reresolution_request_args* args = - static_cast<reresolution_request_args*>(gpr_zalloc(sizeof(*args))); - args->chand = chand; - args->lb_policy = chand->lb_policy.get(); - GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args, - grpc_combiner_scheduler(chand->combiner)); - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution"); - chand->lb_policy->SetReresolutionClosureLocked(&args->closure); - // Get the new LB policy's initial connectivity state and start a - // connectivity watch. - GRPC_ERROR_UNREF(*connectivity_error); - *connectivity_state = - chand->lb_policy->CheckConnectivityLocked(connectivity_error); - if (chand->exit_idle_when_lb_policy_arrives) { - chand->lb_policy->ExitIdleLocked(); - chand->exit_idle_when_lb_policy_arrives = false; - } - watch_lb_policy_locked(chand, chand->lb_policy.get(), *connectivity_state); - } -} - -static void maybe_add_trace_message_for_address_changes_locked( - channel_data* chand, TraceStringVector* trace_strings) { - const ServerAddressList* addresses = - grpc_core::FindServerAddressListChannelArg(chand->resolver_result); - const bool resolution_contains_addresses = - addresses != nullptr && addresses->size() > 0; - if (!resolution_contains_addresses && - chand->previous_resolution_contained_addresses) { - trace_strings->push_back(gpr_strdup("Address list became empty")); - } else if (resolution_contains_addresses && - !chand->previous_resolution_contained_addresses) { - trace_strings->push_back(gpr_strdup("Address list became non-empty")); - } - chand->previous_resolution_contained_addresses = - resolution_contains_addresses; -} - -static void concatenate_and_add_channel_trace_locked( - channel_data* chand, TraceStringVector* trace_strings) { - if (!trace_strings->empty()) { - gpr_strvec v; - gpr_strvec_init(&v); - gpr_strvec_add(&v, gpr_strdup("Resolution event: ")); - bool is_first = 1; - for (size_t i = 0; i < trace_strings->size(); ++i) { - if (!is_first) gpr_strvec_add(&v, gpr_strdup(", ")); - is_first = false; - gpr_strvec_add(&v, (*trace_strings)[i]); - } - char* flat; - size_t flat_len = 0; - flat = gpr_strvec_flatten(&v, &flat_len); - chand->channelz_channel->AddTraceEvent( - grpc_core::channelz::ChannelTrace::Severity::Info, - grpc_slice_new(flat, flat_len, gpr_free)); - gpr_strvec_destroy(&v); - } -} - -// Callback invoked when a resolver result is available. -static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { +// Synchronous callback from chand->request_router to process a resolver +// result update. +static bool process_resolver_result_locked(void* arg, + const grpc_channel_args& args, + const char** lb_policy_name, + grpc_json** lb_policy_config) { channel_data* chand = static_cast<channel_data*>(arg); + ProcessedResolverResult resolver_result(args, chand->enable_retries); + grpc_core::UniquePtr<char> service_config_json = + resolver_result.service_config_json(); if (grpc_client_channel_trace.enabled()) { - const char* disposition = - chand->resolver_result != nullptr - ? "" - : (error == GRPC_ERROR_NONE ? " (transient error)" - : " (resolver shutdown)"); - gpr_log(GPR_INFO, - "chand=%p: got resolver result: resolver_result=%p error=%s%s", - chand, chand->resolver_result, grpc_error_string(error), - disposition); + gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"", + chand, service_config_json.get()); } - // Handle shutdown. - if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) { - on_resolver_shutdown_locked(chand, GRPC_ERROR_REF(error)); - return; - } - // Data used to set the channel's connectivity state. - bool set_connectivity_state = true; - // We only want to trace the address resolution in the follow cases: - // (a) Address resolution resulted in service config change. - // (b) Address resolution that causes number of backends to go from - // zero to non-zero. - // (c) Address resolution that causes number of backends to go from - // non-zero to zero. - // (d) Address resolution that causes a new LB policy to be created. - // - // we track a list of strings to eventually be concatenated and traced. - TraceStringVector trace_strings; - grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_error* connectivity_error = - GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); - // chand->resolver_result will be null in the case of a transient - // resolution error. In that case, we don't have any new result to - // process, which means that we keep using the previous result (if any). - if (chand->resolver_result == nullptr) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand); - } - // Don't override connectivity state if we already have an LB policy. - if (chand->lb_policy != nullptr) set_connectivity_state = false; - } else { - // Parse the resolver result. - ProcessedResolverResult resolver_result(chand->resolver_result, - chand->enable_retries); - chand->retry_throttle_data = resolver_result.retry_throttle_data(); - chand->method_params_table = resolver_result.method_params_table(); - grpc_core::UniquePtr<char> service_config_json = - resolver_result.service_config_json(); - if (service_config_json != nullptr && grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"", - chand, service_config_json.get()); - } - grpc_core::UniquePtr<char> lb_policy_name = - resolver_result.lb_policy_name(); - grpc_json* lb_policy_config = resolver_result.lb_policy_config(); - // Check to see if we're already using the right LB policy. - // Note: It's safe to use chand->info_lb_policy_name here without - // taking a lock on chand->info_mu, because this function is the - // only thing that modifies its value, and it can only be invoked - // once at any given time. - bool lb_policy_name_changed = - chand->info_lb_policy_name == nullptr || - strcmp(chand->info_lb_policy_name.get(), lb_policy_name.get()) != 0; - if (chand->lb_policy != nullptr && !lb_policy_name_changed) { - // Continue using the same LB policy. Update with new addresses. - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)", - chand, lb_policy_name.get(), chand->lb_policy.get()); - } - chand->lb_policy->UpdateLocked(*chand->resolver_result, lb_policy_config); - // No need to set the channel's connectivity state; the existing - // watch on the LB policy will take care of that. - set_connectivity_state = false; - } else { - // Instantiate new LB policy. - create_new_lb_policy_locked(chand, lb_policy_name.get(), lb_policy_config, - &connectivity_state, &connectivity_error, - &trace_strings); - } - // Note: It's safe to use chand->info_service_config_json here without - // taking a lock on chand->info_mu, because this function is the - // only thing that modifies its value, and it can only be invoked - // once at any given time. - if (chand->channelz_channel != nullptr) { - if (((service_config_json == nullptr) != - (chand->info_service_config_json == nullptr)) || - (service_config_json != nullptr && - strcmp(service_config_json.get(), - chand->info_service_config_json.get()) != 0)) { - // TODO(ncteisen): might be worth somehow including a snippet of the - // config in the trace, at the risk of bloating the trace logs. - trace_strings.push_back(gpr_strdup("Service config changed")); - } - maybe_add_trace_message_for_address_changes_locked(chand, &trace_strings); - concatenate_and_add_channel_trace_locked(chand, &trace_strings); - } - // Swap out the data used by cc_get_channel_info(). - gpr_mu_lock(&chand->info_mu); - chand->info_lb_policy_name = std::move(lb_policy_name); - chand->info_service_config_json = std::move(service_config_json); - gpr_mu_unlock(&chand->info_mu); - // Clean up. - grpc_channel_args_destroy(chand->resolver_result); - chand->resolver_result = nullptr; - } - // Set the channel's connectivity state if needed. - if (set_connectivity_state) { - set_channel_connectivity_state_locked( - chand, connectivity_state, connectivity_error, "resolver_result"); - } else { - GRPC_ERROR_UNREF(connectivity_error); - } - // Invoke closures that were waiting for results and renew the watch. - GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); - chand->resolver->NextLocked(&chand->resolver_result, - &chand->on_resolver_result_changed); + // Update channel state. + chand->retry_throttle_data = resolver_result.retry_throttle_data(); + chand->method_params_table = resolver_result.method_params_table(); + // Swap out the data used by cc_get_channel_info(). + gpr_mu_lock(&chand->info_mu); + chand->info_lb_policy_name = resolver_result.lb_policy_name(); + const bool service_config_changed = + ((service_config_json == nullptr) != + (chand->info_service_config_json == nullptr)) || + (service_config_json != nullptr && + strcmp(service_config_json.get(), + chand->info_service_config_json.get()) != 0); + chand->info_service_config_json = std::move(service_config_json); + gpr_mu_unlock(&chand->info_mu); + // Return results. + *lb_policy_name = chand->info_lb_policy_name.get(); + *lb_policy_config = resolver_result.lb_policy_config(); + return service_config_changed; } static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { @@ -550,15 +158,14 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); if (op->on_connectivity_state_change != nullptr) { - grpc_connectivity_state_notify_on_state_change( - &chand->state_tracker, op->connectivity_state, - op->on_connectivity_state_change); + chand->request_router->NotifyOnConnectivityStateChange( + op->connectivity_state, op->on_connectivity_state_change); op->on_connectivity_state_change = nullptr; op->connectivity_state = nullptr; } if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { - if (chand->lb_policy == nullptr) { + if (chand->request_router->lb_policy() == nullptr) { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"); GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error)); @@ -567,7 +174,8 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { grpc_error* error = GRPC_ERROR_NONE; grpc_core::LoadBalancingPolicy::PickState pick_state; // Pick must return synchronously, because pick_state.on_complete is null. - GPR_ASSERT(chand->lb_policy->PickLocked(&pick_state, &error)); + GPR_ASSERT( + chand->request_router->lb_policy()->PickLocked(&pick_state, &error)); if (pick_state.connected_subchannel != nullptr) { pick_state.connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack); @@ -586,37 +194,14 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { } if (op->disconnect_with_error != GRPC_ERROR_NONE) { - if (chand->resolver != nullptr) { - set_channel_connectivity_state_locked( - chand, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_REF(op->disconnect_with_error), "disconnect"); - chand->resolver.reset(); - if (!chand->started_resolving) { - grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures, - GRPC_ERROR_REF(op->disconnect_with_error)); - GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); - } - if (chand->lb_policy != nullptr) { - grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), - chand->interested_parties); - chand->lb_policy.reset(); - } - } - GRPC_ERROR_UNREF(op->disconnect_with_error); + chand->request_router->ShutdownLocked(op->disconnect_with_error); } if (op->reset_connect_backoff) { - if (chand->resolver != nullptr) { - chand->resolver->ResetBackoffLocked(); - chand->resolver->RequestReresolutionLocked(); - } - if (chand->lb_policy != nullptr) { - chand->lb_policy->ResetBackoffLocked(); - } + chand->request_router->ResetConnectionBackoffLocked(); } GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op"); - GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); } @@ -667,12 +252,9 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); chand->owning_stack = args->channel_stack; - GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed, - on_resolver_result_changed_locked, chand, - grpc_combiner_scheduler(chand->combiner)); + chand->deadline_checking_enabled = + grpc_deadline_checking_enabled(args->channel_args); chand->interested_parties = grpc_pollset_set_create(); - grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, - "client_channel"); grpc_client_channel_start_backup_polling(chand->interested_parties); // Record max per-RPC retry buffer size. const grpc_arg* arg = grpc_channel_args_find( @@ -682,8 +264,6 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, // Record enable_retries. arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES); chand->enable_retries = grpc_channel_arg_get_bool(arg, true); - chand->channelz_channel = nullptr; - chand->previous_resolution_contained_addresses = false; // Record client channel factory. arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_CLIENT_CHANNEL_FACTORY); @@ -695,9 +275,7 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "client channel factory arg must be a pointer"); } - grpc_client_channel_factory_ref( - static_cast<grpc_client_channel_factory*>(arg->value.pointer.p)); - chand->client_channel_factory = + grpc_client_channel_factory* client_channel_factory = static_cast<grpc_client_channel_factory*>(arg->value.pointer.p); // Get server name to resolve, using proxy mapper if needed. arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI); @@ -713,39 +291,24 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, grpc_channel_args* new_args = nullptr; grpc_proxy_mappers_map_name(arg->value.string, args->channel_args, &proxy_name, &new_args); - // Instantiate resolver. - chand->resolver = grpc_core::ResolverRegistry::CreateResolver( - proxy_name != nullptr ? proxy_name : arg->value.string, - new_args != nullptr ? new_args : args->channel_args, - chand->interested_parties, chand->combiner); - if (proxy_name != nullptr) gpr_free(proxy_name); - if (new_args != nullptr) grpc_channel_args_destroy(new_args); - if (chand->resolver == nullptr) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed"); - } - chand->deadline_checking_enabled = - grpc_deadline_checking_enabled(args->channel_args); - return GRPC_ERROR_NONE; + // Instantiate request router. + grpc_client_channel_factory_ref(client_channel_factory); + grpc_error* error = GRPC_ERROR_NONE; + chand->request_router.Init( + chand->owning_stack, chand->combiner, client_channel_factory, + chand->interested_parties, &grpc_client_channel_trace, + process_resolver_result_locked, chand, + proxy_name != nullptr ? proxy_name : arg->value.string /* target_uri */, + new_args != nullptr ? new_args : args->channel_args, &error); + gpr_free(proxy_name); + grpc_channel_args_destroy(new_args); + return error; } /* Destructor for channel_data */ static void cc_destroy_channel_elem(grpc_channel_element* elem) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); - if (chand->resolver != nullptr) { - // The only way we can get here is if we never started resolving, - // because we take a ref to the channel stack when we start - // resolving and do not release it until the resolver callback is - // invoked after the resolver shuts down. - chand->resolver.reset(); - } - if (chand->client_channel_factory != nullptr) { - grpc_client_channel_factory_unref(chand->client_channel_factory); - } - if (chand->lb_policy != nullptr) { - grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), - chand->interested_parties); - chand->lb_policy.reset(); - } + chand->request_router.Destroy(); // TODO(roth): Once we convert the filter API to C++, there will no // longer be any need to explicitly reset these smart pointer data members. chand->info_lb_policy_name.reset(); @@ -753,7 +316,6 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) { chand->retry_throttle_data.reset(); chand->method_params_table.reset(); grpc_client_channel_stop_backup_polling(chand->interested_parties); - grpc_connectivity_state_destroy(&chand->state_tracker); grpc_pollset_set_destroy(chand->interested_parties); GRPC_COMBINER_UNREF(chand->combiner, "client_channel"); gpr_mu_destroy(&chand->info_mu); @@ -810,6 +372,7 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) { // - add census stats for retries namespace { + struct call_data; // State used for starting a retryable batch on a subchannel call. @@ -894,12 +457,12 @@ struct subchannel_call_retry_state { bool completed_recv_initial_metadata : 1; bool started_recv_trailing_metadata : 1; bool completed_recv_trailing_metadata : 1; + // State for callback processing. subchannel_batch_data* recv_initial_metadata_ready_deferred_batch = nullptr; grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE; subchannel_batch_data* recv_message_ready_deferred_batch = nullptr; grpc_error* recv_message_error = GRPC_ERROR_NONE; subchannel_batch_data* recv_trailing_metadata_internal_batch = nullptr; - // State for callback processing. // NOTE: Do not move this next to the metadata bitfields above. That would // save space but will also result in a data race because compiler will // generate a 2 byte store which overwrites the meta-data fields upon @@ -908,12 +471,12 @@ struct subchannel_call_retry_state { }; // Pending batches stored in call data. -typedef struct { +struct pending_batch { // The pending batch. If nullptr, this slot is empty. grpc_transport_stream_op_batch* batch; // Indicates whether payload for send ops has been cached in call data. bool send_ops_cached; -} pending_batch; +}; /** Call data. Holds a pointer to grpc_subchannel_call and the associated machinery to create such a pointer. @@ -950,11 +513,8 @@ struct call_data { for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches); ++i) { GPR_ASSERT(pending_batches[i].batch == nullptr); } - for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { - if (pick.subchannel_call_context[i].value != nullptr) { - pick.subchannel_call_context[i].destroy( - pick.subchannel_call_context[i].value); - } + if (have_request) { + request.Destroy(); } } @@ -981,12 +541,11 @@ struct call_data { // Set when we get a cancel_stream op. grpc_error* cancel_error = GRPC_ERROR_NONE; - grpc_core::LoadBalancingPolicy::PickState pick; + grpc_core::ManualConstructor<grpc_core::RequestRouter::Request> request; + bool have_request = false; grpc_closure pick_closure; - grpc_closure pick_cancel_closure; grpc_polling_entity* pollent = nullptr; - bool pollent_added_to_interested_parties = false; // Batches are added to this list when received from above. // They are removed when we are done handling the batch (i.e., when @@ -1036,6 +595,7 @@ struct call_data { grpc_linked_mdelem* send_trailing_metadata_storage = nullptr; grpc_metadata_batch send_trailing_metadata; }; + } // namespace // Forward declarations. @@ -1438,8 +998,9 @@ static void do_retry(grpc_call_element* elem, "client_channel_call_retry"); calld->subchannel_call = nullptr; } - if (calld->pick.connected_subchannel != nullptr) { - calld->pick.connected_subchannel.reset(); + if (calld->have_request) { + calld->have_request = false; + calld->request.Destroy(); } // Compute backoff delay. grpc_millis next_attempt_time; @@ -1588,6 +1149,7 @@ static bool maybe_retry(grpc_call_element* elem, // namespace { + subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem, call_data* calld, int refcount, bool set_on_complete) @@ -1628,6 +1190,7 @@ void subchannel_batch_data::destroy() { call_data* calld = static_cast<call_data*>(elem->call_data); GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data"); } + } // namespace // Creates a subchannel_batch_data object on the call's arena with the @@ -2644,17 +2207,18 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { const size_t parent_data_size = calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0; const grpc_core::ConnectedSubchannel::CallArgs call_args = { - calld->pollent, // pollent - calld->path, // path - calld->call_start_time, // start_time - calld->deadline, // deadline - calld->arena, // arena - calld->pick.subchannel_call_context, // context - calld->call_combiner, // call_combiner - parent_data_size // parent_data_size + calld->pollent, // pollent + calld->path, // path + calld->call_start_time, // start_time + calld->deadline, // deadline + calld->arena, // arena + calld->request->pick()->subchannel_call_context, // context + calld->call_combiner, // call_combiner + parent_data_size // parent_data_size }; - grpc_error* new_error = calld->pick.connected_subchannel->CreateCall( - call_args, &calld->subchannel_call); + grpc_error* new_error = + calld->request->pick()->connected_subchannel->CreateCall( + call_args, &calld->subchannel_call); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s", chand, calld, calld->subchannel_call, grpc_error_string(new_error)); @@ -2666,7 +2230,8 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { if (parent_data_size > 0) { new (grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)) - subchannel_call_retry_state(calld->pick.subchannel_call_context); + subchannel_call_retry_state( + calld->request->pick()->subchannel_call_context); } pending_batches_resume(elem); } @@ -2678,7 +2243,7 @@ static void pick_done(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast<grpc_call_element*>(arg); channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); - if (GPR_UNLIKELY(calld->pick.connected_subchannel == nullptr)) { + if (GPR_UNLIKELY(calld->request->pick()->connected_subchannel == nullptr)) { // Failed to create subchannel. // If there was no error, this is an LB policy drop, in which case // we return an error; otherwise, we may retry. @@ -2707,135 +2272,27 @@ static void pick_done(void* arg, grpc_error* error) { } } -static void maybe_add_call_to_channel_interested_parties_locked( - grpc_call_element* elem) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (!calld->pollent_added_to_interested_parties) { - calld->pollent_added_to_interested_parties = true; - grpc_polling_entity_add_to_pollset_set(calld->pollent, - chand->interested_parties); - } -} - -static void maybe_del_call_from_channel_interested_parties_locked( - grpc_call_element* elem) { +// If the channel is in TRANSIENT_FAILURE and the call is not +// wait_for_ready=true, fails the call and returns true. +static bool fail_call_if_in_transient_failure(grpc_call_element* elem) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); - if (calld->pollent_added_to_interested_parties) { - calld->pollent_added_to_interested_parties = false; - grpc_polling_entity_del_from_pollset_set(calld->pollent, - chand->interested_parties); + grpc_transport_stream_op_batch* batch = calld->pending_batches[0].batch; + if (chand->request_router->GetConnectivityState() == + GRPC_CHANNEL_TRANSIENT_FAILURE && + (batch->payload->send_initial_metadata.send_initial_metadata_flags & + GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) { + pending_batches_fail( + elem, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "channel is in state TRANSIENT_FAILURE"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), + true /* yield_call_combiner */); + return true; } + return false; } -// Invoked when a pick is completed to leave the client_channel combiner -// and continue processing in the call combiner. -// If needed, removes the call's polling entity from chand->interested_parties. -static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { - call_data* calld = static_cast<call_data*>(elem->call_data); - maybe_del_call_from_channel_interested_parties_locked(elem); - GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&calld->pick_closure, error); -} - -namespace grpc_core { - -// Performs subchannel pick via LB policy. -class LbPicker { - public: - // Starts a pick on chand->lb_policy. - static void StartLocked(grpc_call_element* elem) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p", - chand, calld, chand->lb_policy.get()); - } - // If this is a retry, use the send_initial_metadata payload that - // we've cached; otherwise, use the pending batch. The - // send_initial_metadata batch will be the first pending batch in the - // list, as set by get_batch_index() above. - calld->pick.initial_metadata = - calld->seen_send_initial_metadata - ? &calld->send_initial_metadata - : calld->pending_batches[0] - .batch->payload->send_initial_metadata.send_initial_metadata; - calld->pick.initial_metadata_flags = - calld->seen_send_initial_metadata - ? calld->send_initial_metadata_flags - : calld->pending_batches[0] - .batch->payload->send_initial_metadata - .send_initial_metadata_flags; - GRPC_CLOSURE_INIT(&calld->pick_closure, &LbPicker::DoneLocked, elem, - grpc_combiner_scheduler(chand->combiner)); - calld->pick.on_complete = &calld->pick_closure; - GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback"); - grpc_error* error = GRPC_ERROR_NONE; - const bool pick_done = chand->lb_policy->PickLocked(&calld->pick, &error); - if (GPR_LIKELY(pick_done)) { - // Pick completed synchronously. - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously", - chand, calld); - } - pick_done_locked(elem, error); - GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); - } else { - // Pick will be returned asynchronously. - // Add the polling entity from call_data to the channel_data's - // interested_parties, so that the I/O of the LB policy can be done - // under it. It will be removed in pick_done_locked(). - maybe_add_call_to_channel_interested_parties_locked(elem); - // Request notification on call cancellation. - GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); - grpc_call_combiner_set_notify_on_cancel( - calld->call_combiner, - GRPC_CLOSURE_INIT(&calld->pick_cancel_closure, - &LbPicker::CancelLocked, elem, - grpc_combiner_scheduler(chand->combiner))); - } - } - - private: - // Callback invoked by LoadBalancingPolicy::PickLocked() for async picks. - // Unrefs the LB policy and invokes pick_done_locked(). - static void DoneLocked(void* arg, grpc_error* error) { - grpc_call_element* elem = static_cast<grpc_call_element*>(arg); - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously", - chand, calld); - } - pick_done_locked(elem, GRPC_ERROR_REF(error)); - GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); - } - - // Note: This runs under the client_channel combiner, but will NOT be - // holding the call combiner. - static void CancelLocked(void* arg, grpc_error* error) { - grpc_call_element* elem = static_cast<grpc_call_element*>(arg); - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - // Note: chand->lb_policy may have changed since we started our pick, - // in which case we will be cancelling the pick on a policy other than - // the one we started it on. However, this will just be a no-op. - if (GPR_UNLIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: cancelling pick from LB policy %p", chand, - calld, chand->lb_policy.get()); - } - chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error)); - } - GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel"); - } -}; - -} // namespace grpc_core - // Applies service config to the call. Must be invoked once we know // that the resolver has returned results to the channel. static void apply_service_config_to_call_locked(grpc_call_element* elem) { @@ -2892,224 +2349,66 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) { } } -// If the channel is in TRANSIENT_FAILURE and the call is not -// wait_for_ready=true, fails the call and returns true. -static bool fail_call_if_in_transient_failure(grpc_call_element* elem) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - grpc_transport_stream_op_batch* batch = calld->pending_batches[0].batch; - if (grpc_connectivity_state_check(&chand->state_tracker) == - GRPC_CHANNEL_TRANSIENT_FAILURE && - (batch->payload->send_initial_metadata.send_initial_metadata_flags & - GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) { - pending_batches_fail( - elem, - grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "channel is in state TRANSIENT_FAILURE"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), - true /* yield_call_combiner */); - return true; - } - return false; -} - // Invoked once resolver results are available. -static void process_service_config_and_start_lb_pick_locked( - grpc_call_element* elem) { +static bool maybe_apply_service_config_to_call_locked(void* arg) { + grpc_call_element* elem = static_cast<grpc_call_element*>(arg); call_data* calld = static_cast<call_data*>(elem->call_data); // Only get service config data on the first attempt. if (GPR_LIKELY(calld->num_attempts_completed == 0)) { apply_service_config_to_call_locked(elem); // Check this after applying service config, since it may have // affected the call's wait_for_ready value. - if (fail_call_if_in_transient_failure(elem)) return; + if (fail_call_if_in_transient_failure(elem)) return false; } - // Start LB pick. - grpc_core::LbPicker::StartLocked(elem); + return true; } -namespace grpc_core { - -// Handles waiting for a resolver result. -// Used only for the first call on an idle channel. -class ResolverResultWaiter { - public: - explicit ResolverResultWaiter(grpc_call_element* elem) : elem_(elem) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: deferring pick pending resolver result", - chand, calld); - } - // Add closure to be run when a resolver result is available. - GRPC_CLOSURE_INIT(&done_closure_, &ResolverResultWaiter::DoneLocked, this, - grpc_combiner_scheduler(chand->combiner)); - AddToWaitingList(); - // Set cancellation closure, so that we abort if the call is cancelled. - GRPC_CLOSURE_INIT(&cancel_closure_, &ResolverResultWaiter::CancelLocked, - this, grpc_combiner_scheduler(chand->combiner)); - grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, - &cancel_closure_); - } - - private: - // Adds closure_ to chand->waiting_for_resolver_result_closures. - void AddToWaitingList() { - channel_data* chand = static_cast<channel_data*>(elem_->channel_data); - grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, - &done_closure_, GRPC_ERROR_NONE); - } - - // Invoked when a resolver result is available. - static void DoneLocked(void* arg, grpc_error* error) { - ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg); - // If CancelLocked() has already run, delete ourselves without doing - // anything. Note that the call stack may have already been destroyed, - // so it's not safe to access anything in elem_. - if (GPR_UNLIKELY(self->finished_)) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "call cancelled before resolver result"); - } - Delete(self); - return; - } - // Otherwise, process the resolver result. - grpc_call_element* elem = self->elem_; - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data", - chand, calld); - } - pick_done_locked(elem, GRPC_ERROR_REF(error)); - } else if (GPR_UNLIKELY(chand->resolver == nullptr)) { - // Shutting down. - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand, - calld); - } - pick_done_locked(elem, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); - } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) { - // Transient resolver failure. - // If call has wait_for_ready=true, try again; otherwise, fail. - uint32_t send_initial_metadata_flags = - calld->seen_send_initial_metadata - ? calld->send_initial_metadata_flags - : calld->pending_batches[0] - .batch->payload->send_initial_metadata - .send_initial_metadata_flags; - if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: resolver returned but no LB policy; " - "wait_for_ready=true; trying again", - chand, calld); - } - // Re-add ourselves to the waiting list. - self->AddToWaitingList(); - // Return early so that we don't set finished_ to true below. - return; - } else { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: resolver returned but no LB policy; " - "wait_for_ready=false; failing", - chand, calld); - } - pick_done_locked( - elem, - grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); - } - } else { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing LB pick", - chand, calld); - } - process_service_config_and_start_lb_pick_locked(elem); - } - self->finished_ = true; - } - - // Invoked when the call is cancelled. - // Note: This runs under the client_channel combiner, but will NOT be - // holding the call combiner. - static void CancelLocked(void* arg, grpc_error* error) { - ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg); - // If DoneLocked() has already run, delete ourselves without doing anything. - if (GPR_LIKELY(self->finished_)) { - Delete(self); - return; - } - // If we are being cancelled, immediately invoke pick_done_locked() - // to propagate the error back to the caller. - if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { - grpc_call_element* elem = self->elem_; - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: cancelling call waiting for name " - "resolution", - chand, calld); - } - // Note: Although we are not in the call combiner here, we are - // basically stealing the call combiner from the pending pick, so - // it's safe to call pick_done_locked() here -- we are essentially - // calling it here instead of calling it in DoneLocked(). - pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick cancelled", &error, 1)); - } - self->finished_ = true; - } - - grpc_call_element* elem_; - grpc_closure done_closure_; - grpc_closure cancel_closure_; - bool finished_ = false; -}; - -} // namespace grpc_core - static void start_pick_locked(void* arg, grpc_error* ignored) { grpc_call_element* elem = static_cast<grpc_call_element*>(arg); call_data* calld = static_cast<call_data*>(elem->call_data); channel_data* chand = static_cast<channel_data*>(elem->channel_data); - GPR_ASSERT(calld->pick.connected_subchannel == nullptr); + GPR_ASSERT(!calld->have_request); GPR_ASSERT(calld->subchannel_call == nullptr); - if (GPR_LIKELY(chand->lb_policy != nullptr)) { - // We already have resolver results, so process the service config - // and start an LB pick. - process_service_config_and_start_lb_pick_locked(elem); - } else if (GPR_UNLIKELY(chand->resolver == nullptr)) { - pick_done_locked(elem, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); - } else { - // We do not yet have an LB policy, so wait for a resolver result. - if (GPR_UNLIKELY(!chand->started_resolving)) { - start_resolving_locked(chand); - } else { - // Normally, we want to do this check in - // process_service_config_and_start_lb_pick_locked(), so that we - // can honor the wait_for_ready setting in the service config. - // However, if the channel is in TRANSIENT_FAILURE at this point, that - // means that the resolver has returned a failure, so we're not going - // to get a service config right away. In that case, we fail the - // call now based on the wait_for_ready value passed in from the - // application. - if (fail_call_if_in_transient_failure(elem)) return; - } - // Create a new waiter, which will delete itself when done. - grpc_core::New<grpc_core::ResolverResultWaiter>(elem); - // Add the polling entity from call_data to the channel_data's - // interested_parties, so that the I/O of the resolver can be done - // under it. It will be removed in pick_done_locked(). - maybe_add_call_to_channel_interested_parties_locked(elem); + // Normally, we want to do this check until after we've processed the + // service config, so that we can honor the wait_for_ready setting in + // the service config. However, if the channel is in TRANSIENT_FAILURE + // and we don't have an LB policy at this point, that means that the + // resolver has returned a failure, so we're not going to get a service + // config right away. In that case, we fail the call now based on the + // wait_for_ready value passed in from the application. + if (chand->request_router->lb_policy() == nullptr && + fail_call_if_in_transient_failure(elem)) { + return; } + // If this is a retry, use the send_initial_metadata payload that + // we've cached; otherwise, use the pending batch. The + // send_initial_metadata batch will be the first pending batch in the + // list, as set by get_batch_index() above. + // TODO(roth): What if the LB policy needs to add something to the + // call's initial metadata, and then there's a retry? We don't want + // the new metadata to be added twice. We might need to somehow + // allocate the subchannel batch earlier so that we can give the + // subchannel's copy of the metadata batch (which is copied for each + // attempt) to the LB policy instead the one from the parent channel. + grpc_metadata_batch* initial_metadata = + calld->seen_send_initial_metadata + ? &calld->send_initial_metadata + : calld->pending_batches[0] + .batch->payload->send_initial_metadata.send_initial_metadata; + uint32_t* initial_metadata_flags = + calld->seen_send_initial_metadata + ? &calld->send_initial_metadata_flags + : &calld->pending_batches[0] + .batch->payload->send_initial_metadata + .send_initial_metadata_flags; + GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem, + grpc_schedule_on_exec_ctx); + calld->request.Init(calld->owning_call, calld->call_combiner, calld->pollent, + initial_metadata, initial_metadata_flags, + maybe_apply_service_config_to_call_locked, elem, + &calld->pick_closure); + calld->have_request = true; + chand->request_router->RouteCallLocked(calld->request.get()); } // @@ -3249,23 +2548,10 @@ const grpc_channel_filter grpc_client_channel_filter = { "client-channel", }; -static void try_to_connect_locked(void* arg, grpc_error* error_ignored) { - channel_data* chand = static_cast<channel_data*>(arg); - if (chand->lb_policy != nullptr) { - chand->lb_policy->ExitIdleLocked(); - } else { - chand->exit_idle_when_lb_policy_arrives = true; - if (!chand->started_resolving && chand->resolver != nullptr) { - start_resolving_locked(chand); - } - } - GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect"); -} - void grpc_client_channel_set_channelz_node( grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); - chand->channelz_channel = node; + chand->request_router->set_channelz_node(node); } void grpc_client_channel_populate_child_refs( @@ -3273,17 +2559,22 @@ void grpc_client_channel_populate_child_refs( grpc_core::channelz::ChildRefsList* child_subchannels, grpc_core::channelz::ChildRefsList* child_channels) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); - if (chand->lb_policy != nullptr) { - chand->lb_policy->FillChildRefsForChannelz(child_subchannels, - child_channels); + if (chand->request_router->lb_policy() != nullptr) { + chand->request_router->lb_policy()->FillChildRefsForChannelz( + child_subchannels, child_channels); } } +static void try_to_connect_locked(void* arg, grpc_error* error_ignored) { + channel_data* chand = static_cast<channel_data*>(arg); + chand->request_router->ExitIdleLocked(); + GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect"); +} + grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_channel_element* elem, int try_to_connect) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); - grpc_connectivity_state out = - grpc_connectivity_state_check(&chand->state_tracker); + grpc_connectivity_state out = chand->request_router->GetConnectivityState(); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); GRPC_CLOSURE_SCHED( @@ -3328,19 +2619,19 @@ static void external_connectivity_watcher_list_append( } static void external_connectivity_watcher_list_remove( - channel_data* chand, external_connectivity_watcher* too_remove) { + channel_data* chand, external_connectivity_watcher* to_remove) { GPR_ASSERT( - lookup_external_connectivity_watcher(chand, too_remove->on_complete)); + lookup_external_connectivity_watcher(chand, to_remove->on_complete)); gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); - if (too_remove == chand->external_connectivity_watcher_list_head) { - chand->external_connectivity_watcher_list_head = too_remove->next; + if (to_remove == chand->external_connectivity_watcher_list_head) { + chand->external_connectivity_watcher_list_head = to_remove->next; gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); return; } external_connectivity_watcher* w = chand->external_connectivity_watcher_list_head; while (w != nullptr) { - if (w->next == too_remove) { + if (w->next == to_remove) { w->next = w->next->next; gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); return; @@ -3392,15 +2683,15 @@ static void watch_connectivity_state_locked(void* arg, GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE); GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w, grpc_combiner_scheduler(w->chand->combiner)); - grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker, - w->state, &w->my_closure); + w->chand->request_router->NotifyOnConnectivityStateChange(w->state, + &w->my_closure); } else { GPR_ASSERT(w->watcher_timer_init == nullptr); found = lookup_external_connectivity_watcher(w->chand, w->on_complete); if (found) { GPR_ASSERT(found->on_complete == w->on_complete); - grpc_connectivity_state_notify_on_state_change( - &found->chand->state_tracker, nullptr, &found->my_closure); + found->chand->request_router->NotifyOnConnectivityStateChange( + nullptr, &found->my_closure); } grpc_polling_entity_del_from_pollset_set(&w->pollent, w->chand->interested_parties); |