diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/subchannel.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/subchannel.cc | 510 |
1 files changed, 403 insertions, 107 deletions
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 0e40f42e18..a56db0201b 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -30,10 +30,10 @@ #include <grpc/support/string_util.h> #include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/ext/filters/client_channel/health/health_check_client.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" -#include "src/core/ext/filters/client_channel/uri_parser.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" @@ -41,6 +41,7 @@ #include "src/core/lib/gpr/alloc.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/manual_constructor.h" +#include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" @@ -49,6 +50,10 @@ #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/connectivity_state.h" +#include "src/core/lib/transport/error_utils.h" +#include "src/core/lib/transport/service_config.h" +#include "src/core/lib/transport/status_metadata.h" +#include "src/core/lib/uri/uri_parser.h" #define INTERNAL_REF_BITS 16 #define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1)) @@ -64,6 +69,10 @@ struct state_watcher { grpc_closure closure; grpc_subchannel* subchannel; grpc_connectivity_state connectivity_state; + grpc_connectivity_state last_connectivity_state; + grpc_core::OrphanablePtr<grpc_core::HealthCheckClient> health_check_client; + grpc_closure health_check_closure; + grpc_connectivity_state health_state; }; } // namespace @@ -76,6 +85,12 @@ typedef struct external_state_watcher { struct external_state_watcher* prev; } external_state_watcher; +namespace grpc_core { + +class ConnectedSubchannelStateWatcher; + +} // namespace grpc_core + struct grpc_subchannel { grpc_connector* connector; @@ -107,19 +122,24 @@ struct grpc_subchannel { being setup */ grpc_pollset_set* pollset_set; + grpc_core::UniquePtr<char> health_check_service_name; + /** mutex protecting remaining elements */ gpr_mu mu; - /** active connection, or null; of type grpc_core::ConnectedSubchannel - */ + /** active connection, or null */ grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel; + grpc_core::OrphanablePtr<grpc_core::ConnectedSubchannelStateWatcher> + connected_subchannel_watcher; /** have we seen a disconnection? */ bool disconnected; /** are we connecting */ bool connecting; + /** connectivity state tracking */ grpc_connectivity_state_tracker state_tracker; + grpc_connectivity_state_tracker state_and_health_tracker; external_state_watcher root_external_state_watcher; @@ -142,10 +162,184 @@ struct grpc_subchannel { }; struct grpc_subchannel_call { + grpc_subchannel_call(grpc_core::ConnectedSubchannel* connection, + const grpc_core::ConnectedSubchannel::CallArgs& args) + : connection(connection), deadline(args.deadline) {} + grpc_core::ConnectedSubchannel* connection; - grpc_closure* schedule_closure_after_destroy; + grpc_closure* schedule_closure_after_destroy = nullptr; + // state needed to support channelz interception of recv trailing metadata. + grpc_closure recv_trailing_metadata_ready; + grpc_closure* original_recv_trailing_metadata; + grpc_metadata_batch* recv_trailing_metadata = nullptr; + grpc_millis deadline; }; +static void maybe_start_connecting_locked(grpc_subchannel* c); + +static const char* subchannel_connectivity_state_change_string( + grpc_connectivity_state state) { + switch (state) { + case GRPC_CHANNEL_IDLE: + return "Subchannel state change to IDLE"; + case GRPC_CHANNEL_CONNECTING: + return "Subchannel state change to CONNECTING"; + case GRPC_CHANNEL_READY: + return "Subchannel state change to READY"; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + return "Subchannel state change to TRANSIENT_FAILURE"; + case GRPC_CHANNEL_SHUTDOWN: + return "Subchannel state change to SHUTDOWN"; + } + GPR_UNREACHABLE_CODE(return "UNKNOWN"); +} + +static void set_subchannel_connectivity_state_locked( + grpc_subchannel* c, grpc_connectivity_state state, grpc_error* error, + const char* reason) { + if (c->channelz_subchannel != nullptr) { + c->channelz_subchannel->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string( + subchannel_connectivity_state_change_string(state))); + } + grpc_connectivity_state_set(&c->state_tracker, state, error, reason); +} + +namespace grpc_core { + +class ConnectedSubchannelStateWatcher + : public InternallyRefCounted<ConnectedSubchannelStateWatcher> { + public: + // Must be instantiated while holding c->mu. + explicit ConnectedSubchannelStateWatcher(grpc_subchannel* c) + : subchannel_(c) { + // Steal subchannel ref for connecting. + GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "state_watcher"); + GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "connecting"); + // Start watching for connectivity state changes. + // Callback uses initial ref to this. + GRPC_CLOSURE_INIT(&on_connectivity_changed_, OnConnectivityChanged, this, + grpc_schedule_on_exec_ctx); + c->connected_subchannel->NotifyOnStateChange(c->pollset_set, + &pending_connectivity_state_, + &on_connectivity_changed_); + // Start health check if needed. + grpc_connectivity_state health_state = GRPC_CHANNEL_READY; + if (c->health_check_service_name != nullptr) { + health_check_client_ = grpc_core::MakeOrphanable<HealthCheckClient>( + c->health_check_service_name.get(), c->connected_subchannel, + c->pollset_set, c->channelz_subchannel); + GRPC_CLOSURE_INIT(&on_health_changed_, OnHealthChanged, this, + grpc_schedule_on_exec_ctx); + Ref().release(); // Ref for health callback tracked manually. + health_check_client_->NotifyOnHealthChange(&health_state_, + &on_health_changed_); + health_state = GRPC_CHANNEL_CONNECTING; + } + // Report initial state. + set_subchannel_connectivity_state_locked( + c, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "subchannel_connected"); + grpc_connectivity_state_set(&c->state_and_health_tracker, health_state, + GRPC_ERROR_NONE, "subchannel_connected"); + } + + ~ConnectedSubchannelStateWatcher() { + GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "state_watcher"); + } + + void Orphan() override { health_check_client_.reset(); } + + private: + static void OnConnectivityChanged(void* arg, grpc_error* error) { + auto* self = static_cast<ConnectedSubchannelStateWatcher*>(arg); + grpc_subchannel* c = self->subchannel_; + { + MutexLock lock(&c->mu); + switch (self->pending_connectivity_state_) { + case GRPC_CHANNEL_TRANSIENT_FAILURE: + case GRPC_CHANNEL_SHUTDOWN: { + if (!c->disconnected && c->connected_subchannel != nullptr) { + if (grpc_trace_stream_refcount.enabled()) { + gpr_log(GPR_INFO, + "Connected subchannel %p of subchannel %p has gone into " + "%s. Attempting to reconnect.", + c->connected_subchannel.get(), c, + grpc_connectivity_state_name( + self->pending_connectivity_state_)); + } + c->connected_subchannel.reset(); + c->connected_subchannel_watcher.reset(); + self->last_connectivity_state_ = GRPC_CHANNEL_TRANSIENT_FAILURE; + set_subchannel_connectivity_state_locked( + c, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), + "reflect_child"); + grpc_connectivity_state_set(&c->state_and_health_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "reflect_child"); + c->backoff_begun = false; + c->backoff->Reset(); + maybe_start_connecting_locked(c); + } else { + self->last_connectivity_state_ = GRPC_CHANNEL_SHUTDOWN; + } + self->health_check_client_.reset(); + break; + } + default: { + // In principle, this should never happen. We should not get + // a callback for READY, because that was the state we started + // this watch from. And a connected subchannel should never go + // from READY to CONNECTING or IDLE. + self->last_connectivity_state_ = self->pending_connectivity_state_; + set_subchannel_connectivity_state_locked( + c, self->pending_connectivity_state_, GRPC_ERROR_REF(error), + "reflect_child"); + if (self->pending_connectivity_state_ != GRPC_CHANNEL_READY) { + grpc_connectivity_state_set(&c->state_and_health_tracker, + self->pending_connectivity_state_, + GRPC_ERROR_REF(error), "reflect_child"); + } + c->connected_subchannel->NotifyOnStateChange( + nullptr, &self->pending_connectivity_state_, + &self->on_connectivity_changed_); + self = nullptr; // So we don't unref below. + } + } + } + // Don't unref until we've released the lock, because this might + // cause the subchannel (which contains the lock) to be destroyed. + if (self != nullptr) self->Unref(); + } + + static void OnHealthChanged(void* arg, grpc_error* error) { + auto* self = static_cast<ConnectedSubchannelStateWatcher*>(arg); + if (self->health_state_ == GRPC_CHANNEL_SHUTDOWN) { + self->Unref(); + return; + } + grpc_subchannel* c = self->subchannel_; + MutexLock lock(&c->mu); + if (self->last_connectivity_state_ == GRPC_CHANNEL_READY) { + grpc_connectivity_state_set(&c->state_and_health_tracker, + self->health_state_, GRPC_ERROR_REF(error), + "health_changed"); + } + self->health_check_client_->NotifyOnHealthChange(&self->health_state_, + &self->on_health_changed_); + } + + grpc_subchannel* subchannel_; + grpc_closure on_connectivity_changed_; + grpc_connectivity_state pending_connectivity_state_ = GRPC_CHANNEL_READY; + grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_READY; + grpc_core::OrphanablePtr<grpc_core::HealthCheckClient> health_check_client_; + grpc_closure on_health_changed_; + grpc_connectivity_state health_state_ = GRPC_CHANNEL_CONNECTING; +}; + +} // namespace grpc_core + #define SUBCHANNEL_CALL_TO_CALL_STACK(call) \ (grpc_call_stack*)((char*)(call) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \ sizeof(grpc_subchannel_call))) @@ -183,10 +377,18 @@ static void connection_destroy(void* arg, grpc_error* error) { static void subchannel_destroy(void* arg, grpc_error* error) { grpc_subchannel* c = static_cast<grpc_subchannel*>(arg); - c->channelz_subchannel.reset(); + if (c->channelz_subchannel != nullptr) { + c->channelz_subchannel->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Subchannel destroyed")); + c->channelz_subchannel->MarkSubchannelDestroyed(); + c->channelz_subchannel.reset(); + } gpr_free((void*)c->filters); + c->health_check_service_name.reset(); grpc_channel_args_destroy(c->args); grpc_connectivity_state_destroy(&c->state_tracker); + grpc_connectivity_state_destroy(&c->state_and_health_tracker); grpc_connector_unref(c->connector); grpc_pollset_set_destroy(c->pollset_set); grpc_subchannel_key_destroy(c->key); @@ -249,6 +451,7 @@ static void disconnect(grpc_subchannel* c) { grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Subchannel disconnected")); c->connected_subchannel.reset(); + c->connected_subchannel_watcher.reset(); gpr_mu_unlock(&c->mu); } @@ -324,6 +527,31 @@ static void parse_args_for_backoff_values( .set_max_backoff(max_backoff_ms); } +namespace grpc_core { +namespace { + +struct HealthCheckParams { + UniquePtr<char> service_name; + + static void Parse(const grpc_json* field, HealthCheckParams* params) { + if (strcmp(field->key, "healthCheckConfig") == 0) { + if (field->type != GRPC_JSON_OBJECT) return; + for (grpc_json* sub_field = field->child; sub_field != nullptr; + sub_field = sub_field->next) { + if (sub_field->key == nullptr) return; + if (strcmp(sub_field->key, "serviceName") == 0) { + if (params->service_name != nullptr) return; // Duplicate. + if (sub_field->type != GRPC_JSON_STRING) return; + params->service_name.reset(gpr_strdup(sub_field->value)); + } + } + } + } +}; + +} // namespace +} // namespace grpc_core + grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, const grpc_subchannel_args* args) { grpc_subchannel_key* key = grpc_subchannel_key_create(args); @@ -374,18 +602,45 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, grpc_schedule_on_exec_ctx); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); + grpc_connectivity_state_init(&c->state_and_health_tracker, GRPC_CHANNEL_IDLE, + "subchannel"); grpc_core::BackOff::Options backoff_options; parse_args_for_backoff_values(args->args, &backoff_options, &c->min_connect_timeout_ms); c->backoff.Init(backoff_options); gpr_mu_init(&c->mu); + // Check whether we should enable health checking. + const char* service_config_json = grpc_channel_arg_get_string( + grpc_channel_args_find(c->args, GRPC_ARG_SERVICE_CONFIG)); + if (service_config_json != nullptr) { + grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config = + grpc_core::ServiceConfig::Create(service_config_json); + if (service_config != nullptr) { + grpc_core::HealthCheckParams params; + service_config->ParseGlobalParams(grpc_core::HealthCheckParams::Parse, + ¶ms); + c->health_check_service_name = std::move(params.service_name); + } + } + const grpc_arg* arg = grpc_channel_args_find(c->args, GRPC_ARG_ENABLE_CHANNELZ); - bool channelz_enabled = grpc_channel_arg_get_bool(arg, false); + bool channelz_enabled = + grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT); + arg = grpc_channel_args_find( + c->args, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE); + const grpc_integer_options options = { + GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX}; + size_t channel_tracer_max_memory = + (size_t)grpc_channel_arg_get_integer(arg, options); if (channelz_enabled) { c->channelz_subchannel = - grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>(); + grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>( + c, channel_tracer_max_memory); + c->channelz_subchannel->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Subchannel created")); } return grpc_subchannel_index_register(key, c); @@ -396,6 +651,14 @@ grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node( return subchannel->channelz_subchannel.get(); } +intptr_t grpc_subchannel_get_child_socket_uuid(grpc_subchannel* subchannel) { + if (subchannel->connected_subchannel != nullptr) { + return subchannel->connected_subchannel->socket_uuid(); + } else { + return 0; + } +} + static void continue_connect_locked(grpc_subchannel* c) { grpc_connect_in_args args; args.interested_parties = c->pollset_set; @@ -404,17 +667,21 @@ static void continue_connect_locked(grpc_subchannel* c) { c->next_attempt_deadline = c->backoff->NextAttemptTime(); args.deadline = std::max(c->next_attempt_deadline, min_deadline); args.channel_args = c->args; - grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING, - GRPC_ERROR_NONE, "connecting"); + set_subchannel_connectivity_state_locked(c, GRPC_CHANNEL_CONNECTING, + GRPC_ERROR_NONE, "connecting"); + grpc_connectivity_state_set(&c->state_and_health_tracker, + GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, + "connecting"); grpc_connector_connect(c->connector, &args, &c->connecting_result, &c->on_connected); } -grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel* c, - grpc_error** error) { - grpc_connectivity_state state; +grpc_connectivity_state grpc_subchannel_check_connectivity( + grpc_subchannel* c, grpc_error** error, bool inhibit_health_checks) { gpr_mu_lock(&c->mu); - state = grpc_connectivity_state_get(&c->state_tracker, error); + grpc_connectivity_state_tracker* tracker = + inhibit_health_checks ? &c->state_tracker : &c->state_and_health_tracker; + grpc_connectivity_state state = grpc_connectivity_state_get(tracker, error); gpr_mu_unlock(&c->mu); return state; } @@ -472,7 +739,8 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { /* Already connected: don't restart */ return; } - if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) { + if (!grpc_connectivity_state_has_watchers(&c->state_tracker) && + !grpc_connectivity_state_has_watchers(&c->state_and_health_tracker)) { /* Nobody is interested in connecting: so don't just yet */ return; } @@ -499,16 +767,18 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { void grpc_subchannel_notify_on_state_change( grpc_subchannel* c, grpc_pollset_set* interested_parties, - grpc_connectivity_state* state, grpc_closure* notify) { + grpc_connectivity_state* state, grpc_closure* notify, + bool inhibit_health_checks) { + grpc_connectivity_state_tracker* tracker = + inhibit_health_checks ? &c->state_tracker : &c->state_and_health_tracker; external_state_watcher* w; - if (state == nullptr) { gpr_mu_lock(&c->mu); for (w = c->root_external_state_watcher.next; w != &c->root_external_state_watcher; w = w->next) { if (w->notify == notify) { - grpc_connectivity_state_notify_on_state_change(&c->state_tracker, - nullptr, &w->closure); + grpc_connectivity_state_notify_on_state_change(tracker, nullptr, + &w->closure); } } gpr_mu_unlock(&c->mu); @@ -527,62 +797,12 @@ void grpc_subchannel_notify_on_state_change( w->next = &c->root_external_state_watcher; w->prev = w->next->prev; w->next->prev = w->prev->next = w; - grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, - &w->closure); + grpc_connectivity_state_notify_on_state_change(tracker, state, &w->closure); maybe_start_connecting_locked(c); gpr_mu_unlock(&c->mu); } } -static void on_connected_subchannel_connectivity_changed(void* p, - grpc_error* error) { - state_watcher* connected_subchannel_watcher = static_cast<state_watcher*>(p); - grpc_subchannel* c = connected_subchannel_watcher->subchannel; - gpr_mu* mu = &c->mu; - - gpr_mu_lock(mu); - - switch (connected_subchannel_watcher->connectivity_state) { - case GRPC_CHANNEL_TRANSIENT_FAILURE: - case GRPC_CHANNEL_SHUTDOWN: { - if (!c->disconnected && c->connected_subchannel != nullptr) { - if (grpc_trace_stream_refcount.enabled()) { - gpr_log(GPR_INFO, - "Connected subchannel %p of subchannel %p has gone into %s. " - "Attempting to reconnect.", - c->connected_subchannel.get(), c, - grpc_connectivity_state_name( - connected_subchannel_watcher->connectivity_state)); - } - c->connected_subchannel.reset(); - grpc_connectivity_state_set(&c->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "reflect_child"); - c->backoff_begun = false; - c->backoff->Reset(); - maybe_start_connecting_locked(c); - } else { - connected_subchannel_watcher->connectivity_state = - GRPC_CHANNEL_SHUTDOWN; - } - break; - } - default: { - grpc_connectivity_state_set( - &c->state_tracker, connected_subchannel_watcher->connectivity_state, - GRPC_ERROR_REF(error), "reflect_child"); - GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); - c->connected_subchannel->NotifyOnStateChange( - nullptr, &connected_subchannel_watcher->connectivity_state, - &connected_subchannel_watcher->closure); - connected_subchannel_watcher = nullptr; - } - } - gpr_mu_unlock(mu); - GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher"); - gpr_free(connected_subchannel_watcher); -} - static bool publish_transport_locked(grpc_subchannel* c) { /* construct channel stack */ grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create(); @@ -606,41 +826,25 @@ static bool publish_transport_locked(grpc_subchannel* c) { GRPC_ERROR_UNREF(error); return false; } + intptr_t socket_uuid = c->connecting_result.socket_uuid; memset(&c->connecting_result, 0, sizeof(c->connecting_result)); - /* initialize state watcher */ - state_watcher* connected_subchannel_watcher = static_cast<state_watcher*>( - gpr_zalloc(sizeof(*connected_subchannel_watcher))); - connected_subchannel_watcher->subchannel = c; - connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_READY; - GRPC_CLOSURE_INIT(&connected_subchannel_watcher->closure, - on_connected_subchannel_connectivity_changed, - connected_subchannel_watcher, grpc_schedule_on_exec_ctx); - if (c->disconnected) { - gpr_free(connected_subchannel_watcher); grpc_channel_stack_destroy(stk); gpr_free(stk); return false; } /* publish */ - c->connected_subchannel.reset( - grpc_core::New<grpc_core::ConnectedSubchannel>(stk)); + c->connected_subchannel.reset(grpc_core::New<grpc_core::ConnectedSubchannel>( + stk, c->channelz_subchannel, socket_uuid)); gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", c->connected_subchannel.get(), c); - /* setup subchannel watching connected subchannel for changes; subchannel - ref for connecting is donated to the state watcher */ - GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); - GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); - c->connected_subchannel->NotifyOnStateChange( - c->pollset_set, &connected_subchannel_watcher->connectivity_state, - &connected_subchannel_watcher->closure); - - /* signal completion */ - grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_READY, - GRPC_ERROR_NONE, "connected"); + // Instantiate state watcher. Will clean itself up. + c->connected_subchannel_watcher = + grpc_core::MakeOrphanable<grpc_core::ConnectedSubchannelStateWatcher>(c); + return true; } @@ -657,8 +861,14 @@ static void on_subchannel_connected(void* arg, grpc_error* error) { } else if (c->disconnected) { GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); } else { + set_subchannel_connectivity_state_locked( + c, GRPC_CHANNEL_TRANSIENT_FAILURE, + grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Connect Failed", &error, 1), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), + "connect_failed"); grpc_connectivity_state_set( - &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &c->state_and_health_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Connect Failed", &error, 1), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), @@ -699,6 +909,7 @@ static void subchannel_call_destroy(void* call, grpc_error* error) { grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr, c->schedule_closure_after_destroy); connection->Unref(DEBUG_LOCATION, "subchannel_call"); + c->~grpc_subchannel_call(); } void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call* call, @@ -719,9 +930,71 @@ void grpc_subchannel_call_unref( GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); } +// Sets *status based on md_batch and error. +static void get_call_status(grpc_subchannel_call* call, + grpc_metadata_batch* md_batch, grpc_error* error, + grpc_status_code* status) { + if (error != GRPC_ERROR_NONE) { + grpc_error_get_status(error, call->deadline, status, nullptr, nullptr, + nullptr); + } else { + if (md_batch->idx.named.grpc_status != nullptr) { + *status = grpc_get_status_code_from_metadata( + md_batch->idx.named.grpc_status->md); + } else { + *status = GRPC_STATUS_UNKNOWN; + } + } + GRPC_ERROR_UNREF(error); +} + +static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { + grpc_subchannel_call* call = static_cast<grpc_subchannel_call*>(arg); + GPR_ASSERT(call->recv_trailing_metadata != nullptr); + grpc_status_code status = GRPC_STATUS_OK; + grpc_metadata_batch* md_batch = call->recv_trailing_metadata; + get_call_status(call, md_batch, GRPC_ERROR_REF(error), &status); + grpc_core::channelz::SubchannelNode* channelz_subchannel = + call->connection->channelz_subchannel(); + GPR_ASSERT(channelz_subchannel != nullptr); + if (status == GRPC_STATUS_OK) { + channelz_subchannel->RecordCallSucceeded(); + } else { + channelz_subchannel->RecordCallFailed(); + } + GRPC_CLOSURE_RUN(call->original_recv_trailing_metadata, + GRPC_ERROR_REF(error)); +} + +// If channelz is enabled, intercept recv_trailing so that we may check the +// status and associate it to a subchannel. +static void maybe_intercept_recv_trailing_metadata( + grpc_subchannel_call* call, grpc_transport_stream_op_batch* batch) { + // only intercept payloads with recv trailing. + if (!batch->recv_trailing_metadata) { + return; + } + // only add interceptor is channelz is enabled. + if (call->connection->channelz_subchannel() == nullptr) { + return; + } + GRPC_CLOSURE_INIT(&call->recv_trailing_metadata_ready, + recv_trailing_metadata_ready, call, + grpc_schedule_on_exec_ctx); + // save some state needed for the interception callback. + GPR_ASSERT(call->recv_trailing_metadata == nullptr); + call->recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata; + call->original_recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &call->recv_trailing_metadata_ready; +} + void grpc_subchannel_call_process_op(grpc_subchannel_call* call, grpc_transport_stream_op_batch* batch) { GPR_TIMER_SCOPE("grpc_subchannel_call_process_op", 0); + maybe_intercept_recv_trailing_metadata(call, batch); grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0); GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch); @@ -770,6 +1043,14 @@ void grpc_get_subchannel_address_arg(const grpc_channel_args* args, } } +const char* grpc_subchannel_get_target(grpc_subchannel* subchannel) { + const grpc_arg* addr_arg = + grpc_channel_args_find(subchannel->args, GRPC_ARG_SUBCHANNEL_ADDRESS); + const char* addr_str = grpc_channel_arg_get_string(addr_arg); + GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy. + return addr_str; +} + const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args) { const grpc_arg* addr_arg = grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS); @@ -786,9 +1067,15 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { namespace grpc_core { -ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack) +ConnectedSubchannel::ConnectedSubchannel( + grpc_channel_stack* channel_stack, + grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode> + channelz_subchannel, + intptr_t socket_uuid) : RefCountedWithTracing<ConnectedSubchannel>(&grpc_trace_stream_refcount), - channel_stack_(channel_stack) {} + channel_stack_(channel_stack), + channelz_subchannel_(std::move(channelz_subchannel)), + socket_uuid_(socket_uuid) {} ConnectedSubchannel::~ConnectedSubchannel() { GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); @@ -818,22 +1105,14 @@ void ConnectedSubchannel::Ping(grpc_closure* on_initiate, grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args, grpc_subchannel_call** call) { - size_t allocation_size = - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_subchannel_call)); - if (args.parent_data_size > 0) { - allocation_size += - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) + - args.parent_data_size; - } else { - allocation_size += channel_stack_->call_stack_size; - } - *call = static_cast<grpc_subchannel_call*>( - gpr_arena_alloc(args.arena, allocation_size)); + const size_t allocation_size = + GetInitialCallSizeEstimate(args.parent_data_size); + *call = new (gpr_arena_alloc(args.arena, allocation_size)) + grpc_subchannel_call(this, args); grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); RefCountedPtr<ConnectedSubchannel> connection = Ref(DEBUG_LOCATION, "subchannel_call"); connection.release(); // Ref is passed to the grpc_subchannel_call object. - (*call)->connection = this; const grpc_call_element_args call_args = { callstk, /* call_stack */ nullptr, /* server_transport_data */ @@ -852,7 +1131,24 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args, return error; } grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent); + if (channelz_subchannel_ != nullptr) { + channelz_subchannel_->RecordCallStarted(); + } return GRPC_ERROR_NONE; } +size_t ConnectedSubchannel::GetInitialCallSizeEstimate( + size_t parent_data_size) const { + size_t allocation_size = + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_subchannel_call)); + if (parent_data_size > 0) { + allocation_size += + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) + + parent_data_size; + } else { + allocation_size += channel_stack_->call_stack_size; + } + return allocation_size; +} + } // namespace grpc_core |