/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/subchannel.h"

#include <inttypes.h>
#include <limits.h>
#include <string.h>

#include <algorithm>

#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>

#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/health/health_check_client.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/status_metadata.h"
#include "src/core/lib/uri/uri_parser.h"

#define INTERNAL_REF_BITS 16
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))

#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2

typedef struct external_state_watcher {
  grpc_subchannel* subchannel;
  grpc_pollset_set* pollset_set;
  grpc_closure* notify;
  grpc_closure closure;
  struct external_state_watcher* next;
  struct external_state_watcher* prev;
} external_state_watcher;

namespace grpc_core {
class ConnectedSubchannelStateWatcher;
}  // namespace grpc_core

struct grpc_subchannel {
  grpc_connector* connector;

  /** refcount
      - lower INTERNAL_REF_BITS bits are for internal references:
        these do not keep the subchannel open.
      - upper remaining bits are for public references: these do
        keep the subchannel open */
  gpr_atm ref_pair;

  /** channel arguments */
  grpc_channel_args* args;

  grpc_subchannel_key* key;

  /** set during connection */
  grpc_connect_out_args connecting_result;

  /** callback for connection finishing */
  grpc_closure on_connected;

  /** callback for our alarm */
  grpc_closure on_alarm;

  /** pollset_set tracking who's interested in a connection
      being setup */
  grpc_pollset_set* pollset_set;

  grpc_core::UniquePtr<char> health_check_service_name;

  /** mutex protecting remaining elements */
  gpr_mu mu;

  /** active connection, or null */
  grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
  grpc_core::OrphanablePtr<grpc_core::ConnectedSubchannelStateWatcher>
      connected_subchannel_watcher;

  /** have we seen a disconnection? */
  bool disconnected;
  /** are we connecting */
  bool connecting;

  /** connectivity state tracking */
  grpc_connectivity_state_tracker state_tracker;
  grpc_connectivity_state_tracker state_and_health_tracker;

  external_state_watcher root_external_state_watcher;

  /** backoff state */
  grpc_core::ManualConstructor<grpc_core::BackOff> backoff;
  grpc_millis next_attempt_deadline;
  grpc_millis min_connect_timeout_ms;

  /** do we have an active alarm? */
  bool have_alarm;
  /** have we started the backoff loop */
  bool backoff_begun;
  // reset_backoff() was called while alarm was pending
  bool retry_immediately;
  /** our alarm */
  grpc_timer alarm;

  grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode>
      channelz_subchannel;
};

struct grpc_subchannel_call {
  grpc_subchannel_call(grpc_core::ConnectedSubchannel* connection,
                       const grpc_core::ConnectedSubchannel::CallArgs& args)
      : connection(connection), deadline(args.deadline) {}

  grpc_core::ConnectedSubchannel* connection;
  grpc_closure* schedule_closure_after_destroy = nullptr;
  // state needed to support channelz interception of recv trailing metadata.
  grpc_closure recv_trailing_metadata_ready;
  grpc_closure* original_recv_trailing_metadata;
  grpc_metadata_batch* recv_trailing_metadata = nullptr;
  grpc_millis deadline;
};

static void maybe_start_connecting_locked(grpc_subchannel* c);

static const char* subchannel_connectivity_state_change_string(
    grpc_connectivity_state state) {
  switch (state) {
    case GRPC_CHANNEL_IDLE:
      return "Subchannel state change to IDLE";
    case GRPC_CHANNEL_CONNECTING:
      return "Subchannel state change to CONNECTING";
    case GRPC_CHANNEL_READY:
      return "Subchannel state change to READY";
    case GRPC_CHANNEL_TRANSIENT_FAILURE:
      return "Subchannel state change to TRANSIENT_FAILURE";
    case GRPC_CHANNEL_SHUTDOWN:
      return "Subchannel state change to SHUTDOWN";
  }
  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

static void set_subchannel_connectivity_state_locked(
    grpc_subchannel* c, grpc_connectivity_state state, grpc_error* error,
    const char* reason) {
  if (c->channelz_subchannel != nullptr) {
    c->channelz_subchannel->AddTraceEvent(
        grpc_core::channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string(
            subchannel_connectivity_state_change_string(state)));
  }
  grpc_connectivity_state_set(&c->state_tracker, state, error, reason);
}

namespace grpc_core {

class ConnectedSubchannelStateWatcher
    : public InternallyRefCounted<ConnectedSubchannelStateWatcher> {
 public:
  // Must be instantiated while holding c->mu.
  explicit ConnectedSubchannelStateWatcher(grpc_subchannel* c)
      : subchannel_(c) {
    // Steal subchannel ref for connecting.
    GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "state_watcher");
    GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "connecting");
    // Start watching for connectivity state changes.
    // Callback uses initial ref to this.
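    // (That initial ref, created by MakeOrphanable<>, is owned by the pending
    // connectivity watch: OnConnectivityChanged() either re-arms the watch and
    // keeps the ref, or releases it once the watch is finished.  The
    // health-check watch below takes its own ref.)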
    GRPC_CLOSURE_INIT(&on_connectivity_changed_, OnConnectivityChanged, this,
                      grpc_schedule_on_exec_ctx);
    c->connected_subchannel->NotifyOnStateChange(c->pollset_set,
                                                 &pending_connectivity_state_,
                                                 &on_connectivity_changed_);
    // Start health check if needed.
    grpc_connectivity_state health_state = GRPC_CHANNEL_READY;
    if (c->health_check_service_name != nullptr) {
      health_check_client_ =
          grpc_core::MakeOrphanable<grpc_core::HealthCheckClient>(
              c->health_check_service_name.get(), c->connected_subchannel,
              c->pollset_set, c->channelz_subchannel);
      GRPC_CLOSURE_INIT(&on_health_changed_, OnHealthChanged, this,
                        grpc_schedule_on_exec_ctx);
      Ref().release();  // Ref for health callback tracked manually.
      health_check_client_->NotifyOnHealthChange(&health_state_,
                                                 &on_health_changed_);
      health_state = GRPC_CHANNEL_CONNECTING;
    }
    // Report initial state.
    set_subchannel_connectivity_state_locked(
        c, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "subchannel_connected");
    grpc_connectivity_state_set(&c->state_and_health_tracker, health_state,
                                GRPC_ERROR_NONE, "subchannel_connected");
  }

  ~ConnectedSubchannelStateWatcher() {
    GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "state_watcher");
  }

  void Orphan() override { health_check_client_.reset(); }

 private:
  static void OnConnectivityChanged(void* arg, grpc_error* error) {
    auto* self = static_cast<ConnectedSubchannelStateWatcher*>(arg);
    grpc_subchannel* c = self->subchannel_;
    {
      MutexLock lock(&c->mu);
      switch (self->pending_connectivity_state_) {
        case GRPC_CHANNEL_TRANSIENT_FAILURE:
        case GRPC_CHANNEL_SHUTDOWN: {
          if (!c->disconnected && c->connected_subchannel != nullptr) {
            if (grpc_trace_stream_refcount.enabled()) {
              gpr_log(GPR_INFO,
                      "Connected subchannel %p of subchannel %p has gone into "
                      "%s. Attempting to reconnect.",
                      c->connected_subchannel.get(), c,
                      grpc_connectivity_state_name(
                          self->pending_connectivity_state_));
            }
            c->connected_subchannel.reset();
            c->connected_subchannel_watcher.reset();
            self->last_connectivity_state_ = GRPC_CHANNEL_TRANSIENT_FAILURE;
            set_subchannel_connectivity_state_locked(
                c, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error),
                "reflect_child");
            grpc_connectivity_state_set(&c->state_and_health_tracker,
                                        GRPC_CHANNEL_TRANSIENT_FAILURE,
                                        GRPC_ERROR_REF(error),
                                        "reflect_child");
            c->backoff_begun = false;
            c->backoff->Reset();
            maybe_start_connecting_locked(c);
          } else {
            self->last_connectivity_state_ = GRPC_CHANNEL_SHUTDOWN;
          }
          self->health_check_client_.reset();
          break;
        }
        default: {
          // In principle, this should never happen.  We should not get
          // a callback for READY, because that was the state we started
          // this watch from.  And a connected subchannel should never go
          // from READY to CONNECTING or IDLE.
          self->last_connectivity_state_ = self->pending_connectivity_state_;
          set_subchannel_connectivity_state_locked(
              c, self->pending_connectivity_state_, GRPC_ERROR_REF(error),
              "reflect_child");
          if (self->pending_connectivity_state_ != GRPC_CHANNEL_READY) {
            grpc_connectivity_state_set(&c->state_and_health_tracker,
                                        self->pending_connectivity_state_,
                                        GRPC_ERROR_REF(error),
                                        "reflect_child");
          }
          c->connected_subchannel->NotifyOnStateChange(
              nullptr, &self->pending_connectivity_state_,
              &self->on_connectivity_changed_);
          self = nullptr;  // So we don't unref below.
        }
      }
    }
    // Don't unref until we've released the lock, because this might
    // cause the subchannel (which contains the lock) to be destroyed.
    if (self != nullptr) self->Unref();
  }

  static void OnHealthChanged(void* arg, grpc_error* error) {
    auto* self = static_cast<ConnectedSubchannelStateWatcher*>(arg);
    if (self->health_state_ == GRPC_CHANNEL_SHUTDOWN) {
      self->Unref();
      return;
    }
    grpc_subchannel* c = self->subchannel_;
    MutexLock lock(&c->mu);
    if (self->last_connectivity_state_ == GRPC_CHANNEL_READY) {
      grpc_connectivity_state_set(&c->state_and_health_tracker,
                                  self->health_state_, GRPC_ERROR_REF(error),
                                  "health_changed");
    }
    self->health_check_client_->NotifyOnHealthChange(&self->health_state_,
                                                     &self->on_health_changed_);
  }

  grpc_subchannel* subchannel_;
  grpc_closure on_connectivity_changed_;
  grpc_connectivity_state pending_connectivity_state_ = GRPC_CHANNEL_READY;
  grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_READY;
  grpc_core::OrphanablePtr<grpc_core::HealthCheckClient> health_check_client_;
  grpc_closure on_health_changed_;
  grpc_connectivity_state health_state_ = GRPC_CHANNEL_CONNECTING;
};

}  // namespace grpc_core

#define SUBCHANNEL_CALL_TO_CALL_STACK(call)                          \
  (grpc_call_stack*)((char*)(call) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
                                         sizeof(grpc_subchannel_call)))
#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack)           \
  (grpc_subchannel_call*)(((char*)(callstack)) -          \
                          GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
                              sizeof(grpc_subchannel_call)))

static void on_subchannel_connected(void* subchannel, grpc_error* error);

#ifndef NDEBUG
#define REF_REASON reason
#define REF_MUTATE_EXTRA_ARGS \
  GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose
#define REF_MUTATE_PURPOSE(x) , file, line, reason, x
#else
#define REF_REASON ""
#define REF_MUTATE_EXTRA_ARGS
#define REF_MUTATE_PURPOSE(x)
#endif

/*
 * connection implementation
 */

static void connection_destroy(void* arg, grpc_error* error) {
  grpc_channel_stack* stk = static_cast<grpc_channel_stack*>(arg);
  grpc_channel_stack_destroy(stk);
  gpr_free(stk);
}

/*
 * grpc_subchannel implementation
 */

static void subchannel_destroy(void* arg, grpc_error* error) {
  grpc_subchannel* c = static_cast<grpc_subchannel*>(arg);
  if (c->channelz_subchannel != nullptr) {
    c->channelz_subchannel->AddTraceEvent(
        grpc_core::channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string("Subchannel destroyed"));
    c->channelz_subchannel->MarkSubchannelDestroyed();
    c->channelz_subchannel.reset();
  }
  c->health_check_service_name.reset();
  grpc_channel_args_destroy(c->args);
  grpc_connectivity_state_destroy(&c->state_tracker);
  grpc_connectivity_state_destroy(&c->state_and_health_tracker);
  grpc_connector_unref(c->connector);
  grpc_pollset_set_destroy(c->pollset_set);
  grpc_subchannel_key_destroy(c->key);
  gpr_mu_destroy(&c->mu);
  gpr_free(c);
}

static gpr_atm ref_mutate(grpc_subchannel* c, gpr_atm delta,
                          int barrier REF_MUTATE_EXTRA_ARGS) {
  gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta)
                            : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
#ifndef NDEBUG
  if (grpc_trace_stream_refcount.enabled()) {
    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
            "SUBCHANNEL: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", c,
            purpose, old_val, old_val + delta, reason);
  }
#endif
  return old_val;
}

grpc_subchannel* grpc_subchannel_ref(
    grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
  gpr_atm old_refs;
  old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS),
                        0 REF_MUTATE_PURPOSE("STRONG_REF"));
  GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0);
  return c;
}

grpc_subchannel* grpc_subchannel_weak_ref(
    grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
  gpr_atm old_refs;
  old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF"));
  GPR_ASSERT(old_refs != 0);
  return c;
}

grpc_subchannel* grpc_subchannel_ref_from_weak_ref(
    grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
  if (!c) return nullptr;
  for (;;) {
    gpr_atm old_refs = gpr_atm_acq_load(&c->ref_pair);
    if (old_refs >= (1 << INTERNAL_REF_BITS)) {
      gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS);
      if (gpr_atm_rel_cas(&c->ref_pair, old_refs, new_refs)) {
        return c;
      }
    } else {
      return nullptr;
    }
  }
}

static void disconnect(grpc_subchannel* c) {
  grpc_subchannel_index_unregister(c->key, c);
  gpr_mu_lock(&c->mu);
  GPR_ASSERT(!c->disconnected);
  c->disconnected = true;
  grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                            "Subchannel disconnected"));
  c->connected_subchannel.reset();
  c->connected_subchannel_watcher.reset();
  gpr_mu_unlock(&c->mu);
}

void grpc_subchannel_unref(grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
  gpr_atm old_refs;
  // add a weak ref and subtract a strong ref (atomically)
  old_refs = ref_mutate(
      c, static_cast<gpr_atm>(1) - static_cast<gpr_atm>(1 << INTERNAL_REF_BITS),
      1 REF_MUTATE_PURPOSE("STRONG_UNREF"));
  if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) {
    disconnect(c);
  }
  GRPC_SUBCHANNEL_WEAK_UNREF(c, "strong-unref");
}

void grpc_subchannel_weak_unref(
    grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
  gpr_atm old_refs;
  old_refs = ref_mutate(c, -static_cast<gpr_atm>(1),
                        1 REF_MUTATE_PURPOSE("WEAK_UNREF"));
  if (old_refs == 1) {
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(subchannel_destroy, c, grpc_schedule_on_exec_ctx),
        GRPC_ERROR_NONE);
  }
}

static void parse_args_for_backoff_values(
    const grpc_channel_args* args,
    grpc_core::BackOff::Options* backoff_options,
    grpc_millis* min_connect_timeout_ms) {
  grpc_millis initial_backoff_ms =
      GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
  *min_connect_timeout_ms =
      GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000;
  grpc_millis max_backoff_ms =
      GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
  bool fixed_reconnect_backoff = false;
  if (args != nullptr) {
    for (size_t i = 0; i < args->num_args; i++) {
      if (0 == strcmp(args->args[i].key,
                      "grpc.testing.fixed_reconnect_backoff_ms")) {
        fixed_reconnect_backoff = true;
        initial_backoff_ms = *min_connect_timeout_ms = max_backoff_ms =
            grpc_channel_arg_get_integer(
                &args->args[i],
                {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
      } else if (0 == strcmp(args->args[i].key,
                             GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
        fixed_reconnect_backoff = false;
        *min_connect_timeout_ms = grpc_channel_arg_get_integer(
            &args->args[i],
            {static_cast<int>(*min_connect_timeout_ms), 100, INT_MAX});
      } else if (0 == strcmp(args->args[i].key,
                             GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
        fixed_reconnect_backoff = false;
        max_backoff_ms = grpc_channel_arg_get_integer(
            &args->args[i], {static_cast<int>(max_backoff_ms), 100, INT_MAX});
      } else if (0 == strcmp(args->args[i].key,
                             GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
        fixed_reconnect_backoff = false;
        initial_backoff_ms = grpc_channel_arg_get_integer(
            &args->args[i],
            {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
      }
    }
  }
  backoff_options->set_initial_backoff(initial_backoff_ms)
      .set_multiplier(fixed_reconnect_backoff
                          ? 1.0
                          : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER)
      .set_jitter(fixed_reconnect_backoff ? 0.0
                                          : GRPC_SUBCHANNEL_RECONNECT_JITTER)
      .set_max_backoff(max_backoff_ms);
}

namespace grpc_core {
namespace {

struct HealthCheckParams {
  UniquePtr<char> service_name;

  static void Parse(const grpc_json* field, HealthCheckParams* params) {
    if (strcmp(field->key, "healthCheckConfig") == 0) {
      if (field->type != GRPC_JSON_OBJECT) return;
      for (grpc_json* sub_field = field->child; sub_field != nullptr;
           sub_field = sub_field->next) {
        if (sub_field->key == nullptr) return;
        if (strcmp(sub_field->key, "serviceName") == 0) {
          if (params->service_name != nullptr) return;  // Duplicate.
          if (sub_field->type != GRPC_JSON_STRING) return;
          params->service_name.reset(gpr_strdup(sub_field->value));
        }
      }
    }
  }
};

}  // namespace
}  // namespace grpc_core

grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
                                        const grpc_subchannel_args* args) {
  grpc_subchannel_key* key = grpc_subchannel_key_create(args);
  grpc_subchannel* c = grpc_subchannel_index_find(key);
  if (c) {
    grpc_subchannel_key_destroy(key);
    return c;
  }
  GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED();
  c = static_cast<grpc_subchannel*>(gpr_zalloc(sizeof(*c)));
  c->key = key;
  gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS);
  c->connector = connector;
  grpc_connector_ref(c->connector);
  c->pollset_set = grpc_pollset_set_create();
  grpc_resolved_address* addr =
      static_cast<grpc_resolved_address*>(gpr_malloc(sizeof(*addr)));
  grpc_get_subchannel_address_arg(args->args, addr);
  grpc_resolved_address* new_address = nullptr;
  grpc_channel_args* new_args = nullptr;
  if (grpc_proxy_mappers_map_address(addr, args->args, &new_address,
                                     &new_args)) {
    GPR_ASSERT(new_address != nullptr);
    gpr_free(addr);
    addr = new_address;
  }
  static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
  grpc_arg new_arg = grpc_create_subchannel_address_arg(addr);
  gpr_free(addr);
  c->args = grpc_channel_args_copy_and_add_and_remove(
      new_args != nullptr ? new_args : args->args, keys_to_remove,
      GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1);
  gpr_free(new_arg.value.string);
  if (new_args != nullptr) grpc_channel_args_destroy(new_args);
  c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
      &c->root_external_state_watcher;
  GRPC_CLOSURE_INIT(&c->on_connected, on_subchannel_connected, c,
                    grpc_schedule_on_exec_ctx);
  grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
                               "subchannel");
  grpc_connectivity_state_init(&c->state_and_health_tracker, GRPC_CHANNEL_IDLE,
                               "subchannel");
  grpc_core::BackOff::Options backoff_options;
  parse_args_for_backoff_values(args->args, &backoff_options,
                                &c->min_connect_timeout_ms);
  c->backoff.Init(backoff_options);
  gpr_mu_init(&c->mu);
  // Check whether we should enable health checking.
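  // The health-checking service name comes from the service config JSON in
  // the channel args.  For illustration only (not a complete service config),
  // a config that enables health checking might look like:
  //   {"healthCheckConfig": {"serviceName": "my.package.MyService"}}
  // Only the serviceName field is consumed here (see HealthCheckParams above).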
  const char* service_config_json = grpc_channel_arg_get_string(
      grpc_channel_args_find(c->args, GRPC_ARG_SERVICE_CONFIG));
  if (service_config_json != nullptr) {
    grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
        grpc_core::ServiceConfig::Create(service_config_json);
    if (service_config != nullptr) {
      grpc_core::HealthCheckParams params;
      service_config->ParseGlobalParams(grpc_core::HealthCheckParams::Parse,
                                        &params);
      c->health_check_service_name = std::move(params.service_name);
    }
  }
  const grpc_arg* arg =
      grpc_channel_args_find(c->args, GRPC_ARG_ENABLE_CHANNELZ);
  bool channelz_enabled =
      grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT);
  arg = grpc_channel_args_find(
      c->args, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE);
  const grpc_integer_options options = {
      GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX};
  size_t channel_tracer_max_memory =
      (size_t)grpc_channel_arg_get_integer(arg, options);
  if (channelz_enabled) {
    c->channelz_subchannel =
        grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>(
            c, channel_tracer_max_memory);
    c->channelz_subchannel->AddTraceEvent(
        grpc_core::channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string("Subchannel created"));
  }
  return grpc_subchannel_index_register(key, c);
}

grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node(
    grpc_subchannel* subchannel) {
  return subchannel->channelz_subchannel.get();
}

intptr_t grpc_subchannel_get_child_socket_uuid(grpc_subchannel* subchannel) {
  if (subchannel->connected_subchannel != nullptr) {
    return subchannel->connected_subchannel->socket_uuid();
  } else {
    return 0;
  }
}

static void continue_connect_locked(grpc_subchannel* c) {
  grpc_connect_in_args args;
  args.interested_parties = c->pollset_set;
  const grpc_millis min_deadline =
      c->min_connect_timeout_ms + grpc_core::ExecCtx::Get()->Now();
  c->next_attempt_deadline = c->backoff->NextAttemptTime();
  args.deadline = std::max(c->next_attempt_deadline, min_deadline);
  args.channel_args = c->args;
  set_subchannel_connectivity_state_locked(c, GRPC_CHANNEL_CONNECTING,
                                           GRPC_ERROR_NONE, "connecting");
  grpc_connectivity_state_set(&c->state_and_health_tracker,
                              GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
                              "connecting");
  grpc_connector_connect(c->connector, &args, &c->connecting_result,
                         &c->on_connected);
}

grpc_connectivity_state grpc_subchannel_check_connectivity(
    grpc_subchannel* c, grpc_error** error, bool inhibit_health_checks) {
  gpr_mu_lock(&c->mu);
  grpc_connectivity_state_tracker* tracker =
      inhibit_health_checks ? &c->state_tracker : &c->state_and_health_tracker;
  grpc_connectivity_state state = grpc_connectivity_state_get(tracker, error);
  gpr_mu_unlock(&c->mu);
  return state;
}

static void on_external_state_watcher_done(void* arg, grpc_error* error) {
  external_state_watcher* w = static_cast<external_state_watcher*>(arg);
  grpc_closure* follow_up = w->notify;
  if (w->pollset_set != nullptr) {
    grpc_pollset_set_del_pollset_set(w->subchannel->pollset_set,
                                     w->pollset_set);
  }
  gpr_mu_lock(&w->subchannel->mu);
  w->next->prev = w->prev;
  w->prev->next = w->next;
  gpr_mu_unlock(&w->subchannel->mu);
  GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher");
  gpr_free(w);
  GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
}

static void on_alarm(void* arg, grpc_error* error) {
  grpc_subchannel* c = static_cast<grpc_subchannel*>(arg);
  gpr_mu_lock(&c->mu);
  c->have_alarm = false;
  if (c->disconnected) {
    error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected",
                                                             &error, 1);
  } else if (c->retry_immediately) {
    c->retry_immediately = false;
    error = GRPC_ERROR_NONE;
  } else {
    GRPC_ERROR_REF(error);
  }
  if (error == GRPC_ERROR_NONE) {
    gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
    continue_connect_locked(c);
    gpr_mu_unlock(&c->mu);
  } else {
    gpr_mu_unlock(&c->mu);
    GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
  }
  GRPC_ERROR_UNREF(error);
}

static void maybe_start_connecting_locked(grpc_subchannel* c) {
  if (c->disconnected) {
    /* Don't try to connect if we're already disconnected */
    return;
  }
  if (c->connecting) {
    /* Already connecting: don't restart */
    return;
  }
  if (c->connected_subchannel != nullptr) {
    /* Already connected: don't restart */
    return;
  }
  if (!grpc_connectivity_state_has_watchers(&c->state_tracker) &&
      !grpc_connectivity_state_has_watchers(&c->state_and_health_tracker)) {
    /* Nobody is interested in connecting: so don't just yet */
    return;
  }
  c->connecting = true;
  GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
  if (!c->backoff_begun) {
    c->backoff_begun = true;
    continue_connect_locked(c);
  } else {
    GPR_ASSERT(!c->have_alarm);
    c->have_alarm = true;
    const grpc_millis time_til_next =
        c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now();
    if (time_til_next <= 0) {
      gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", c);
    } else {
      gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRId64 " milliseconds", c,
              time_til_next);
    }
    GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
    grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm);
  }
}

void grpc_subchannel_notify_on_state_change(
    grpc_subchannel* c, grpc_pollset_set* interested_parties,
    grpc_connectivity_state* state, grpc_closure* notify,
    bool inhibit_health_checks) {
  grpc_connectivity_state_tracker* tracker =
      inhibit_health_checks ? &c->state_tracker : &c->state_and_health_tracker;
  external_state_watcher* w;
  if (state == nullptr) {
    gpr_mu_lock(&c->mu);
    for (w = c->root_external_state_watcher.next;
         w != &c->root_external_state_watcher; w = w->next) {
      if (w->notify == notify) {
        grpc_connectivity_state_notify_on_state_change(tracker, nullptr,
                                                       &w->closure);
      }
    }
    gpr_mu_unlock(&c->mu);
  } else {
    w = static_cast<external_state_watcher*>(gpr_malloc(sizeof(*w)));
    w->subchannel = c;
    w->pollset_set = interested_parties;
    w->notify = notify;
    GRPC_CLOSURE_INIT(&w->closure, on_external_state_watcher_done, w,
                      grpc_schedule_on_exec_ctx);
    if (interested_parties != nullptr) {
      grpc_pollset_set_add_pollset_set(c->pollset_set, interested_parties);
    }
    GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher");
    gpr_mu_lock(&c->mu);
    w->next = &c->root_external_state_watcher;
    w->prev = w->next->prev;
    w->next->prev = w->prev->next = w;
    grpc_connectivity_state_notify_on_state_change(tracker, state, &w->closure);
    maybe_start_connecting_locked(c);
    gpr_mu_unlock(&c->mu);
  }
}

static bool publish_transport_locked(grpc_subchannel* c) {
  /* construct channel stack */
  grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
  grpc_channel_stack_builder_set_channel_arguments(
      builder, c->connecting_result.channel_args);
  grpc_channel_stack_builder_set_transport(builder,
                                           c->connecting_result.transport);
  if (!grpc_channel_init_create_stack(builder, GRPC_CLIENT_SUBCHANNEL)) {
    grpc_channel_stack_builder_destroy(builder);
    return false;
  }
  grpc_channel_stack* stk;
  grpc_error* error = grpc_channel_stack_builder_finish(
      builder, 0, 1, connection_destroy, nullptr,
      reinterpret_cast<void**>(&stk));
  if (error != GRPC_ERROR_NONE) {
    grpc_transport_destroy(c->connecting_result.transport);
    gpr_log(GPR_ERROR, "error initializing subchannel stack: %s",
            grpc_error_string(error));
    GRPC_ERROR_UNREF(error);
    return false;
  }
  intptr_t socket_uuid = c->connecting_result.socket_uuid;
  memset(&c->connecting_result, 0, sizeof(c->connecting_result));
  if (c->disconnected) {
    grpc_channel_stack_destroy(stk);
    gpr_free(stk);
    return false;
  }
  /* publish */
  c->connected_subchannel.reset(grpc_core::New<grpc_core::ConnectedSubchannel>(
      stk, c->args, c->channelz_subchannel, socket_uuid));
  gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
          c->connected_subchannel.get(), c);
  // Instantiate state watcher. Will clean itself up.
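  // (The watcher takes a weak ref on the subchannel for its own lifetime; it
  // is released again when connected_subchannel_watcher is reset, either on
  // connection failure or when the subchannel is disconnected.)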
  c->connected_subchannel_watcher =
      grpc_core::MakeOrphanable<grpc_core::ConnectedSubchannelStateWatcher>(c);
  return true;
}

static void on_subchannel_connected(void* arg, grpc_error* error) {
  grpc_subchannel* c = static_cast<grpc_subchannel*>(arg);
  grpc_channel_args* delete_channel_args = c->connecting_result.channel_args;
  GRPC_SUBCHANNEL_WEAK_REF(c, "on_subchannel_connected");
  gpr_mu_lock(&c->mu);
  c->connecting = false;
  if (c->connecting_result.transport != nullptr &&
      publish_transport_locked(c)) {
    /* do nothing, transport was published */
  } else if (c->disconnected) {
    GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
  } else {
    set_subchannel_connectivity_state_locked(
        c, GRPC_CHANNEL_TRANSIENT_FAILURE,
        grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                               "Connect Failed", &error, 1),
                           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
        "connect_failed");
    grpc_connectivity_state_set(
        &c->state_and_health_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
        grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                               "Connect Failed", &error, 1),
                           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
        "connect_failed");
    const char* errmsg = grpc_error_string(error);
    gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
    maybe_start_connecting_locked(c);
    GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
  }
  gpr_mu_unlock(&c->mu);
  GRPC_SUBCHANNEL_WEAK_UNREF(c, "connected");
  grpc_channel_args_destroy(delete_channel_args);
}

void grpc_subchannel_reset_backoff(grpc_subchannel* subchannel) {
  gpr_mu_lock(&subchannel->mu);
  subchannel->backoff->Reset();
  if (subchannel->have_alarm) {
    subchannel->retry_immediately = true;
    grpc_timer_cancel(&subchannel->alarm);
  } else {
    subchannel->backoff_begun = false;
    maybe_start_connecting_locked(subchannel);
  }
  gpr_mu_unlock(&subchannel->mu);
}

/*
 * grpc_subchannel_call implementation
 */

static void subchannel_call_destroy(void* call, grpc_error* error) {
  GPR_TIMER_SCOPE("grpc_subchannel_call_unref.destroy", 0);
  grpc_subchannel_call* c = static_cast<grpc_subchannel_call*>(call);
  grpc_core::ConnectedSubchannel* connection = c->connection;
  grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr,
                          c->schedule_closure_after_destroy);
  connection->Unref(DEBUG_LOCATION, "subchannel_call");
  c->~grpc_subchannel_call();
}

void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call* call,
                                              grpc_closure* closure) {
  GPR_ASSERT(call->schedule_closure_after_destroy == nullptr);
  GPR_ASSERT(closure != nullptr);
  call->schedule_closure_after_destroy = closure;
}

grpc_subchannel_call* grpc_subchannel_call_ref(
    grpc_subchannel_call* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
  GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
  return c;
}

void grpc_subchannel_call_unref(
    grpc_subchannel_call* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
  GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
}

// Sets *status based on md_batch and error.
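// If error is set, the status is derived from the error via
// grpc_error_get_status() (which also receives the call deadline); otherwise
// it is read from the grpc-status trailing metadata, defaulting to UNKNOWN
// when that key is absent.  Takes ownership of (and unrefs) error.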
static void get_call_status(grpc_subchannel_call* call,
                            grpc_metadata_batch* md_batch, grpc_error* error,
                            grpc_status_code* status) {
  if (error != GRPC_ERROR_NONE) {
    grpc_error_get_status(error, call->deadline, status, nullptr, nullptr,
                          nullptr);
  } else {
    if (md_batch->idx.named.grpc_status != nullptr) {
      *status = grpc_get_status_code_from_metadata(
          md_batch->idx.named.grpc_status->md);
    } else {
      *status = GRPC_STATUS_UNKNOWN;
    }
  }
  GRPC_ERROR_UNREF(error);
}

static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
  grpc_subchannel_call* call = static_cast<grpc_subchannel_call*>(arg);
  GPR_ASSERT(call->recv_trailing_metadata != nullptr);
  grpc_status_code status = GRPC_STATUS_OK;
  grpc_metadata_batch* md_batch = call->recv_trailing_metadata;
  get_call_status(call, md_batch, GRPC_ERROR_REF(error), &status);
  grpc_core::channelz::SubchannelNode* channelz_subchannel =
      call->connection->channelz_subchannel();
  GPR_ASSERT(channelz_subchannel != nullptr);
  if (status == GRPC_STATUS_OK) {
    channelz_subchannel->RecordCallSucceeded();
  } else {
    channelz_subchannel->RecordCallFailed();
  }
  GRPC_CLOSURE_RUN(call->original_recv_trailing_metadata,
                   GRPC_ERROR_REF(error));
}

// If channelz is enabled, intercept recv_trailing so that we may check the
// status and associate it to a subchannel.
static void maybe_intercept_recv_trailing_metadata(
    grpc_subchannel_call* call, grpc_transport_stream_op_batch* batch) {
  // only intercept payloads with recv trailing.
  if (!batch->recv_trailing_metadata) {
    return;
  }
  // only add interceptor if channelz is enabled.
  if (call->connection->channelz_subchannel() == nullptr) {
    return;
  }
  GRPC_CLOSURE_INIT(&call->recv_trailing_metadata_ready,
                    recv_trailing_metadata_ready, call,
                    grpc_schedule_on_exec_ctx);
  // save some state needed for the interception callback.
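  // (recv_trailing_metadata_ready() needs the metadata batch to read the final
  // status, and the original closure so it can chain to it after the result
  // has been recorded on the channelz subchannel node.)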
  GPR_ASSERT(call->recv_trailing_metadata == nullptr);
  call->recv_trailing_metadata =
      batch->payload->recv_trailing_metadata.recv_trailing_metadata;
  call->original_recv_trailing_metadata =
      batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
  batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      &call->recv_trailing_metadata_ready;
}

void grpc_subchannel_call_process_op(grpc_subchannel_call* call,
                                     grpc_transport_stream_op_batch* batch) {
  GPR_TIMER_SCOPE("grpc_subchannel_call_process_op", 0);
  maybe_intercept_recv_trailing_metadata(call, batch);
  grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
  grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
  GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch);
  top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
}

grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel>
grpc_subchannel_get_connected_subchannel(grpc_subchannel* c) {
  gpr_mu_lock(&c->mu);
  auto copy = c->connected_subchannel;
  gpr_mu_unlock(&c->mu);
  return copy;
}

const grpc_subchannel_key* grpc_subchannel_get_key(
    const grpc_subchannel* subchannel) {
  return subchannel->key;
}

void* grpc_connected_subchannel_call_get_parent_data(
    grpc_subchannel_call* subchannel_call) {
  grpc_channel_stack* chanstk = subchannel_call->connection->channel_stack();
  // The parent data is laid out after the (aligned) call object and call
  // stack; these offsets must mirror GetInitialCallSizeEstimate().
  return (char*)subchannel_call +
         GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_subchannel_call)) +
         GPR_ROUND_UP_TO_ALIGNMENT_SIZE(chanstk->call_stack_size);
}

grpc_call_stack* grpc_subchannel_call_get_call_stack(
    grpc_subchannel_call* subchannel_call) {
  return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
}

static void grpc_uri_to_sockaddr(const char* uri_str,
                                 grpc_resolved_address* addr) {
  grpc_uri* uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */);
  GPR_ASSERT(uri != nullptr);
  if (!grpc_parse_uri(uri, addr)) memset(addr, 0, sizeof(*addr));
  grpc_uri_destroy(uri);
}

void grpc_get_subchannel_address_arg(const grpc_channel_args* args,
                                     grpc_resolved_address* addr) {
  const char* addr_uri_str = grpc_get_subchannel_address_uri_arg(args);
  memset(addr, 0, sizeof(*addr));
  if (*addr_uri_str != '\0') {
    grpc_uri_to_sockaddr(addr_uri_str, addr);
  }
}

const char* grpc_subchannel_get_target(grpc_subchannel* subchannel) {
  const grpc_arg* addr_arg =
      grpc_channel_args_find(subchannel->args, GRPC_ARG_SUBCHANNEL_ADDRESS);
  const char* addr_str = grpc_channel_arg_get_string(addr_arg);
  GPR_ASSERT(addr_str != nullptr);  // Should have been set by LB policy.
  return addr_str;
}

const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args) {
  const grpc_arg* addr_arg =
      grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS);
  const char* addr_str = grpc_channel_arg_get_string(addr_arg);
  GPR_ASSERT(addr_str != nullptr);  // Should have been set by LB policy.
  return addr_str;
}

grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) {
  return grpc_channel_arg_string_create(
      (char*)GRPC_ARG_SUBCHANNEL_ADDRESS,
      addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup(""));
}

namespace grpc_core {

ConnectedSubchannel::ConnectedSubchannel(
    grpc_channel_stack* channel_stack, const grpc_channel_args* args,
    grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode>
        channelz_subchannel,
    intptr_t socket_uuid)
    : RefCounted<ConnectedSubchannel>(&grpc_trace_stream_refcount),
      channel_stack_(channel_stack),
      args_(grpc_channel_args_copy(args)),
      channelz_subchannel_(std::move(channelz_subchannel)),
      socket_uuid_(socket_uuid) {}

ConnectedSubchannel::~ConnectedSubchannel() {
  grpc_channel_args_destroy(args_);
  GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
}

void ConnectedSubchannel::NotifyOnStateChange(
    grpc_pollset_set* interested_parties, grpc_connectivity_state* state,
    grpc_closure* closure) {
  grpc_transport_op* op = grpc_make_transport_op(nullptr);
  grpc_channel_element* elem;
  op->connectivity_state = state;
  op->on_connectivity_state_change = closure;
  op->bind_pollset_set = interested_parties;
  elem = grpc_channel_stack_element(channel_stack_, 0);
  elem->filter->start_transport_op(elem, op);
}

void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
                               grpc_closure* on_ack) {
  grpc_transport_op* op = grpc_make_transport_op(nullptr);
  grpc_channel_element* elem;
  op->send_ping.on_initiate = on_initiate;
  op->send_ping.on_ack = on_ack;
  elem = grpc_channel_stack_element(channel_stack_, 0);
  elem->filter->start_transport_op(elem, op);
}

grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
                                            grpc_subchannel_call** call) {
  const size_t allocation_size =
      GetInitialCallSizeEstimate(args.parent_data_size);
  *call = new (gpr_arena_alloc(args.arena, allocation_size))
      grpc_subchannel_call(this, args);
  grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
  RefCountedPtr<ConnectedSubchannel> connection =
      Ref(DEBUG_LOCATION, "subchannel_call");
  connection.release();  // Ref is passed to the grpc_subchannel_call object.
  const grpc_call_element_args call_args = {
      callstk,           /* call_stack */
      nullptr,           /* server_transport_data */
      args.context,      /* context */
      args.path,         /* path */
      args.start_time,   /* start_time */
      args.deadline,     /* deadline */
      args.arena,        /* arena */
      args.call_combiner /* call_combiner */
  };
  grpc_error* error = grpc_call_stack_init(
      channel_stack_, 1, subchannel_call_destroy, *call, &call_args);
  if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
    const char* error_string = grpc_error_string(error);
    gpr_log(GPR_ERROR, "error: %s", error_string);
    return error;
  }
  grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
  if (channelz_subchannel_ != nullptr) {
    channelz_subchannel_->RecordCallStarted();
  }
  return GRPC_ERROR_NONE;
}

size_t ConnectedSubchannel::GetInitialCallSizeEstimate(
    size_t parent_data_size) const {
  size_t allocation_size =
      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_subchannel_call));
  if (parent_data_size > 0) {
    allocation_size +=
        GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) +
        parent_data_size;
  } else {
    allocation_size += channel_stack_->call_stack_size;
  }
  return allocation_size;
}

}  // namespace grpc_core