/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/request_routing.h"

#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>

#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"

namespace grpc_core {

//
// RequestRouter::Request::ResolverResultWaiter
//

// Handles waiting for a resolver result.
// Used only for the first call on an idle channel.
class RequestRouter::Request::ResolverResultWaiter {
 public:
  explicit ResolverResultWaiter(Request* request)
      : request_router_(request->request_router_),
        request_(request),
        tracer_enabled_(request_router_->tracer_->enabled()) {
    if (tracer_enabled_) {
      gpr_log(GPR_INFO,
              "request_router=%p request=%p: deferring pick pending resolver "
              "result",
              request_router_, request);
    }
    // Add closure to be run when a resolver result is available.
    GRPC_CLOSURE_INIT(&done_closure_, &DoneLocked, this,
                      grpc_combiner_scheduler(request_router_->combiner_));
    AddToWaitingList();
    // Set cancellation closure, so that we abort if the call is cancelled.
    GRPC_CLOSURE_INIT(&cancel_closure_, &CancelLocked, this,
                      grpc_combiner_scheduler(request_router_->combiner_));
    grpc_call_combiner_set_notify_on_cancel(request->call_combiner_,
                                            &cancel_closure_);
  }

 private:
  // Adds done_closure_ to
  // request_router_->waiting_for_resolver_result_closures_.
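  // The list is drained via GRPC_CLOSURE_LIST_SCHED() in
  // OnResolverResultChangedLocked() once the resolver returns a result,
  // or failed in OnResolverShutdownLocked() if the resolver shuts down.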
  void AddToWaitingList() {
    grpc_closure_list_append(
        &request_router_->waiting_for_resolver_result_closures_,
        &done_closure_, GRPC_ERROR_NONE);
  }

  // Invoked when a resolver result is available.
  static void DoneLocked(void* arg, grpc_error* error) {
    ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
    RequestRouter* request_router = self->request_router_;
    // If CancelLocked() has already run, delete ourselves without doing
    // anything.  Note that the call stack may have already been destroyed,
    // so it's not safe to access anything in request_.
    if (GPR_UNLIKELY(self->finished_)) {
      if (self->tracer_enabled_) {
        gpr_log(GPR_INFO,
                "request_router=%p: call cancelled before resolver result",
                request_router);
      }
      Delete(self);
      return;
    }
    // Otherwise, process the resolver result.
    Request* request = self->request_;
    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
      if (self->tracer_enabled_) {
        gpr_log(GPR_INFO,
                "request_router=%p request=%p: resolver failed to return data",
                request_router, request);
      }
      GRPC_CLOSURE_RUN(request->on_route_done_, GRPC_ERROR_REF(error));
    } else if (GPR_UNLIKELY(request_router->resolver_ == nullptr)) {
      // Shutting down.
      if (self->tracer_enabled_) {
        gpr_log(GPR_INFO, "request_router=%p request=%p: resolver disconnected",
                request_router, request);
      }
      GRPC_CLOSURE_RUN(request->on_route_done_,
                       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
    } else if (GPR_UNLIKELY(request_router->lb_policy_ == nullptr)) {
      // Transient resolver failure.
      // If call has wait_for_ready=true, try again; otherwise, fail.
      if (*request->pick_.initial_metadata_flags &
          GRPC_INITIAL_METADATA_WAIT_FOR_READY) {
        if (self->tracer_enabled_) {
          gpr_log(GPR_INFO,
                  "request_router=%p request=%p: resolver returned but no LB "
                  "policy; wait_for_ready=true; trying again",
                  request_router, request);
        }
        // Re-add ourselves to the waiting list.
        self->AddToWaitingList();
        // Return early so that we don't set finished_ to true below.
        return;
      } else {
        if (self->tracer_enabled_) {
          gpr_log(GPR_INFO,
                  "request_router=%p request=%p: resolver returned but no LB "
                  "policy; wait_for_ready=false; failing",
                  request_router, request);
        }
        GRPC_CLOSURE_RUN(
            request->on_route_done_,
            grpc_error_set_int(
                GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                    "Name resolution failure"),
                GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
      }
    } else {
      if (self->tracer_enabled_) {
        gpr_log(GPR_INFO,
                "request_router=%p request=%p: resolver returned, doing LB "
                "pick",
                request_router, request);
      }
      request->ProcessServiceConfigAndStartLbPickLocked();
    }
    self->finished_ = true;
  }
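  // Note on lifetime: DoneLocked() and CancelLocked() both run under the
  // combiner, so they never run concurrently.  The finished_ flag lets
  // whichever of the two runs second know that it is responsible for
  // calling Delete(self).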
  // Invoked when the call is cancelled.
  // Note: This runs under the client_channel combiner, but will NOT be
  // holding the call combiner.
  static void CancelLocked(void* arg, grpc_error* error) {
    ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
    RequestRouter* request_router = self->request_router_;
    // If DoneLocked() has already run, delete ourselves without doing
    // anything.
    if (self->finished_) {
      Delete(self);
      return;
    }
    Request* request = self->request_;
    // If we are being cancelled, immediately invoke on_route_done_
    // to propagate the error back to the caller.
    if (error != GRPC_ERROR_NONE) {
      if (self->tracer_enabled_) {
        gpr_log(GPR_INFO,
                "request_router=%p request=%p: cancelling call waiting for "
                "name resolution",
                request_router, request);
      }
      // Note: Although we are not in the call combiner here, we are
      // basically stealing the call combiner from the pending pick, so
      // it's safe to run on_route_done_ here -- we are essentially
      // calling it here instead of calling it in DoneLocked().
      GRPC_CLOSURE_RUN(request->on_route_done_,
                       GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                           "Pick cancelled", &error, 1));
    }
    self->finished_ = true;
  }

  RequestRouter* request_router_;
  Request* request_;
  const bool tracer_enabled_;
  grpc_closure done_closure_;
  grpc_closure cancel_closure_;
  bool finished_ = false;
};

//
// RequestRouter::Request::AsyncPickCanceller
//

// Handles the call combiner cancellation callback for an async LB pick.
class RequestRouter::Request::AsyncPickCanceller {
 public:
  explicit AsyncPickCanceller(Request* request)
      : request_router_(request->request_router_),
        request_(request),
        tracer_enabled_(request_router_->tracer_->enabled()) {
    GRPC_CALL_STACK_REF(request->owning_call_, "pick_callback_cancel");
    // Set cancellation closure, so that we abort if the call is cancelled.
    GRPC_CLOSURE_INIT(&cancel_closure_, &CancelLocked, this,
                      grpc_combiner_scheduler(request_router_->combiner_));
    grpc_call_combiner_set_notify_on_cancel(request->call_combiner_,
                                            &cancel_closure_);
  }

  void MarkFinishedLocked() {
    finished_ = true;
    GRPC_CALL_STACK_UNREF(request_->owning_call_, "pick_callback_cancel");
  }

 private:
  // Invoked when the call is cancelled.
  // Note: This runs under the client_channel combiner, but will NOT be
  // holding the call combiner.
  static void CancelLocked(void* arg, grpc_error* error) {
    AsyncPickCanceller* self = static_cast<AsyncPickCanceller*>(arg);
    Request* request = self->request_;
    RequestRouter* request_router = self->request_router_;
    if (!self->finished_) {
      // Note: request_router->lb_policy_ may have changed since we started
      // our pick, in which case we will be cancelling the pick on a policy
      // other than the one we started it on.  However, this will just be a
      // no-op.
      if (error != GRPC_ERROR_NONE && request_router->lb_policy_ != nullptr) {
        if (self->tracer_enabled_) {
          gpr_log(GPR_INFO,
                  "request_router=%p request=%p: cancelling pick from LB "
                  "policy %p",
                  request_router, request, request_router->lb_policy_.get());
        }
        request_router->lb_policy_->CancelPickLocked(&request->pick_,
                                                     GRPC_ERROR_REF(error));
      }
      request->pick_canceller_ = nullptr;
      GRPC_CALL_STACK_UNREF(request->owning_call_, "pick_callback_cancel");
    }
    Delete(self);
  }

  RequestRouter* request_router_;
  Request* request_;
  const bool tracer_enabled_;
  grpc_closure cancel_closure_;
  bool finished_ = false;
};

//
// RequestRouter::Request
//

RequestRouter::Request::Request(grpc_call_stack* owning_call,
                                grpc_call_combiner* call_combiner,
                                grpc_polling_entity* pollent,
                                grpc_metadata_batch* send_initial_metadata,
                                uint32_t* send_initial_metadata_flags,
                                ApplyServiceConfigCallback apply_service_config,
                                void* apply_service_config_user_data,
                                grpc_closure* on_route_done)
    : owning_call_(owning_call),
      call_combiner_(call_combiner),
      pollent_(pollent),
      apply_service_config_(apply_service_config),
      apply_service_config_user_data_(apply_service_config_user_data),
      on_route_done_(on_route_done) {
  pick_.initial_metadata = send_initial_metadata;
  pick_.initial_metadata_flags = send_initial_metadata_flags;
}

RequestRouter::Request::~Request() {
  if (pick_.connected_subchannel != nullptr) {
    pick_.connected_subchannel.reset();
  }
  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
    if (pick_.subchannel_call_context[i].destroy != nullptr) {
      pick_.subchannel_call_context[i].destroy(
          pick_.subchannel_call_context[i].value);
    }
  }
}

// Invoked once resolver results are available.
void RequestRouter::Request::ProcessServiceConfigAndStartLbPickLocked() {
  // Get service config data if needed.
  if (!apply_service_config_(apply_service_config_user_data_)) return;
  // Start LB pick.
  StartLbPickLocked();
}
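// The request's polling entity is added to the request_router's
// interested_parties_ while the request is waiting on resolver or LB
// policy I/O, so that such I/O can be driven by the call's pollset.
// pollent_added_to_interested_parties_ makes the add/remove idempotent.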
void RequestRouter::Request::MaybeAddCallToInterestedPartiesLocked() {
  if (!pollent_added_to_interested_parties_) {
    pollent_added_to_interested_parties_ = true;
    grpc_polling_entity_add_to_pollset_set(
        pollent_, request_router_->interested_parties_);
  }
}

void RequestRouter::Request::MaybeRemoveCallFromInterestedPartiesLocked() {
  if (pollent_added_to_interested_parties_) {
    pollent_added_to_interested_parties_ = false;
    grpc_polling_entity_del_from_pollset_set(
        pollent_, request_router_->interested_parties_);
  }
}

// Starts a pick on the LB policy.
void RequestRouter::Request::StartLbPickLocked() {
  if (request_router_->tracer_->enabled()) {
    gpr_log(GPR_INFO,
            "request_router=%p request=%p: starting pick on lb_policy=%p",
            request_router_, this, request_router_->lb_policy_.get());
  }
  GRPC_CLOSURE_INIT(&on_pick_done_, &LbPickDoneLocked, this,
                    grpc_combiner_scheduler(request_router_->combiner_));
  pick_.on_complete = &on_pick_done_;
  GRPC_CALL_STACK_REF(owning_call_, "pick_callback");
  grpc_error* error = GRPC_ERROR_NONE;
  const bool pick_done =
      request_router_->lb_policy_->PickLocked(&pick_, &error);
  if (pick_done) {
    // Pick completed synchronously.
    if (request_router_->tracer_->enabled()) {
      gpr_log(GPR_INFO,
              "request_router=%p request=%p: pick completed synchronously",
              request_router_, this);
    }
    GRPC_CLOSURE_RUN(on_route_done_, error);
    GRPC_CALL_STACK_UNREF(owning_call_, "pick_callback");
  } else {
    // Pick will be returned asynchronously.
    // Add the request's polling entity to the request_router's
    // interested_parties, so that the I/O of the LB policy can be done
    // under it.  It will be removed in LbPickDoneLocked().
    MaybeAddCallToInterestedPartiesLocked();
    // Request notification on call cancellation.
    // We allocate a separate object to track cancellation, since the
    // cancellation closure might still be pending when we need to reuse
    // the memory in which this Request object is stored for a subsequent
    // retry attempt.
    pick_canceller_ = New<AsyncPickCanceller>(this);
  }
}
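// Pick flow summary:
//   StartLbPickLocked()
//     -> PickLocked() returns true:  the pick completed synchronously,
//        and on_route_done_ is run immediately.
//     -> PickLocked() returns false: the pick is pending, and
//        LbPickDoneLocked() runs later under the combiner to invoke
//        on_route_done_.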
// Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
// Unrefs the call stack and invokes on_route_done_.
void RequestRouter::Request::LbPickDoneLocked(void* arg, grpc_error* error) {
  Request* self = static_cast<Request*>(arg);
  RequestRouter* request_router = self->request_router_;
  if (request_router->tracer_->enabled()) {
    gpr_log(GPR_INFO,
            "request_router=%p request=%p: pick completed asynchronously",
            request_router, self);
  }
  self->MaybeRemoveCallFromInterestedPartiesLocked();
  if (self->pick_canceller_ != nullptr) {
    self->pick_canceller_->MarkFinishedLocked();
  }
  GRPC_CLOSURE_RUN(self->on_route_done_, GRPC_ERROR_REF(error));
  GRPC_CALL_STACK_UNREF(self->owning_call_, "pick_callback");
}

//
// RequestRouter::LbConnectivityWatcher
//

class RequestRouter::LbConnectivityWatcher {
 public:
  LbConnectivityWatcher(RequestRouter* request_router,
                        grpc_connectivity_state state,
                        LoadBalancingPolicy* lb_policy,
                        grpc_channel_stack* owning_stack,
                        grpc_combiner* combiner)
      : request_router_(request_router),
        state_(state),
        lb_policy_(lb_policy),
        owning_stack_(owning_stack) {
    GRPC_CHANNEL_STACK_REF(owning_stack_, "LbConnectivityWatcher");
    GRPC_CLOSURE_INIT(&on_changed_, &OnLbPolicyStateChangedLocked, this,
                      grpc_combiner_scheduler(combiner));
    lb_policy_->NotifyOnStateChangeLocked(&state_, &on_changed_);
  }

  ~LbConnectivityWatcher() {
    GRPC_CHANNEL_STACK_UNREF(owning_stack_, "LbConnectivityWatcher");
  }

 private:
  static void OnLbPolicyStateChangedLocked(void* arg, grpc_error* error) {
    LbConnectivityWatcher* self = static_cast<LbConnectivityWatcher*>(arg);
    // If the notification is not for the current policy, we're stale,
    // so delete ourselves.
    if (self->lb_policy_ != self->request_router_->lb_policy_.get()) {
      Delete(self);
      return;
    }
    // Otherwise, process notification.
    if (self->request_router_->tracer_->enabled()) {
      gpr_log(GPR_INFO, "request_router=%p: lb_policy=%p state changed to %s",
              self->request_router_, self->lb_policy_,
              grpc_connectivity_state_name(self->state_));
    }
    self->request_router_->SetConnectivityStateLocked(
        self->state_, GRPC_ERROR_REF(error), "lb_changed");
    // If shutting down, terminate watch.
    if (self->state_ == GRPC_CHANNEL_SHUTDOWN) {
      Delete(self);
      return;
    }
    // Renew watch.
    self->lb_policy_->NotifyOnStateChangeLocked(&self->state_,
                                                &self->on_changed_);
  }

  RequestRouter* request_router_;
  grpc_connectivity_state state_;
  // LB policy address.  No ref held, so not safe to dereference unless
  // it happens to match request_router->lb_policy_.
  LoadBalancingPolicy* lb_policy_;
  grpc_channel_stack* owning_stack_;
  grpc_closure on_changed_;
};
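// Both LbConnectivityWatcher and ReresolutionRequestHandler hold a raw
// pointer to the LB policy they were created for.  Neither holds a ref:
// the pointer is compared against request_router_->lb_policy_ to detect
// staleness and is never dereferenced once stale.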
//
// RequestRouter::ReresolutionRequestHandler
//

class RequestRouter::ReresolutionRequestHandler {
 public:
  ReresolutionRequestHandler(RequestRouter* request_router,
                             LoadBalancingPolicy* lb_policy,
                             grpc_channel_stack* owning_stack,
                             grpc_combiner* combiner)
      : request_router_(request_router),
        lb_policy_(lb_policy),
        owning_stack_(owning_stack) {
    GRPC_CHANNEL_STACK_REF(owning_stack_, "ReresolutionRequestHandler");
    GRPC_CLOSURE_INIT(&closure_, &OnRequestReresolutionLocked, this,
                      grpc_combiner_scheduler(combiner));
    lb_policy_->SetReresolutionClosureLocked(&closure_);
  }

 private:
  static void OnRequestReresolutionLocked(void* arg, grpc_error* error) {
    ReresolutionRequestHandler* self =
        static_cast<ReresolutionRequestHandler*>(arg);
    RequestRouter* request_router = self->request_router_;
    // If this invocation is for a stale LB policy, treat it as an LB
    // shutdown signal.
    if (self->lb_policy_ != request_router->lb_policy_.get() ||
        error != GRPC_ERROR_NONE || request_router->resolver_ == nullptr) {
      GRPC_CHANNEL_STACK_UNREF(request_router->owning_stack_,
                               "ReresolutionRequestHandler");
      Delete(self);
      return;
    }
    if (request_router->tracer_->enabled()) {
      gpr_log(GPR_INFO, "request_router=%p: started name re-resolving",
              request_router);
    }
    request_router->resolver_->RequestReresolutionLocked();
    // Give back the closure to the LB policy.
    self->lb_policy_->SetReresolutionClosureLocked(&self->closure_);
  }

  RequestRouter* request_router_;
  // LB policy address.  No ref held, so not safe to dereference unless
  // it happens to match request_router->lb_policy_.
  LoadBalancingPolicy* lb_policy_;
  grpc_channel_stack* owning_stack_;
  grpc_closure closure_;
};

//
// RequestRouter
//

RequestRouter::RequestRouter(
    grpc_channel_stack* owning_stack, grpc_combiner* combiner,
    grpc_client_channel_factory* client_channel_factory,
    grpc_pollset_set* interested_parties, TraceFlag* tracer,
    ProcessResolverResultCallback process_resolver_result,
    void* process_resolver_result_user_data, const char* target_uri,
    const grpc_channel_args* args, grpc_error** error)
    : owning_stack_(owning_stack),
      combiner_(combiner),
      client_channel_factory_(client_channel_factory),
      interested_parties_(interested_parties),
      tracer_(tracer),
      process_resolver_result_(process_resolver_result),
      process_resolver_result_user_data_(process_resolver_result_user_data) {
  GRPC_CLOSURE_INIT(&on_resolver_result_changed_,
                    &RequestRouter::OnResolverResultChangedLocked, this,
                    grpc_combiner_scheduler(combiner));
  grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
                               "request_router");
  grpc_channel_args* new_args = nullptr;
  if (process_resolver_result == nullptr) {
    grpc_arg arg = grpc_channel_arg_integer_create(
        const_cast<char*>(GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION), 0);
    new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
  }
  resolver_ = ResolverRegistry::CreateResolver(
      target_uri, (new_args == nullptr ? args : new_args),
      interested_parties_, combiner_);
  grpc_channel_args_destroy(new_args);
  if (resolver_ == nullptr) {
    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
  }
}
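// Typical usage (a sketch; in this codebase the caller is the
// client_channel filter): a channel creates one RequestRouter at
// construction time, passing in its combiner and target URI, and then
// calls RouteCallLocked() for each call that needs a connected
// subchannel.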
RequestRouter::~RequestRouter() {
  if (resolver_ != nullptr) {
    // The only way we can get here is if we never started resolving,
    // because we take a ref to the channel stack when we start
    // resolving and do not release it until the resolver callback is
    // invoked after the resolver shuts down.
    resolver_.reset();
  }
  if (lb_policy_ != nullptr) {
    grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
                                     interested_parties_);
    lb_policy_.reset();
  }
  if (client_channel_factory_ != nullptr) {
    grpc_client_channel_factory_unref(client_channel_factory_);
  }
  grpc_connectivity_state_destroy(&state_tracker_);
}

namespace {

const char* GetChannelConnectivityStateChangeString(
    grpc_connectivity_state state) {
  switch (state) {
    case GRPC_CHANNEL_IDLE:
      return "Channel state change to IDLE";
    case GRPC_CHANNEL_CONNECTING:
      return "Channel state change to CONNECTING";
    case GRPC_CHANNEL_READY:
      return "Channel state change to READY";
    case GRPC_CHANNEL_TRANSIENT_FAILURE:
      return "Channel state change to TRANSIENT_FAILURE";
    case GRPC_CHANNEL_SHUTDOWN:
      return "Channel state change to SHUTDOWN";
  }
  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

}  // namespace

void RequestRouter::SetConnectivityStateLocked(grpc_connectivity_state state,
                                               grpc_error* error,
                                               const char* reason) {
  if (lb_policy_ != nullptr) {
    if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
      // Cancel picks with wait_for_ready=false.
      lb_policy_->CancelMatchingPicksLocked(
          /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
          /* check= */ 0, GRPC_ERROR_REF(error));
    } else if (state == GRPC_CHANNEL_SHUTDOWN) {
      // Cancel all picks.
      lb_policy_->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0,
                                            GRPC_ERROR_REF(error));
    }
  }
  if (tracer_->enabled()) {
    gpr_log(GPR_INFO, "request_router=%p: setting connectivity state to %s",
            this, grpc_connectivity_state_name(state));
  }
  if (channelz_node_ != nullptr) {
    channelz_node_->AddTraceEvent(
        channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string(
            GetChannelConnectivityStateChangeString(state)));
  }
  grpc_connectivity_state_set(&state_tracker_, state, error, reason);
}

void RequestRouter::StartResolvingLocked() {
  if (tracer_->enabled()) {
    gpr_log(GPR_INFO, "request_router=%p: starting name resolution", this);
  }
  GPR_ASSERT(!started_resolving_);
  started_resolving_ = true;
  GRPC_CHANNEL_STACK_REF(owning_stack_, "resolver");
  resolver_->NextLocked(&resolver_result_, &on_resolver_result_changed_);
}
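// The "resolver" ref on the channel stack taken in StartResolvingLocked()
// is held across all NextLocked() callbacks; it is released only in
// OnResolverShutdownLocked(), once the resolver reports shutdown.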
// Invoked from the resolver NextLocked() callback when the resolver
// is shutting down.
void RequestRouter::OnResolverShutdownLocked(grpc_error* error) {
  if (tracer_->enabled()) {
    gpr_log(GPR_INFO, "request_router=%p: shutting down", this);
  }
  if (lb_policy_ != nullptr) {
    if (tracer_->enabled()) {
      gpr_log(GPR_INFO, "request_router=%p: shutting down lb_policy=%p", this,
              lb_policy_.get());
    }
    grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
                                     interested_parties_);
    lb_policy_.reset();
  }
  if (resolver_ != nullptr) {
    // This should never happen; it can only be triggered by a resolver
    // implementation spontaneously deciding to report shutdown without
    // being orphaned.  This code is included just to be defensive.
    if (tracer_->enabled()) {
      gpr_log(GPR_INFO,
              "request_router=%p: spontaneous shutdown from resolver %p", this,
              resolver_.get());
    }
    resolver_.reset();
    SetConnectivityStateLocked(GRPC_CHANNEL_SHUTDOWN,
                               GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                   "Resolver spontaneous shutdown", &error, 1),
                               "resolver_spontaneous_shutdown");
  }
  grpc_closure_list_fail_all(&waiting_for_resolver_result_closures_,
                             GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                 "Channel disconnected", &error, 1));
  GRPC_CLOSURE_LIST_SCHED(&waiting_for_resolver_result_closures_);
  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "resolver");
  grpc_channel_args_destroy(resolver_result_);
  resolver_result_ = nullptr;
  GRPC_ERROR_UNREF(error);
}

// Creates a new LB policy, replacing any previous one.
// If the new policy is created successfully, sets *connectivity_state and
// *connectivity_error to its initial connectivity state; otherwise,
// leaves them unchanged.
void RequestRouter::CreateNewLbPolicyLocked(
    const char* lb_policy_name, grpc_json* lb_config,
    grpc_connectivity_state* connectivity_state,
    grpc_error** connectivity_error, TraceStringVector* trace_strings) {
  LoadBalancingPolicy::Args lb_policy_args;
  lb_policy_args.combiner = combiner_;
  lb_policy_args.client_channel_factory = client_channel_factory_;
  lb_policy_args.args = resolver_result_;
  lb_policy_args.lb_config = lb_config;
  OrphanablePtr<LoadBalancingPolicy> new_lb_policy =
      LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(lb_policy_name,
                                                             lb_policy_args);
  if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
    gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
    if (channelz_node_ != nullptr) {
      char* str;
      gpr_asprintf(&str, "Could not create LB policy \'%s\'", lb_policy_name);
      trace_strings->push_back(str);
    }
  } else {
    if (tracer_->enabled()) {
      gpr_log(GPR_INFO, "request_router=%p: created new LB policy \"%s\" (%p)",
              this, lb_policy_name, new_lb_policy.get());
    }
    if (channelz_node_ != nullptr) {
      char* str;
      gpr_asprintf(&str, "Created new LB policy \'%s\'", lb_policy_name);
      trace_strings->push_back(str);
    }
    // Swap out the LB policy and update the fds in interested_parties_.
    if (lb_policy_ != nullptr) {
      if (tracer_->enabled()) {
        gpr_log(GPR_INFO, "request_router=%p: shutting down lb_policy=%p",
                this, lb_policy_.get());
      }
      grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
                                       interested_parties_);
      lb_policy_->HandOffPendingPicksLocked(new_lb_policy.get());
    }
    lb_policy_ = std::move(new_lb_policy);
    grpc_pollset_set_add_pollset_set(lb_policy_->interested_parties(),
                                     interested_parties_);
    // Create re-resolution request handler for the new LB policy.  It
    // will delete itself when no longer needed.
    New<ReresolutionRequestHandler>(this, lb_policy_.get(), owning_stack_,
                                    combiner_);
    // Get the new LB policy's initial connectivity state and start a
    // connectivity watch.
    GRPC_ERROR_UNREF(*connectivity_error);
    *connectivity_state =
        lb_policy_->CheckConnectivityLocked(connectivity_error);
    if (exit_idle_when_lb_policy_arrives_) {
      lb_policy_->ExitIdleLocked();
      exit_idle_when_lb_policy_arrives_ = false;
    }
    // Create new watcher.  It will delete itself when done.
    New<LbConnectivityWatcher>(this, *connectivity_state, lb_policy_.get(),
                               owning_stack_, combiner_);
  }
}
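// The strings in a TraceStringVector are heap-allocated (gpr_strdup() /
// gpr_asprintf()); ownership passes to the gpr_strvec built in
// ConcatenateAndAddChannelTraceLocked(), which frees them when the
// vector is destroyed.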
void RequestRouter::MaybeAddTraceMessagesForAddressChangesLocked(
    TraceStringVector* trace_strings) {
  const ServerAddressList* addresses =
      FindServerAddressListChannelArg(resolver_result_);
  const bool resolution_contains_addresses =
      addresses != nullptr && addresses->size() > 0;
  if (!resolution_contains_addresses &&
      previous_resolution_contained_addresses_) {
    trace_strings->push_back(gpr_strdup("Address list became empty"));
  } else if (resolution_contains_addresses &&
             !previous_resolution_contained_addresses_) {
    trace_strings->push_back(gpr_strdup("Address list became non-empty"));
  }
  previous_resolution_contained_addresses_ = resolution_contains_addresses;
}

void RequestRouter::ConcatenateAndAddChannelTraceLocked(
    TraceStringVector* trace_strings) const {
  if (!trace_strings->empty()) {
    gpr_strvec v;
    gpr_strvec_init(&v);
    gpr_strvec_add(&v, gpr_strdup("Resolution event: "));
    bool is_first = true;
    for (size_t i = 0; i < trace_strings->size(); ++i) {
      if (!is_first) gpr_strvec_add(&v, gpr_strdup(", "));
      is_first = false;
      gpr_strvec_add(&v, (*trace_strings)[i]);
    }
    char* flat;
    size_t flat_len = 0;
    flat = gpr_strvec_flatten(&v, &flat_len);
    channelz_node_->AddTraceEvent(
        grpc_core::channelz::ChannelTrace::Severity::Info,
        grpc_slice_new(flat, flat_len, gpr_free));
    gpr_strvec_destroy(&v);
  }
}
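// For example, a resolution pass that changed the service config and
// produced a non-empty address list yields a single trace event:
//   "Resolution event: Service config changed, Address list became
//   non-empty"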
// Callback invoked when a resolver result is available.
void RequestRouter::OnResolverResultChangedLocked(void* arg,
                                                  grpc_error* error) {
  RequestRouter* self = static_cast<RequestRouter*>(arg);
  if (self->tracer_->enabled()) {
    const char* disposition =
        self->resolver_result_ != nullptr
            ? ""
            : (error == GRPC_ERROR_NONE ? " (transient error)"
                                        : " (resolver shutdown)");
    gpr_log(GPR_INFO,
            "request_router=%p: got resolver result: resolver_result=%p "
            "error=%s%s",
            self, self->resolver_result_, grpc_error_string(error),
            disposition);
  }
  // Handle shutdown.
  if (error != GRPC_ERROR_NONE || self->resolver_ == nullptr) {
    self->OnResolverShutdownLocked(GRPC_ERROR_REF(error));
    return;
  }
  // Data used to set the channel's connectivity state.
  bool set_connectivity_state = true;
  // We only want to trace the address resolution in the following cases:
  // (a) Address resolution resulted in a service config change.
  // (b) Address resolution that causes the number of backends to go from
  //     zero to non-zero.
  // (c) Address resolution that causes the number of backends to go from
  //     non-zero to zero.
  // (d) Address resolution that causes a new LB policy to be created.
  //
  // We track a list of strings to eventually be concatenated and traced.
  TraceStringVector trace_strings;
  grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
  grpc_error* connectivity_error =
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
  // resolver_result_ will be null in the case of a transient
  // resolution error.  In that case, we don't have any new result to
  // process, which means that we keep using the previous result (if any).
  if (self->resolver_result_ == nullptr) {
    if (self->tracer_->enabled()) {
      gpr_log(GPR_INFO, "request_router=%p: resolver transient failure", self);
    }
    // Don't override connectivity state if we already have an LB policy.
    if (self->lb_policy_ != nullptr) set_connectivity_state = false;
  } else {
    // Parse the resolver result.
    const char* lb_policy_name = nullptr;
    grpc_json* lb_policy_config = nullptr;
    const bool service_config_changed = self->process_resolver_result_(
        self->process_resolver_result_user_data_, *self->resolver_result_,
        &lb_policy_name, &lb_policy_config);
    GPR_ASSERT(lb_policy_name != nullptr);
    // Check to see if we're already using the right LB policy.
    const bool lb_policy_name_changed =
        self->lb_policy_ == nullptr ||
        strcmp(self->lb_policy_->name(), lb_policy_name) != 0;
    if (self->lb_policy_ != nullptr && !lb_policy_name_changed) {
      // Continue using the same LB policy.  Update with new addresses.
      if (self->tracer_->enabled()) {
        gpr_log(GPR_INFO,
                "request_router=%p: updating existing LB policy \"%s\" (%p)",
                self, lb_policy_name, self->lb_policy_.get());
      }
      self->lb_policy_->UpdateLocked(*self->resolver_result_,
                                     lb_policy_config);
      // No need to set the channel's connectivity state; the existing
      // watch on the LB policy will take care of that.
      set_connectivity_state = false;
    } else {
      // Instantiate new LB policy.
      self->CreateNewLbPolicyLocked(lb_policy_name, lb_policy_config,
                                    &connectivity_state, &connectivity_error,
                                    &trace_strings);
    }
    // Add channel trace event.
    if (self->channelz_node_ != nullptr) {
      if (service_config_changed) {
        // TODO(ncteisen): might be worth somehow including a snippet of the
        // config in the trace, at the risk of bloating the trace logs.
        trace_strings.push_back(gpr_strdup("Service config changed"));
      }
      self->MaybeAddTraceMessagesForAddressChangesLocked(&trace_strings);
      self->ConcatenateAndAddChannelTraceLocked(&trace_strings);
    }
    // Clean up.
    grpc_channel_args_destroy(self->resolver_result_);
    self->resolver_result_ = nullptr;
  }
  // Set the channel's connectivity state if needed.
  if (set_connectivity_state) {
    self->SetConnectivityStateLocked(connectivity_state, connectivity_error,
                                     "resolver_result");
  } else {
    GRPC_ERROR_UNREF(connectivity_error);
  }
  // Invoke closures that were waiting for results and renew the watch.
  GRPC_CLOSURE_LIST_SCHED(&self->waiting_for_resolver_result_closures_);
  self->resolver_->NextLocked(&self->resolver_result_,
                              &self->on_resolver_result_changed_);
}

void RequestRouter::RouteCallLocked(Request* request) {
  GPR_ASSERT(request->pick_.connected_subchannel == nullptr);
  request->request_router_ = this;
  if (lb_policy_ != nullptr) {
    // We already have resolver results, so process the service config
    // and start an LB pick.
    request->ProcessServiceConfigAndStartLbPickLocked();
  } else if (resolver_ == nullptr) {
    GRPC_CLOSURE_RUN(request->on_route_done_,
                     GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
  } else {
    // We do not yet have an LB policy, so wait for a resolver result.
    if (!started_resolving_) {
      StartResolvingLocked();
    }
    // Create a new waiter, which will delete itself when done.
    New<ResolverResultWaiter>(request);
    // Add the request's polling entity to the request_router's
    // interested_parties, so that the I/O of the resolver can be done
    // under it.  It will be removed in LbPickDoneLocked().
    request->MaybeAddCallToInterestedPartiesLocked();
  }
}
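// Note: ShutdownLocked() takes ownership of error.  Closures waiting for
// a resolver result are failed here only if resolution never started;
// if it did start, OnResolverShutdownLocked() will fail them when the
// resolver's shutdown propagates through the NextLocked() callback.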
void RequestRouter::ShutdownLocked(grpc_error* error) {
  if (resolver_ != nullptr) {
    SetConnectivityStateLocked(GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
                               "disconnect");
    resolver_.reset();
    if (!started_resolving_) {
      grpc_closure_list_fail_all(&waiting_for_resolver_result_closures_,
                                 GRPC_ERROR_REF(error));
      GRPC_CLOSURE_LIST_SCHED(&waiting_for_resolver_result_closures_);
    }
    if (lb_policy_ != nullptr) {
      grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
                                       interested_parties_);
      lb_policy_.reset();
    }
  }
  GRPC_ERROR_UNREF(error);
}

grpc_connectivity_state RequestRouter::GetConnectivityState() {
  return grpc_connectivity_state_check(&state_tracker_);
}

void RequestRouter::NotifyOnConnectivityStateChange(
    grpc_connectivity_state* state, grpc_closure* closure) {
  grpc_connectivity_state_notify_on_state_change(&state_tracker_, state,
                                                 closure);
}

void RequestRouter::ExitIdleLocked() {
  if (lb_policy_ != nullptr) {
    lb_policy_->ExitIdleLocked();
  } else {
    exit_idle_when_lb_policy_arrives_ = true;
    if (!started_resolving_ && resolver_ != nullptr) {
      StartResolvingLocked();
    }
  }
}

void RequestRouter::ResetConnectionBackoffLocked() {
  if (resolver_ != nullptr) {
    resolver_->ResetBackoffLocked();
    resolver_->RequestReresolutionLocked();
  }
  if (lb_policy_ != nullptr) {
    lb_policy_->ResetBackoffLocked();
  }
}

}  // namespace grpc_core