diff options
author | Vijay Pai <vpai@google.com> | 2018-10-30 12:29:35 -0700 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2018-10-30 12:29:35 -0700 |
commit | 47d0d5bf1f163e9dc03a771156aa712dc0716d2b (patch) | |
tree | f2e0178f36785999a1697e80817a54b0d6dd77aa /src | |
parent | 84e763f10a1e10d36c7de35970f9d25958ee2e16 (diff) | |
parent | 53df56529bfb30214ece3ad5d90c07aa323d776b (diff) |
Merge branch 'master' into server_callback
Diffstat (limited to 'src')
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.cc | 34 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/health/health_check_client.cc | 6 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/server/chttp2_server.cc | 9 | ||||
-rw-r--r-- | src/core/lib/channel/channelz.cc | 18 | ||||
-rw-r--r-- | src/core/lib/channel/channelz.h | 7 | ||||
-rw-r--r-- | src/cpp/server/server_cc.cc | 6 |
6 files changed, 74 insertions, 6 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index daf1b89b09..91894689c3 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -2951,6 +2951,27 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) { } } +// If the channel is in TRANSIENT_FAILURE and the call is not +// wait_for_ready=true, fails the call and returns true. +static bool fail_call_if_in_transient_failure(grpc_call_element* elem) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + grpc_transport_stream_op_batch* batch = calld->pending_batches[0].batch; + if (grpc_connectivity_state_check(&chand->state_tracker) == + GRPC_CHANNEL_TRANSIENT_FAILURE && + (batch->payload->send_initial_metadata.send_initial_metadata_flags & + GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) { + pending_batches_fail( + elem, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "channel is in state TRANSIENT_FAILURE"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), + true /* yield_call_combiner */); + return true; + } + return false; +} + // Invoked once resolver results are available. static void process_service_config_and_start_lb_pick_locked( grpc_call_element* elem) { @@ -2958,6 +2979,9 @@ static void process_service_config_and_start_lb_pick_locked( // Only get service config data on the first attempt. if (GPR_LIKELY(calld->num_attempts_completed == 0)) { apply_service_config_to_call_locked(elem); + // Check this after applying service config, since it may have + // affected the call's wait_for_ready value. + if (fail_call_if_in_transient_failure(elem)) return; } // Start LB pick. grpc_core::LbPicker::StartLocked(elem); @@ -3127,6 +3151,16 @@ static void start_pick_locked(void* arg, grpc_error* ignored) { // We do not yet have an LB policy, so wait for a resolver result. if (GPR_UNLIKELY(!chand->started_resolving)) { start_resolving_locked(chand); + } else { + // Normally, we want to do this check in + // process_service_config_and_start_lb_pick_locked(), so that we + // can honor the wait_for_ready setting in the service config. + // However, if the channel is in TRANSIENT_FAILURE at this point, that + // means that the resolver has returned a failure, so we're not going + // to get a service config right away. In that case, we fail the + // call now based on the wait_for_ready value passed in from the + // application. + if (fail_call_if_in_transient_failure(elem)) return; } // Create a new waiter, which will delete itself when done. grpc_core::New<grpc_core::ResolverResultWaiter>(elem); diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc index a3c782d8c9..591637aa86 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.cc +++ b/src/core/ext/filters/client_channel/health/health_check_client.cc @@ -299,6 +299,11 @@ HealthCheckClient::CallState::~CallState() { health_check_client_.get(), this); } if (call_ != nullptr) GRPC_SUBCHANNEL_CALL_UNREF(call_, "call_ended"); + for (size_t i = 0; i < GRPC_CONTEXT_COUNT; i++) { + if (context_[i].destroy != nullptr) { + context_[i].destroy(context_[i].value); + } + } // Unset the call combiner cancellation closure. This has the // effect of scheduling the previously set cancellation closure, if // any, so that it can release any internal references it may be @@ -346,6 +351,7 @@ void HealthCheckClient::CallState::StartCall() { } // Initialize payload and batch. memset(&batch_, 0, sizeof(batch_)); + payload_.context = context_; batch_.payload = &payload_; // on_complete callback takes ref, handled manually. Ref(DEBUG_LOCATION, "on_complete").release(); diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc index 6855246fa4..287bc0454e 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.cc +++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc @@ -37,6 +37,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" #include "src/core/lib/channel/handshaker_registry.h" +#include "src/core/lib/gpr/host_port.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/resource_quota.h" @@ -366,8 +367,14 @@ grpc_error* grpc_chttp2_server_add_port(grpc_server* server, const char* addr, arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ); if (grpc_channel_arg_get_bool(arg, false)) { + char* host; + char* port; + gpr_split_host_port(addr, &host, &port); + // allocated host's ownership is passed to ListenSocketNode. state->channelz_listen_socket = - grpc_core::MakeRefCounted<grpc_core::channelz::ListenSocketNode>(); + grpc_core::MakeRefCounted<grpc_core::channelz::ListenSocketNode>( + grpc_core::UniquePtr<char>(host), *port_num); + gpr_free(port); socket_uuid = state->channelz_listen_socket->uuid(); } diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc index 33577d890a..032654b861 100644 --- a/src/core/lib/channel/channelz.cc +++ b/src/core/lib/channel/channelz.cc @@ -374,7 +374,8 @@ grpc_json* SocketNode::RenderJson() { return top_level_json; } -ListenSocketNode::ListenSocketNode() : BaseNode(EntityType::kSocket) {} +ListenSocketNode::ListenSocketNode(UniquePtr<char> host, int port) + : BaseNode(EntityType::kSocket), host_(std::move(host)), port_(port) {} grpc_json* ListenSocketNode::RenderJson() { // We need to track these three json objects to build our object @@ -388,6 +389,21 @@ grpc_json* ListenSocketNode::RenderJson() { json_iterator = nullptr; json_iterator = grpc_json_add_number_string_child(json, json_iterator, "socketId", uuid()); + json = top_level_json; + json_iterator = nullptr; + json_iterator = grpc_json_create_child(json_iterator, json, "local", nullptr, + GRPC_JSON_OBJECT, false); + json = json_iterator; + json_iterator = nullptr; + json_iterator = grpc_json_create_child(json_iterator, json, "tcpip_address", + nullptr, GRPC_JSON_OBJECT, false); + json = json_iterator; + json_iterator = nullptr; + json_iterator = + grpc_json_add_number_string_child(json, json_iterator, "port", port_); + json_iterator = grpc_json_create_child(json_iterator, json, "ip_address", + host_.get(), GRPC_JSON_STRING, false); + return top_level_json; } diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h index 88551befc8..8e66623142 100644 --- a/src/core/lib/channel/channelz.h +++ b/src/core/lib/channel/channelz.h @@ -268,10 +268,15 @@ class SocketNode : public BaseNode { // Handles channelz bookkeeping for listen sockets class ListenSocketNode : public BaseNode { public: - ListenSocketNode(); + // ListenSocketNode takes ownership of host. + ListenSocketNode(UniquePtr<char> host, int port); ~ListenSocketNode() override {} grpc_json* RenderJson() override; + + private: + UniquePtr<char> host_; + int port_; }; // Creation functions diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 5d24e01a47..4ce2a46a09 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -620,7 +620,8 @@ Server::Server( std::vector< std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> interceptor_creators) - : max_receive_message_size_(max_receive_message_size), + : interceptor_creators_(std::move(interceptor_creators)), + max_receive_message_size_(max_receive_message_size), sync_server_cqs_(std::move(sync_server_cqs)), started_(false), shutdown_(false), @@ -628,8 +629,7 @@ Server::Server( has_generic_service_(false), server_(nullptr), server_initializer_(new ServerInitializer(this)), - health_check_service_disabled_(false), - interceptor_creators_(std::move(interceptor_creators)) { + health_check_service_disabled_(false) { g_gli_initializer.summon(); gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); global_callbacks_ = g_callbacks; |