aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel')
-rw-r--r--src/core/ext/filters/client_channel/OWNERS2
-rw-r--r--src/core/ext/filters/client_channel/backup_poller.h1
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc659
-rw-r--r--src/core/ext/filters/client_channel/client_channel_plugin.cc29
-rw-r--r--src/core/ext/filters/client_channel/http_connect_handshaker.cc2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.cc4
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h4
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc70
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h36
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc365
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc741
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc253
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h596
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_factory.cc2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_factory.h1
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_registry.h1
-rw-r--r--src/core/ext/filters/client_channel/method_params.cc2
-rw-r--r--src/core/ext/filters/client_channel/method_params.h6
-rw-r--r--src/core/ext/filters/client_channel/parse_address.cc28
-rw-r--r--src/core/ext/filters/client_channel/resolver.h8
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc51
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h1
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc58
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h7
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc30
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc30
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h5
-rw-r--r--src/core/ext/filters/client_channel/retry_throttle.cc222
-rw-r--r--src/core/ext/filters/client_channel/retry_throttle.h79
-rw-r--r--src/core/ext/filters/client_channel/status_util.cc100
-rw-r--r--src/core/ext/filters/client_channel/status_util.h58
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc1
-rw-r--r--src/core/ext/filters/client_channel/uri_parser.h1
33 files changed, 1854 insertions, 1599 deletions
diff --git a/src/core/ext/filters/client_channel/OWNERS b/src/core/ext/filters/client_channel/OWNERS
index 8f5e92808e..c8760d947b 100644
--- a/src/core/ext/filters/client_channel/OWNERS
+++ b/src/core/ext/filters/client_channel/OWNERS
@@ -1,4 +1,4 @@
set noparent
@markdroth
@dgquintas
-@a11r
+@AspirinSJL
diff --git a/src/core/ext/filters/client_channel/backup_poller.h b/src/core/ext/filters/client_channel/backup_poller.h
index 7285b9b93e..8f132f968c 100644
--- a/src/core/ext/filters/client_channel/backup_poller.h
+++ b/src/core/ext/filters/client_channel/backup_poller.h
@@ -23,7 +23,6 @@
#include <grpc/grpc.h>
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
/* Start polling \a interested_parties periodically in the timer thread */
void grpc_client_channel_start_backup_polling(
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 90b93fbe23..80a647fa94 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -38,12 +38,12 @@
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
-#include "src/core/ext/filters/client_channel/status_util.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
@@ -63,6 +63,7 @@
#include "src/core/lib/transport/status_metadata.h"
using grpc_core::internal::ClientChannelMethodParams;
+using grpc_core::internal::ServerRetryThrottleData;
/* Client channel implementation */
@@ -99,7 +100,7 @@ typedef struct client_channel_channel_data {
/** currently active load balancer */
grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy;
/** retry throttle data */
- grpc_server_retry_throttle_data* retry_throttle_data;
+ grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
/** maps method names to method_parameters structs */
grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
/** incoming resolver result - set by resolver.next() */
@@ -173,7 +174,7 @@ static void set_channel_connectivity_state_locked(channel_data* chand,
}
}
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand,
+ gpr_log(GPR_INFO, "chand=%p: setting connectivity state to %s", chand,
grpc_connectivity_state_name(state));
}
grpc_connectivity_state_set(&chand->state_tracker, state, error, reason);
@@ -185,7 +186,7 @@ static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) {
/* check if the notification is for the latest policy */
if (w->lb_policy == w->chand->lb_policy.get()) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand,
+ gpr_log(GPR_INFO, "chand=%p: lb_policy=%p state changed to %s", w->chand,
w->lb_policy, grpc_connectivity_state_name(w->state));
}
set_channel_connectivity_state_locked(w->chand, w->state,
@@ -214,7 +215,7 @@ static void watch_lb_policy_locked(channel_data* chand,
static void start_resolving_locked(channel_data* chand) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand);
+ gpr_log(GPR_INFO, "chand=%p: starting name resolution", chand);
}
GPR_ASSERT(!chand->started_resolving);
chand->started_resolving = true;
@@ -225,7 +226,7 @@ static void start_resolving_locked(channel_data* chand) {
typedef struct {
char* server_name;
- grpc_server_retry_throttle_data* retry_throttle_data;
+ grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
} service_config_parsing_state;
static void parse_retry_throttle_params(
@@ -278,7 +279,7 @@ static void parse_retry_throttle_params(
}
}
parsing_state->retry_throttle_data =
- grpc_retry_throttle_map_get_data_for_server(
+ grpc_core::internal::ServerRetryThrottleMap::GetDataForServer(
parsing_state->server_name, max_milli_tokens, milli_token_ratio);
}
}
@@ -296,18 +297,23 @@ static void request_reresolution_locked(void* arg, grpc_error* error) {
return;
}
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p: started name re-resolving", chand);
+ gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand);
}
chand->resolver->RequestReresolutionLocked();
// Give back the closure to the LB policy.
chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
}
+// TODO(roth): The logic in this function is very hard to follow. We
+// should refactor this so that it's easier to understand, perhaps as
+// part of changing the resolver API to more clearly differentiate
+// between transient failures and shutdown.
static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
channel_data* chand = static_cast<channel_data*>(arg);
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand,
- grpc_error_string(error));
+ gpr_log(GPR_INFO,
+ "chand=%p: got resolver result: resolver_result=%p error=%s", chand,
+ chand->resolver_result, grpc_error_string(error));
}
// Extract the following fields from the resolver result, if non-nullptr.
bool lb_policy_updated = false;
@@ -316,7 +322,7 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
bool lb_policy_name_changed = false;
grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy;
char* service_config_json = nullptr;
- grpc_server_retry_throttle_data* retry_throttle_data = nullptr;
+ grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
if (chand->resolver_result != nullptr) {
if (chand->resolver != nullptr) {
@@ -416,18 +422,16 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
service_config->ParseGlobalParams(parse_retry_throttle_params,
&parsing_state);
grpc_uri_destroy(uri);
- retry_throttle_data = parsing_state.retry_throttle_data;
+ retry_throttle_data = std::move(parsing_state.retry_throttle_data);
}
method_params_table = service_config->CreateMethodConfigTable(
ClientChannelMethodParams::CreateFromJson);
}
}
}
- grpc_channel_args_destroy(chand->resolver_result);
- chand->resolver_result = nullptr;
}
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p: resolver result: lb_policy_name=\"%s\"%s, "
"service_config=\"%s\"",
chand, lb_policy_name_dup,
@@ -449,10 +453,7 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
}
gpr_mu_unlock(&chand->info_mu);
// Swap out the retry throttle data.
- if (chand->retry_throttle_data != nullptr) {
- grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
- }
- chand->retry_throttle_data = retry_throttle_data;
+ chand->retry_throttle_data = std::move(retry_throttle_data);
// Swap out the method params table.
chand->method_params_table = std::move(method_params_table);
// If we have a new LB policy or are shutting down (in which case
@@ -465,7 +466,7 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
chand->resolver == nullptr) {
if (chand->lb_policy != nullptr) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand,
+ gpr_log(GPR_INFO, "chand=%p: unreffing lb_policy=%p", chand,
chand->lb_policy.get());
}
grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
@@ -479,11 +480,11 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
// error or shutdown.
if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p: shutting down", chand);
+ gpr_log(GPR_INFO, "chand=%p: shutting down", chand);
}
if (chand->resolver != nullptr) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p: shutting down resolver", chand);
+ gpr_log(GPR_INFO, "chand=%p: shutting down resolver", chand);
}
chand->resolver.reset();
}
@@ -497,13 +498,15 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
"Channel disconnected", &error, 1));
GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver");
+ grpc_channel_args_destroy(chand->resolver_result);
+ chand->resolver_result = nullptr;
} else { // Not shutting down.
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
grpc_error* state_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
if (lb_policy_created) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand);
+ gpr_log(GPR_INFO, "chand=%p: initializing new LB policy", chand);
}
GRPC_ERROR_UNREF(state_error);
state = chand->lb_policy->CheckConnectivityLocked(&state_error);
@@ -515,11 +518,16 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
chand->exit_idle_when_lb_policy_arrives = false;
}
watch_lb_policy_locked(chand, chand->lb_policy.get(), state);
+ } else if (chand->resolver_result == nullptr) {
+ // Transient failure.
+ GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
}
if (!lb_policy_updated) {
set_channel_connectivity_state_locked(
chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
}
+ grpc_channel_args_destroy(chand->resolver_result);
+ chand->resolver_result = nullptr;
chand->resolver->NextLocked(&chand->resolver_result,
&chand->on_resolver_result_changed);
GRPC_ERROR_UNREF(state_error);
@@ -715,12 +723,8 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
}
gpr_free(chand->info_lb_policy_name);
gpr_free(chand->info_service_config_json);
- if (chand->retry_throttle_data != nullptr) {
- grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
- }
- if (chand->method_params_table != nullptr) {
- chand->method_params_table.reset();
- }
+ chand->retry_throttle_data.reset();
+ chand->method_params_table.reset();
grpc_client_channel_stop_backup_polling(chand->interested_parties);
grpc_connectivity_state_destroy(&chand->state_tracker);
grpc_pollset_set_destroy(chand->interested_parties);
@@ -798,7 +802,8 @@ typedef struct {
grpc_linked_mdelem* send_initial_metadata_storage;
grpc_metadata_batch send_initial_metadata;
// For send_message.
- grpc_caching_byte_stream send_message;
+ grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
+ send_message;
// For send_trailing_metadata.
grpc_linked_mdelem* send_trailing_metadata_storage;
grpc_metadata_batch send_trailing_metadata;
@@ -808,7 +813,7 @@ typedef struct {
bool trailing_metadata_available;
// For intercepting recv_message.
grpc_closure recv_message_ready;
- grpc_byte_stream* recv_message;
+ grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message;
// For intercepting recv_trailing_metadata.
grpc_metadata_batch recv_trailing_metadata;
grpc_transport_stream_stats collect_stats;
@@ -837,10 +842,11 @@ typedef struct {
bool completed_recv_trailing_metadata : 1;
// State for callback processing.
bool retry_dispatched : 1;
- bool recv_initial_metadata_ready_deferred : 1;
- bool recv_message_ready_deferred : 1;
+ subchannel_batch_data* recv_initial_metadata_ready_deferred_batch;
grpc_error* recv_initial_metadata_error;
+ subchannel_batch_data* recv_message_ready_deferred_batch;
grpc_error* recv_message_error;
+ subchannel_batch_data* recv_trailing_metadata_internal_batch;
} subchannel_call_retry_state;
// Pending batches stored in call data.
@@ -872,7 +878,7 @@ typedef struct client_channel_call_data {
grpc_call_stack* owning_call;
grpc_call_combiner* call_combiner;
- grpc_server_retry_throttle_data* retry_throttle_data;
+ grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params;
grpc_subchannel_call* subchannel_call;
@@ -914,12 +920,14 @@ typedef struct client_channel_call_data {
gpr_atm* peer_string;
// send_message
// When we get a send_message op, we replace the original byte stream
- // with a grpc_caching_byte_stream that caches the slices to a
- // local buffer for use in retries.
+ // with a CachingByteStream that caches the slices to a local buffer for
+ // use in retries.
// Note: We inline the cache for the first 3 send_message ops and use
// dynamic allocation after that. This number was essentially picked
// at random; it could be changed in the future to tune performance.
- grpc_core::InlinedVector<grpc_byte_stream_cache*, 3> send_messages;
+ grpc_core::ManualConstructor<
+ grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3>>
+ send_messages;
// send_trailing_metadata
bool seen_send_trailing_metadata;
grpc_linked_mdelem* send_trailing_metadata_storage;
@@ -964,11 +972,12 @@ static void maybe_cache_send_ops_for_batch(call_data* calld,
}
// Set up cache for send_message ops.
if (batch->send_message) {
- grpc_byte_stream_cache* cache = (grpc_byte_stream_cache*)gpr_arena_alloc(
- calld->arena, sizeof(grpc_byte_stream_cache));
- grpc_byte_stream_cache_init(cache,
- batch->payload->send_message.send_message);
- calld->send_messages.push_back(cache);
+ grpc_core::ByteStreamCache* cache =
+ static_cast<grpc_core::ByteStreamCache*>(
+ gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
+ new (cache) grpc_core::ByteStreamCache(
+ std::move(batch->payload->send_message.send_message));
+ calld->send_messages->push_back(cache);
}
// Save metadata batch for send_trailing_metadata ops.
if (batch->send_trailing_metadata) {
@@ -986,6 +995,39 @@ static void maybe_cache_send_ops_for_batch(call_data* calld,
}
}
+// Frees cached send_initial_metadata.
+static void free_cached_send_initial_metadata(channel_data* chand,
+ call_data* calld) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
+ calld);
+ }
+ grpc_metadata_batch_destroy(&calld->send_initial_metadata);
+}
+
+// Frees cached send_message at index idx.
+static void free_cached_send_message(channel_data* chand, call_data* calld,
+ size_t idx) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
+ chand, calld, idx);
+ }
+ (*calld->send_messages)[idx]->Destroy();
+}
+
+// Frees cached send_trailing_metadata.
+static void free_cached_send_trailing_metadata(channel_data* chand,
+ call_data* calld) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: destroying calld->send_trailing_metadata",
+ chand, calld);
+ }
+ grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
+}
+
// Frees cached send ops that have already been completed after
// committing the call.
static void free_cached_send_op_data_after_commit(
@@ -993,19 +1035,13 @@ static void free_cached_send_op_data_after_commit(
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (retry_state->completed_send_initial_metadata) {
- grpc_metadata_batch_destroy(&calld->send_initial_metadata);
+ free_cached_send_initial_metadata(chand, calld);
}
for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR
- "]",
- chand, calld, i);
- }
- grpc_byte_stream_cache_destroy(calld->send_messages[i]);
+ free_cached_send_message(chand, calld, i);
}
if (retry_state->completed_send_trailing_metadata) {
- grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
+ free_cached_send_trailing_metadata(chand, calld);
}
}
@@ -1017,20 +1053,14 @@ static void free_cached_send_op_data_for_completed_batch(
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (batch_data->batch.send_initial_metadata) {
- grpc_metadata_batch_destroy(&calld->send_initial_metadata);
+ free_cached_send_initial_metadata(chand, calld);
}
if (batch_data->batch.send_message) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR
- "]",
- chand, calld, retry_state->completed_send_message_count - 1);
- }
- grpc_byte_stream_cache_destroy(
- calld->send_messages[retry_state->completed_send_message_count - 1]);
+ free_cached_send_message(chand, calld,
+ retry_state->completed_send_message_count - 1);
}
if (batch_data->batch.send_trailing_metadata) {
- grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
+ free_cached_send_trailing_metadata(chand, calld);
}
}
@@ -1058,7 +1088,7 @@ static void pending_batches_add(grpc_call_element* elem,
call_data* calld = static_cast<call_data*>(elem->call_data);
const size_t idx = get_batch_index(batch);
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
calld, idx);
}
@@ -1079,14 +1109,14 @@ static void pending_batches_add(grpc_call_element* elem,
if (batch->send_message) {
calld->pending_send_message = true;
calld->bytes_buffered_for_retry +=
- batch->payload->send_message.send_message->length;
+ batch->payload->send_message.send_message->length();
}
if (batch->send_trailing_metadata) {
calld->pending_send_trailing_metadata = true;
}
if (calld->bytes_buffered_for_retry > chand->per_rpc_retry_buffer_size) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: exceeded retry buffer size, committing",
chand, calld);
}
@@ -1101,7 +1131,7 @@ static void pending_batches_add(grpc_call_element* elem,
// retries are disabled so that we don't bother with retry overhead.
if (calld->num_attempts_completed == 0) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: disabling retries before first attempt",
chand, calld);
}
@@ -1148,7 +1178,7 @@ static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
if (calld->pending_batches[i].batch != nullptr) ++num_batches;
}
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
elem->channel_data, calld, num_batches, grpc_error_string(error));
}
@@ -1210,7 +1240,7 @@ static void pending_batches_resume(grpc_call_element* elem) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
if (calld->pending_batches[i].batch != nullptr) ++num_batches;
}
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: starting %" PRIuPTR
" pending batches on subchannel_call=%p",
chand, calld, num_batches, calld->subchannel_call);
@@ -1255,7 +1285,7 @@ static void maybe_clear_pending_batch(grpc_call_element* elem,
(!batch->recv_message ||
batch->payload->recv_message.recv_message_ready == nullptr)) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: clearing pending batch", chand,
+ gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
calld);
}
pending_batch_clear(calld, pending);
@@ -1274,7 +1304,8 @@ static bool pending_batch_is_completed(
return false;
}
if (pending->batch->send_message &&
- retry_state->completed_send_message_count < calld->send_messages.size()) {
+ retry_state->completed_send_message_count <
+ calld->send_messages->size()) {
return false;
}
if (pending->batch->send_trailing_metadata &&
@@ -1309,7 +1340,7 @@ static bool pending_batch_is_unstarted(
return true;
}
if (pending->batch->send_message &&
- retry_state->started_send_message_count < calld->send_messages.size()) {
+ retry_state->started_send_message_count < calld->send_messages->size()) {
return true;
}
if (pending->batch->send_trailing_metadata &&
@@ -1344,7 +1375,7 @@ static void retry_commit(grpc_call_element* elem,
if (calld->retry_committed) return;
calld->retry_committed = true;
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: committing retries", chand, calld);
+ gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, calld);
}
if (retry_state != nullptr) {
free_cached_send_op_data_after_commit(elem, retry_state);
@@ -1389,7 +1420,7 @@ static void do_retry(grpc_call_element* elem,
next_attempt_time = calld->retry_backoff->NextAttemptTime();
}
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: retrying failed call in %" PRIuPTR " ms", chand,
calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now());
}
@@ -1423,7 +1454,7 @@ static bool maybe_retry(grpc_call_element* elem,
batch_data->subchannel_call));
if (retry_state->retry_dispatched) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: retry already dispatched", chand,
+ gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
calld);
}
return true;
@@ -1431,16 +1462,18 @@ static bool maybe_retry(grpc_call_element* elem,
}
// Check status.
if (status == GRPC_STATUS_OK) {
- grpc_server_retry_throttle_data_record_success(calld->retry_throttle_data);
+ if (calld->retry_throttle_data != nullptr) {
+ calld->retry_throttle_data->RecordSuccess();
+ }
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: call succeeded", chand, calld);
+ gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, calld);
}
return false;
}
// Status is not OK. Check whether the status is retryable.
if (!retry_policy->retryable_status_codes.Contains(status)) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: status %s not configured as retryable", chand,
calld, grpc_status_code_to_string(status));
}
@@ -1453,17 +1486,17 @@ static bool maybe_retry(grpc_call_element* elem,
// things like failures due to malformed requests (INVALID_ARGUMENT).
// Conversely, it's important for this to come before the remaining
// checks, so that we don't fail to record failures due to other factors.
- if (!grpc_server_retry_throttle_data_record_failure(
- calld->retry_throttle_data)) {
+ if (calld->retry_throttle_data != nullptr &&
+ !calld->retry_throttle_data->RecordFailure()) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: retries throttled", chand, calld);
+ gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, calld);
}
return false;
}
// Check whether the call is committed.
if (calld->retry_committed) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: retries already committed", chand,
+ gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
calld);
}
return false;
@@ -1472,7 +1505,7 @@ static bool maybe_retry(grpc_call_element* elem,
++calld->num_attempts_completed;
if (calld->num_attempts_completed >= retry_policy->max_attempts) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: exceeded %d retry attempts", chand,
+ gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
calld, retry_policy->max_attempts);
}
return false;
@@ -1480,7 +1513,7 @@ static bool maybe_retry(grpc_call_element* elem,
// If the call was cancelled from the surface, don't retry.
if (calld->cancel_error != GRPC_ERROR_NONE) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: call cancelled from surface, not retrying",
chand, calld);
}
@@ -1493,16 +1526,15 @@ static bool maybe_retry(grpc_call_element* elem,
uint32_t ms;
if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: not retrying due to server push-back",
chand, calld);
}
return false;
} else {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "chand=%p calld=%p: server push-back: retry in %u ms", chand,
- calld, ms);
+ gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
+ chand, calld, ms);
}
server_pushback_ms = (grpc_millis)ms;
}
@@ -1575,7 +1607,7 @@ static void invoke_recv_initial_metadata_callback(void* arg,
batch->payload->recv_initial_metadata.recv_initial_metadata_ready !=
nullptr) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: invoking recv_initial_metadata_ready for "
"pending batch at index %" PRIuPTR,
chand, calld, i);
@@ -1611,7 +1643,7 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
chand, calld, grpc_error_string(error));
}
@@ -1626,12 +1658,12 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
if ((batch_data->trailing_metadata_available || error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: deferring recv_initial_metadata_ready "
"(Trailers-Only)",
chand, calld);
}
- retry_state->recv_initial_metadata_ready_deferred = true;
+ retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
if (!retry_state->started_recv_trailing_metadata) {
// recv_trailing_metadata not yet started by application; start it
@@ -1668,7 +1700,7 @@ static void invoke_recv_message_callback(void* arg, grpc_error* error) {
if (batch != nullptr && batch->recv_message &&
batch->payload->recv_message.recv_message_ready != nullptr) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: invoking recv_message_ready for "
"pending batch at index %" PRIuPTR,
chand, calld, i);
@@ -1680,7 +1712,7 @@ static void invoke_recv_message_callback(void* arg, grpc_error* error) {
GPR_ASSERT(pending != nullptr);
// Return payload.
*pending->batch->payload->recv_message.recv_message =
- batch_data->recv_message;
+ std::move(batch_data->recv_message);
// Update bookkeeping.
// Note: Need to do this before invoking the callback, since invoking
// the callback will result in yielding the call combiner.
@@ -1701,7 +1733,7 @@ static void recv_message_ready(void* arg, grpc_error* error) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: got recv_message_ready, error=%s",
+ gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
chand, calld, grpc_error_string(error));
}
subchannel_call_retry_state* retry_state =
@@ -1715,12 +1747,12 @@ static void recv_message_ready(void* arg, grpc_error* error) {
if ((batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: deferring recv_message_ready (nullptr "
"message and recv_trailing_metadata pending)",
chand, calld);
}
- retry_state->recv_message_ready_deferred = true;
+ retry_state->recv_message_ready_deferred_batch = batch_data;
retry_state->recv_message_error = GRPC_ERROR_REF(error);
if (!retry_state->started_recv_trailing_metadata) {
// recv_trailing_metadata not yet started by application; start it
@@ -1739,6 +1771,59 @@ static void recv_message_ready(void* arg, grpc_error* error) {
}
//
+// list of closures to execute in call combiner
+//
+
+// Represents a closure that needs to run in the call combiner as part of
+// starting or completing a batch.
+typedef struct {
+ grpc_closure* closure;
+ grpc_error* error;
+ const char* reason;
+ bool free_reason = false;
+} closure_to_execute;
+
+static void execute_closures_in_call_combiner(grpc_call_element* elem,
+ const char* caller,
+ closure_to_execute* closures,
+ size_t num_closures) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ // Note that the call combiner will be yielded for each closure that
+ // we schedule. We're already running in the call combiner, so one of
+ // the closures can be scheduled directly, but the others will
+ // have to re-enter the call combiner.
+ if (num_closures > 0) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: %s starting closure: %s", chand,
+ calld, caller, closures[0].reason);
+ }
+ GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error);
+ if (closures[0].free_reason) {
+ gpr_free(const_cast<char*>(closures[0].reason));
+ }
+ for (size_t i = 1; i < num_closures; ++i) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: %s starting closure in call combiner: %s",
+ chand, calld, caller, closures[i].reason);
+ }
+ GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure,
+ closures[i].error, closures[i].reason);
+ if (closures[i].free_reason) {
+ gpr_free(const_cast<char*>(closures[i].reason));
+ }
+ }
+ } else {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: no closures to run for %s", chand,
+ calld, caller);
+ }
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner, "no closures to run");
+ }
+}
+
+//
// on_complete callback handling
//
@@ -1766,36 +1851,35 @@ static void update_retry_state_for_completed_batch(
}
}
-// Represents a closure that needs to run as a result of a completed batch.
-typedef struct {
- grpc_closure* closure;
- grpc_error* error;
- const char* reason;
-} closure_to_execute;
-
// Adds any necessary closures for deferred recv_initial_metadata and
// recv_message callbacks to closures, updating *num_closures as needed.
static void add_closures_for_deferred_recv_callbacks(
subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
closure_to_execute* closures, size_t* num_closures) {
- if (batch_data->batch.recv_trailing_metadata &&
- retry_state->recv_initial_metadata_ready_deferred) {
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure =
- GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
- invoke_recv_initial_metadata_callback, batch_data,
- grpc_schedule_on_exec_ctx);
- closure->error = retry_state->recv_initial_metadata_error;
- closure->reason = "resuming recv_initial_metadata_ready";
- }
- if (batch_data->batch.recv_trailing_metadata &&
- retry_state->recv_message_ready_deferred) {
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = GRPC_CLOSURE_INIT(&batch_data->recv_message_ready,
- invoke_recv_message_callback,
- batch_data, grpc_schedule_on_exec_ctx);
- closure->error = retry_state->recv_message_error;
- closure->reason = "resuming recv_message_ready";
+ if (batch_data->batch.recv_trailing_metadata) {
+ // Add closure for deferred recv_initial_metadata_ready.
+ if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = GRPC_CLOSURE_INIT(
+ &batch_data->recv_initial_metadata_ready,
+ invoke_recv_initial_metadata_callback,
+ retry_state->recv_initial_metadata_ready_deferred_batch,
+ grpc_schedule_on_exec_ctx);
+ closure->error = retry_state->recv_initial_metadata_error;
+ closure->reason = "resuming recv_initial_metadata_ready";
+ retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
+ }
+ // Add closure for deferred recv_message_ready.
+ if (retry_state->recv_message_ready_deferred_batch != nullptr) {
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = GRPC_CLOSURE_INIT(
+ &batch_data->recv_message_ready, invoke_recv_message_callback,
+ retry_state->recv_message_ready_deferred_batch,
+ grpc_schedule_on_exec_ctx);
+ closure->error = retry_state->recv_message_error;
+ closure->reason = "resuming recv_message_ready";
+ retry_state->recv_message_ready_deferred_batch = nullptr;
+ }
}
}
@@ -1809,7 +1893,7 @@ static void add_closures_for_replay_or_pending_send_ops(
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
bool have_pending_send_message_ops =
- retry_state->started_send_message_count < calld->send_messages.size();
+ retry_state->started_send_message_count < calld->send_messages->size();
bool have_pending_send_trailing_metadata_op =
calld->seen_send_trailing_metadata &&
!retry_state->started_send_trailing_metadata;
@@ -1827,7 +1911,7 @@ static void add_closures_for_replay_or_pending_send_ops(
}
if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: starting next batch for pending send op(s)",
chand, calld);
}
@@ -1852,7 +1936,7 @@ static void add_closures_for_completed_pending_batches(
pending_batch* pending = &calld->pending_batches[i];
if (pending_batch_is_completed(pending, calld, retry_state)) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: pending batch completed at index %" PRIuPTR,
chand, calld, i);
}
@@ -1885,7 +1969,7 @@ static void add_closures_to_fail_unstarted_pending_batches(
pending_batch* pending = &calld->pending_batches[i];
if (pending_batch_is_unstarted(pending, calld, retry_state)) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: failing unstarted pending batch at index "
"%" PRIuPTR,
chand, calld, i);
@@ -1929,7 +2013,7 @@ static void on_complete(void* arg, grpc_error* error) {
call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
+ gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
chand, calld, grpc_error_string(error), batch_str);
gpr_free(batch_str);
}
@@ -1940,11 +2024,13 @@ static void on_complete(void* arg, grpc_error* error) {
// If we have previously completed recv_trailing_metadata, then the
// call is finished.
bool call_finished = retry_state->completed_recv_trailing_metadata;
+ // Record whether we were already committed before receiving this callback.
+ const bool previously_committed = calld->retry_committed;
// Update bookkeeping in retry_state.
update_retry_state_for_completed_batch(batch_data, retry_state);
if (call_finished) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: call already finished", chand,
+ gpr_log(GPR_INFO, "chand=%p calld=%p: call already finished", chand,
calld);
}
} else {
@@ -1968,35 +2054,39 @@ static void on_complete(void* arg, grpc_error* error) {
if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
}
- } else if (retry_state->completed_recv_trailing_metadata) {
- call_finished = true;
- }
- if (call_finished && grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: call finished, status=%s", chand,
- calld, grpc_status_code_to_string(status));
}
- // If the call is finished, check if we should retry.
- if (call_finished &&
- maybe_retry(elem, batch_data, status, server_pushback_md)) {
- // Unref batch_data for deferred recv_initial_metadata_ready or
- // recv_message_ready callbacks, if any.
- if (batch_data->batch.recv_trailing_metadata &&
- retry_state->recv_initial_metadata_ready_deferred) {
- batch_data_unref(batch_data);
- GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
+ // If the call just finished, check if we should retry.
+ if (call_finished) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
+ calld, grpc_status_code_to_string(status));
}
- if (batch_data->batch.recv_trailing_metadata &&
- retry_state->recv_message_ready_deferred) {
+ if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
+ // Unref batch_data for deferred recv_initial_metadata_ready or
+ // recv_message_ready callbacks, if any.
+ if (batch_data->batch.recv_trailing_metadata &&
+ retry_state->recv_initial_metadata_ready_deferred_batch !=
+ nullptr) {
+ batch_data_unref(batch_data);
+ GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
+ }
+ if (batch_data->batch.recv_trailing_metadata &&
+ retry_state->recv_message_ready_deferred_batch != nullptr) {
+ batch_data_unref(batch_data);
+ GRPC_ERROR_UNREF(retry_state->recv_message_error);
+ }
batch_data_unref(batch_data);
- GRPC_ERROR_UNREF(retry_state->recv_message_error);
+ return;
}
- batch_data_unref(batch_data);
- return;
+ // Not retrying, so commit the call.
+ retry_commit(elem, retry_state);
}
}
- // If the call is finished or retries are committed, free cached data for
- // send ops that we've just completed.
- if (call_finished || calld->retry_committed) {
+ // If we were already committed before receiving this callback, free
+ // cached data for send ops that we've just completed. (If the call has
+ // just now finished, the call to retry_commit() above will have freed all
+ // cached send ops, so we don't need to do it here.)
+ if (previously_committed) {
free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
}
// Call not being retried.
@@ -2031,20 +2121,8 @@ static void on_complete(void* arg, grpc_error* error) {
// Don't need batch_data anymore.
batch_data_unref(batch_data);
// Schedule all of the closures identified above.
- // Note that the call combiner will be yielded for each closure that
- // we schedule. We're already running in the call combiner, so one of
- // the closures can be scheduled directly, but the others will
- // have to re-enter the call combiner.
- if (num_closures > 0) {
- GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error);
- for (size_t i = 1; i < num_closures; ++i) {
- GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure,
- closures[i].error, closures[i].reason);
- }
- } else {
- GRPC_CALL_COMBINER_STOP(calld->call_combiner,
- "no closures to run for on_complete");
- }
+ execute_closures_in_call_combiner(elem, "on_complete", closures,
+ num_closures);
}
//
@@ -2061,6 +2139,31 @@ static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
grpc_subchannel_call_process_op(subchannel_call, batch);
}
+// Adds a closure to closures that will execute batch in the call combiner.
+static void add_closure_for_subchannel_batch(
+ call_data* calld, grpc_transport_stream_op_batch* batch,
+ closure_to_execute* closures, size_t* num_closures) {
+ batch->handler_private.extra_arg = calld->subchannel_call;
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+ start_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = &batch->handler_private.closure;
+ closure->error = GRPC_ERROR_NONE;
+ // If the tracer is enabled, we log a more detailed message, which
+ // requires dynamic allocation. This will be freed in
+ // start_retriable_subchannel_batches().
+ if (grpc_client_channel_trace.enabled()) {
+ char* batch_str = grpc_transport_stream_op_batch_string(batch);
+ gpr_asprintf(const_cast<char**>(&closure->reason),
+ "starting batch in call combiner: %s", batch_str);
+ gpr_free(batch_str);
+ closure->free_reason = true;
+ } else {
+ closure->reason = "start_subchannel_batch";
+ }
+}
+
// Adds retriable send_initial_metadata op to batch_data.
static void add_retriable_send_initial_metadata_op(
call_data* calld, subchannel_call_retry_state* retry_state,
@@ -2120,17 +2223,17 @@ static void add_retriable_send_message_op(
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
chand, calld, retry_state->started_send_message_count);
}
- grpc_byte_stream_cache* cache =
- calld->send_messages[retry_state->started_send_message_count];
+ grpc_core::ByteStreamCache* cache =
+ (*calld->send_messages)[retry_state->started_send_message_count];
++retry_state->started_send_message_count;
- grpc_caching_byte_stream_init(&batch_data->send_message, cache);
+ batch_data->send_message.Init(cache);
batch_data->batch.send_message = true;
- batch_data->batch.payload->send_message.send_message =
- &batch_data->send_message.base;
+ batch_data->batch.payload->send_message.send_message.reset(
+ batch_data->send_message.get());
}
// Adds retriable send_trailing_metadata op to batch_data.
@@ -2207,7 +2310,7 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: call failed but recv_trailing_metadata not "
"started; starting it internally",
chand, calld);
@@ -2216,8 +2319,12 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
- subchannel_batch_data* batch_data = batch_data_create(elem, 1);
+ // Create batch_data with 2 refs, since this batch will be unreffed twice:
+ // once when the subchannel batch returns, and again when we actually get
+ // a recv_trailing_metadata op from the surface.
+ subchannel_batch_data* batch_data = batch_data_create(elem, 2);
add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
+ retry_state->recv_trailing_metadata_internal_batch = batch_data;
// Note: This will release the call combiner.
grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch);
}
@@ -2235,7 +2342,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
!retry_state->started_send_initial_metadata &&
!calld->pending_send_initial_metadata) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: replaying previously completed "
"send_initial_metadata op",
chand, calld);
@@ -2246,12 +2353,12 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
}
// send_message.
// Note that we can only have one send_message op in flight at a time.
- if (retry_state->started_send_message_count < calld->send_messages.size() &&
+ if (retry_state->started_send_message_count < calld->send_messages->size() &&
retry_state->started_send_message_count ==
retry_state->completed_send_message_count &&
!calld->pending_send_message) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: replaying previously completed "
"send_message op",
chand, calld);
@@ -2266,11 +2373,11 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
// to start, since we can't send down any more send_message ops after
// send_trailing_metadata.
if (calld->seen_send_trailing_metadata &&
- retry_state->started_send_message_count == calld->send_messages.size() &&
+ retry_state->started_send_message_count == calld->send_messages->size() &&
!retry_state->started_send_trailing_metadata &&
!calld->pending_send_trailing_metadata) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: replaying previously completed "
"send_trailing_metadata op",
chand, calld);
@@ -2288,7 +2395,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
// *num_batches as needed.
static void add_subchannel_batches_for_pending_batches(
grpc_call_element* elem, subchannel_call_retry_state* retry_state,
- grpc_transport_stream_op_batch** batches, size_t* num_batches) {
+ closure_to_execute* closures, size_t* num_closures) {
call_data* calld = static_cast<call_data*>(elem->call_data);
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
@@ -2317,7 +2424,7 @@ static void add_subchannel_batches_for_pending_batches(
// send_message ops after send_trailing_metadata.
if (batch->send_trailing_metadata &&
(retry_state->started_send_message_count + batch->send_message <
- calld->send_messages.size() ||
+ calld->send_messages->size() ||
retry_state->started_send_trailing_metadata)) {
continue;
}
@@ -2331,13 +2438,37 @@ static void add_subchannel_batches_for_pending_batches(
}
if (batch->recv_trailing_metadata &&
retry_state->started_recv_trailing_metadata) {
+ // If we previously completed a recv_trailing_metadata op
+ // initiated by start_internal_recv_trailing_metadata(), use the
+ // result of that instead of trying to re-start this op.
+ if (retry_state->recv_trailing_metadata_internal_batch != nullptr) {
+ // If the batch completed, then trigger the completion callback
+ // directly, so that we return the previously returned results to
+ // the application. Otherwise, just unref the internally
+ // started subchannel batch, since we'll propagate the
+ // completion when it completes.
+ if (retry_state->completed_recv_trailing_metadata) {
+ subchannel_batch_data* batch_data =
+ retry_state->recv_trailing_metadata_internal_batch;
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = &batch_data->on_complete;
+ // Batches containing recv_trailing_metadata always succeed.
+ closure->error = GRPC_ERROR_NONE;
+ closure->reason =
+ "re-executing on_complete for recv_trailing_metadata "
+ "to propagate internally triggered result";
+ } else {
+ batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
+ }
+ retry_state->recv_trailing_metadata_internal_batch = nullptr;
+ }
continue;
}
// If we're not retrying, just send the batch as-is.
if (calld->method_params == nullptr ||
calld->method_params->retry_policy() == nullptr ||
calld->retry_committed) {
- batches[(*num_batches)++] = batch;
+ add_closure_for_subchannel_batch(calld, batch, closures, num_closures);
pending_batch_clear(calld, pending);
continue;
}
@@ -2374,7 +2505,8 @@ static void add_subchannel_batches_for_pending_batches(
GPR_ASSERT(batch->collect_stats);
add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
}
- batches[(*num_batches)++] = &batch_data->batch;
+ add_closure_for_subchannel_batch(calld, &batch_data->batch, closures,
+ num_closures);
}
}
@@ -2385,69 +2517,36 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: constructing retriable batches",
+ gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
chand, calld);
}
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
+ // Construct list of closures to execute, one for each pending batch.
// We can start up to 6 batches.
- grpc_transport_stream_op_batch*
- batches[GPR_ARRAY_SIZE(calld->pending_batches)];
- size_t num_batches = 0;
+ closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches)];
+ size_t num_closures = 0;
// Replay previously-returned send_* ops if needed.
subchannel_batch_data* replay_batch_data =
maybe_create_subchannel_batch_for_replay(elem, retry_state);
if (replay_batch_data != nullptr) {
- batches[num_batches++] = &replay_batch_data->batch;
+ add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures,
+ &num_closures);
}
// Now add pending batches.
- add_subchannel_batches_for_pending_batches(elem, retry_state, batches,
- &num_batches);
+ add_subchannel_batches_for_pending_batches(elem, retry_state, closures,
+ &num_closures);
// Start batches on subchannel call.
- // Note that the call combiner will be yielded for each batch that we
- // send down. We're already running in the call combiner, so one of
- // the batches can be started directly, but the others will have to
- // re-enter the call combiner.
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: starting %" PRIuPTR
" retriable batches on subchannel_call=%p",
- chand, calld, num_batches, calld->subchannel_call);
- }
- if (num_batches == 0) {
- // This should be fairly rare, but it can happen when (e.g.) an
- // attempt completes before it has finished replaying all
- // previously sent messages.
- GRPC_CALL_COMBINER_STOP(calld->call_combiner,
- "no retriable subchannel batches to start");
- } else {
- for (size_t i = 1; i < num_batches; ++i) {
- if (grpc_client_channel_trace.enabled()) {
- char* batch_str = grpc_transport_stream_op_batch_string(batches[i]);
- gpr_log(GPR_DEBUG,
- "chand=%p calld=%p: starting batch in call combiner: %s", chand,
- calld, batch_str);
- gpr_free(batch_str);
- }
- batches[i]->handler_private.extra_arg = calld->subchannel_call;
- GRPC_CLOSURE_INIT(&batches[i]->handler_private.closure,
- start_batch_in_call_combiner, batches[i],
- grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(calld->call_combiner,
- &batches[i]->handler_private.closure,
- GRPC_ERROR_NONE, "start_subchannel_batch");
- }
- if (grpc_client_channel_trace.enabled()) {
- char* batch_str = grpc_transport_stream_op_batch_string(batches[0]);
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting batch: %s", chand, calld,
- batch_str);
- gpr_free(batch_str);
- }
- // Note: This will release the call combiner.
- grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]);
+ chand, calld, num_closures, calld->subchannel_call);
}
+ execute_closures_in_call_combiner(elem, "start_retriable_subchannel_batches",
+ closures, num_closures);
}
//
@@ -2472,7 +2571,7 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
grpc_error* new_error = calld->pick.connected_subchannel->CreateCall(
call_args, &calld->subchannel_call);
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
+ gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
chand, calld, calld->subchannel_call, grpc_error_string(new_error));
}
if (new_error != GRPC_ERROR_NONE) {
@@ -2513,7 +2612,7 @@ static void pick_done(void* arg, grpc_error* error) {
: GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Failed to create subchannel", &error, 1);
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: failed to create subchannel: error=%s",
chand, calld, grpc_error_string(new_error));
}
@@ -2557,7 +2656,7 @@ static void pick_callback_cancel_locked(void* arg, grpc_error* error) {
// the one we started it on. However, this will just be a no-op.
if (error != GRPC_ERROR_NONE && chand->lb_policy != nullptr) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
+ gpr_log(GPR_INFO, "chand=%p calld=%p: cancelling pick from LB policy %p",
chand, calld, chand->lb_policy.get());
}
chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error));
@@ -2572,8 +2671,8 @@ static void pick_callback_done_locked(void* arg, grpc_error* error) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously",
- chand, calld);
+ gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously", chand,
+ calld);
}
async_pick_done_locked(elem, GRPC_ERROR_REF(error));
GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
@@ -2585,12 +2684,11 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call",
+ gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
chand, calld);
}
if (chand->retry_throttle_data != nullptr) {
- calld->retry_throttle_data =
- grpc_server_retry_throttle_data_ref(chand->retry_throttle_data);
+ calld->retry_throttle_data = chand->retry_throttle_data->Ref();
}
if (chand->method_params_table != nullptr) {
calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup(
@@ -2624,8 +2722,8 @@ static bool pick_callback_start_locked(grpc_call_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p",
- chand, calld, chand->lb_policy.get());
+ gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p", chand,
+ calld, chand->lb_policy.get());
}
// Only get service config data on the first attempt.
if (calld->num_attempts_completed == 0) {
@@ -2672,7 +2770,7 @@ static bool pick_callback_start_locked(grpc_call_element* elem) {
if (pick_done) {
// Pick completed synchronously.
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously",
+ gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously",
chand, calld);
}
GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
@@ -2716,7 +2814,7 @@ static void pick_after_resolver_result_cancel_locked(void* arg,
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: cancelling pick waiting for resolver result",
chand, calld);
}
@@ -2736,7 +2834,7 @@ static void pick_after_resolver_result_done_locked(void* arg,
if (args->finished) {
/* cancelled, do nothing */
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "call cancelled before resolver result");
+ gpr_log(GPR_INFO, "call cancelled before resolver result");
}
gpr_free(args);
return;
@@ -2747,13 +2845,51 @@ static void pick_after_resolver_result_done_locked(void* arg,
call_data* calld = static_cast<call_data*>(elem->call_data);
if (error != GRPC_ERROR_NONE) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data",
+ gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data",
chand, calld);
}
async_pick_done_locked(elem, GRPC_ERROR_REF(error));
- } else if (chand->lb_policy != nullptr) {
+ } else if (chand->resolver == nullptr) {
+ // Shutting down.
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand,
+ calld);
+ }
+ async_pick_done_locked(
+ elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
+ } else if (chand->lb_policy == nullptr) {
+ // Transient resolver failure.
+ // If call has wait_for_ready=true, try again; otherwise, fail.
+ uint32_t send_initial_metadata_flags =
+ calld->seen_send_initial_metadata
+ ? calld->send_initial_metadata_flags
+ : calld->pending_batches[0]
+ .batch->payload->send_initial_metadata
+ .send_initial_metadata_flags;
+ if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: resolver returned but no LB policy; "
+ "wait_for_ready=true; trying again",
+ chand, calld);
+ }
+ pick_after_resolver_result_start_locked(elem);
+ } else {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: resolver returned but no LB policy; "
+ "wait_for_ready=false; failing",
+ chand, calld);
+ }
+ async_pick_done_locked(
+ elem,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
+ }
+ } else {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick",
+ gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing pick",
chand, calld);
}
if (pick_callback_start_locked(elem)) {
@@ -2765,37 +2901,13 @@ static void pick_after_resolver_result_done_locked(void* arg,
async_pick_done_locked(elem, GRPC_ERROR_NONE);
}
}
- // TODO(roth): It should be impossible for chand->lb_policy to be nullptr
- // here, so the rest of this code should never actually be executed.
- // However, we have reports of a crash on iOS that triggers this case,
- // so we are temporarily adding this to restore branches that were
- // removed in https://github.com/grpc/grpc/pull/12297. Need to figure
- // out what is actually causing this to occur and then figure out the
- // right way to deal with it.
- else if (chand->resolver != nullptr) {
- // No LB policy, so try again.
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "chand=%p calld=%p: resolver returned but no LB policy, "
- "trying again",
- chand, calld);
- }
- pick_after_resolver_result_start_locked(elem);
- } else {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver disconnected", chand,
- calld);
- }
- async_pick_done_locked(
- elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
- }
}
static void pick_after_resolver_result_start_locked(grpc_call_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: deferring pick pending resolver result", chand,
calld);
}
@@ -2862,7 +2974,7 @@ static void cc_start_transport_stream_op_batch(
// If we've previously been cancelled, immediately fail any new batches.
if (calld->cancel_error != GRPC_ERROR_NONE) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
+ gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
chand, calld, grpc_error_string(calld->cancel_error));
}
// Note: This will release the call combiner.
@@ -2881,7 +2993,7 @@ static void cc_start_transport_stream_op_batch(
calld->cancel_error =
GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand,
+ gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
calld, grpc_error_string(calld->cancel_error));
}
// If we do not have a subchannel call (i.e., a pick has not yet
@@ -2907,7 +3019,7 @@ static void cc_start_transport_stream_op_batch(
// streaming calls).
if (calld->subchannel_call != nullptr) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
calld, calld->subchannel_call);
}
@@ -2919,7 +3031,7 @@ static void cc_start_transport_stream_op_batch(
// combiner to start a pick.
if (batch->send_initial_metadata) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering client_channel combiner",
+ gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
chand, calld);
}
GRPC_CLOSURE_SCHED(
@@ -2929,7 +3041,7 @@ static void cc_start_transport_stream_op_batch(
} else {
// For all other batches, release the call combiner.
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"chand=%p calld=%p: saved batch, yeilding call combiner", chand,
calld);
}
@@ -2955,6 +3067,7 @@ static grpc_error* cc_init_call_elem(grpc_call_element* elem,
calld->deadline);
}
calld->enable_retries = chand->enable_retries;
+ calld->send_messages.Init();
return GRPC_ERROR_NONE;
}
@@ -2968,6 +3081,7 @@ static void cc_destroy_call_elem(grpc_call_element* elem,
grpc_deadline_state_destroy(elem);
}
grpc_slice_unref_internal(calld->path);
+ calld->retry_throttle_data.reset();
calld->method_params.reset();
GRPC_ERROR_UNREF(calld->cancel_error);
if (calld->subchannel_call != nullptr) {
@@ -2989,6 +3103,7 @@ static void cc_destroy_call_elem(grpc_call_element* elem,
calld->pick.subchannel_call_context[i].value);
}
}
+ calld->send_messages.Destroy();
GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}
@@ -3137,6 +3252,8 @@ static void watch_connectivity_state_locked(void* arg,
external_connectivity_watcher* found = nullptr;
if (w->state != nullptr) {
external_connectivity_watcher_list_append(w->chand, w);
+ // An assumption is being made that the closure is scheduled on the exec ctx
+ // scheduler and that GRPC_CLOSURE_RUN would run the closure immediately.
GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
grpc_combiner_scheduler(w->chand->combiner));
diff --git a/src/core/ext/filters/client_channel/client_channel_plugin.cc b/src/core/ext/filters/client_channel/client_channel_plugin.cc
index 3c3a97532f..8385852d1b 100644
--- a/src/core/ext/filters/client_channel/client_channel_plugin.cc
+++ b/src/core/ext/filters/client_channel/client_channel_plugin.cc
@@ -39,38 +39,13 @@ static bool append_filter(grpc_channel_stack_builder* builder, void* arg) {
builder, static_cast<const grpc_channel_filter*>(arg), nullptr, nullptr);
}
-static bool set_default_host_if_unset(grpc_channel_stack_builder* builder,
- void* unused) {
- const grpc_channel_args* args =
- grpc_channel_stack_builder_get_channel_arguments(builder);
- for (size_t i = 0; i < args->num_args; i++) {
- if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY) ||
- 0 == strcmp(args->args[i].key, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)) {
- return true;
- }
- }
- grpc_core::UniquePtr<char> default_authority =
- grpc_core::ResolverRegistry::GetDefaultAuthority(
- grpc_channel_stack_builder_get_target(builder));
- if (default_authority.get() != nullptr) {
- grpc_arg arg = grpc_channel_arg_string_create(
- (char*)GRPC_ARG_DEFAULT_AUTHORITY, default_authority.get());
- grpc_channel_args* new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
- grpc_channel_stack_builder_set_channel_arguments(builder, new_args);
- grpc_channel_args_destroy(new_args);
- }
- return true;
-}
-
void grpc_client_channel_init(void) {
grpc_core::LoadBalancingPolicyRegistry::Builder::InitRegistry();
grpc_core::ResolverRegistry::Builder::InitRegistry();
- grpc_retry_throttle_map_init();
+ grpc_core::internal::ServerRetryThrottleMap::Init();
grpc_proxy_mapper_registry_init();
grpc_register_http_proxy_mapper();
grpc_subchannel_index_init();
- grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MIN,
- set_default_host_if_unset, nullptr);
grpc_channel_init_register_stage(
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, append_filter,
(void*)&grpc_client_channel_filter);
@@ -81,7 +56,7 @@ void grpc_client_channel_shutdown(void) {
grpc_subchannel_index_shutdown();
grpc_channel_init_shutdown();
grpc_proxy_mapper_registry_shutdown();
- grpc_retry_throttle_map_shutdown();
+ grpc_core::internal::ServerRetryThrottleMap::Shutdown();
grpc_core::ResolverRegistry::Builder::ShutdownRegistry();
grpc_core::LoadBalancingPolicyRegistry::Builder::ShutdownRegistry();
}
diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc
index fb29fa788d..4e8b8b71db 100644
--- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc
+++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc
@@ -326,7 +326,7 @@ static void http_connect_handshaker_do_handshake(
static const grpc_handshaker_vtable http_connect_handshaker_vtable = {
http_connect_handshaker_destroy, http_connect_handshaker_shutdown,
- http_connect_handshaker_do_handshake};
+ http_connect_handshaker_do_handshake, "http_connect"};
static grpc_handshaker* grpc_http_connect_handshaker_create() {
http_connect_handshaker* handshaker =
diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc
index fa63dd75b5..e065f45639 100644
--- a/src/core/ext/filters/client_channel/lb_policy.cc
+++ b/src/core/ext/filters/client_channel/lb_policy.cc
@@ -44,13 +44,13 @@ void LoadBalancingPolicy::TryReresolutionLocked(
GRPC_CLOSURE_SCHED(request_reresolution_, error);
request_reresolution_ = nullptr;
if (grpc_lb_trace->enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"%s %p: scheduling re-resolution closure with error=%s.",
grpc_lb_trace->name(), this, grpc_error_string(error));
}
} else {
if (grpc_lb_trace->enabled()) {
- gpr_log(GPR_DEBUG, "%s %p: no available re-resolution closure.",
+ gpr_log(GPR_INFO, "%s %p: no available re-resolution closure.",
grpc_lb_trace->name(), this);
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index c3e43e5ef6..454e00a690 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -162,6 +162,10 @@ class LoadBalancingPolicy
GRPC_ABSTRACT_BASE_CLASS
protected:
+ // So Delete() can access our protected dtor.
+ template <typename T>
+ friend void Delete(T*);
+
explicit LoadBalancingPolicy(const Args& args);
virtual ~LoadBalancingPolicy();
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index cb39e4224e..70a91b2567 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -61,6 +61,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/socket_utils.h"
#include <inttypes.h>
#include <limits.h>
@@ -75,6 +76,7 @@
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
+#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
@@ -188,6 +190,10 @@ class GrpcLb : public LoadBalancingPolicy {
bool seen_initial_response() const { return seen_initial_response_; }
private:
+ // So Delete() can access our private dtor.
+ template <typename T>
+ friend void grpc_core::Delete(T*);
+
~BalancerCallState();
GrpcLb* grpclb_policy() const {
@@ -417,20 +423,20 @@ void ParseServer(const grpc_grpclb_server* server,
grpc_resolved_address* addr) {
memset(addr, 0, sizeof(*addr));
if (server->drop) return;
- const uint16_t netorder_port = htons((uint16_t)server->port);
+ const uint16_t netorder_port = grpc_htons((uint16_t)server->port);
/* the addresses are given in binary format (a in(6)_addr struct) in
* server->ip_address.bytes. */
const grpc_grpclb_ip_address* ip = &server->ip_address;
if (ip->size == 4) {
- addr->len = sizeof(struct sockaddr_in);
- struct sockaddr_in* addr4 = (struct sockaddr_in*)&addr->addr;
- addr4->sin_family = AF_INET;
+ addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
+ grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
+ addr4->sin_family = GRPC_AF_INET;
memcpy(&addr4->sin_addr, ip->bytes, ip->size);
addr4->sin_port = netorder_port;
} else if (ip->size == 16) {
- addr->len = sizeof(struct sockaddr_in6);
- struct sockaddr_in6* addr6 = (struct sockaddr_in6*)&addr->addr;
- addr6->sin6_family = AF_INET6;
+ addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
+ grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr;
+ addr6->sin6_family = GRPC_AF_INET6;
memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
addr6->sin6_port = netorder_port;
}
@@ -504,9 +510,7 @@ GrpcLb::BalancerCallState::BalancerCallState(
// the polling entities from client_channel.
GPR_ASSERT(grpclb_policy()->server_name_ != nullptr);
GPR_ASSERT(grpclb_policy()->server_name_[0] != '\0');
- grpc_slice host =
- grpc_slice_from_copied_string(grpclb_policy()->server_name_);
- grpc_millis deadline =
+ const grpc_millis deadline =
grpclb_policy()->lb_call_timeout_ms_ == 0
? GRPC_MILLIS_INF_FUTURE
: ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_ms_;
@@ -514,8 +518,7 @@ GrpcLb::BalancerCallState::BalancerCallState(
grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
grpclb_policy_->interested_parties(),
GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
- &host, deadline, nullptr);
- grpc_slice_unref_internal(host);
+ nullptr, deadline, nullptr);
// Init the LB call request payload.
grpc_grpclb_request* request =
grpc_grpclb_request_create(grpclb_policy()->server_name_);
@@ -982,6 +985,14 @@ grpc_channel_args* BuildBalancerChannelArgs(
// with the one from the grpclb policy, used to propagate updates to
// the LB channel.
GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
+ // The LB channel should use the authority indicated by the target
+ // authority table (see \a grpc_lb_policy_grpclb_modify_lb_channel_args),
+ // as opposed to the authority from the parent channel.
+ GRPC_ARG_DEFAULT_AUTHORITY,
+ // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be
+ // treated as a stand-alone channel and not inherit this argument from the
+ // args of the parent channel.
+ GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
};
// Channel args to add.
const grpc_arg args_to_add[] = {
@@ -993,6 +1004,9 @@ grpc_channel_args* BuildBalancerChannelArgs(
// address updates into the LB channel.
grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
response_generator),
+ // A channel arg indicating the target is a grpclb load balancer.
+ grpc_channel_arg_integer_create(
+ const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1),
};
// Construct channel args.
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
@@ -1237,7 +1251,7 @@ bool GrpcLb::PickLocked(PickState* pick) {
}
} else { // rr_policy_ == NULL
if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"[grpclb %p] No RR policy. Adding to grpclb's pending picks",
this);
}
@@ -1403,14 +1417,13 @@ void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
void GrpcLb::StartBalancerCallRetryTimerLocked() {
grpc_millis next_try = lb_call_backoff_.NextAttemptTime();
if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", this);
+ gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
grpc_millis timeout = next_try - ExecCtx::Get()->Now();
if (timeout > 0) {
- gpr_log(GPR_DEBUG,
- "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.", this,
- timeout);
+ gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.",
+ this, timeout);
} else {
- gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.",
+ gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active immediately.",
this);
}
}
@@ -1689,9 +1702,11 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
grpc_lb_addresses* addresses;
+ bool is_backend_from_grpclb_load_balancer = false;
if (serverlist_ != nullptr) {
GPR_ASSERT(serverlist_->num_servers > 0);
addresses = ProcessServerlist(serverlist_);
+ is_backend_from_grpclb_load_balancer = true;
} else {
// If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't
// received any serverlist from the balancer, we use the fallback backends
@@ -1705,9 +1720,18 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
// Replace the LB addresses in the channel args that we pass down to
// the subchannel.
static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
- const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses);
+ const grpc_arg args_to_add[] = {
+ grpc_lb_addresses_create_channel_arg(addresses),
+ // A channel arg indicating if the target is a backend inferred from a
+ // grpclb load balancer.
+ grpc_channel_arg_integer_create(
+ const_cast<char*>(
+ GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
+ is_backend_from_grpclb_load_balancer),
+ };
grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
- args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg, 1);
+ args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
+ GPR_ARRAY_SIZE(args_to_add));
grpc_lb_addresses_destroy(addresses);
return args;
}
@@ -1718,7 +1742,7 @@ void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() {
GPR_ASSERT(args != nullptr);
if (rr_policy_ != nullptr) {
if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_DEBUG, "[grpclb %p] Updating RR policy %p", this,
+ gpr_log(GPR_INFO, "[grpclb %p] Updating RR policy %p", this,
rr_policy_.get());
}
rr_policy_->UpdateLocked(*args);
@@ -1729,7 +1753,7 @@ void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() {
lb_policy_args.args = args;
CreateRoundRobinPolicyLocked(lb_policy_args);
if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_DEBUG, "[grpclb %p] Created new RR policy %p", this,
+ gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this,
rr_policy_.get());
}
}
@@ -1745,7 +1769,7 @@ void GrpcLb::OnRoundRobinRequestReresolutionLocked(void* arg,
}
if (grpc_lb_glb_trace.enabled()) {
gpr_log(
- GPR_DEBUG,
+ GPR_INFO,
"[grpclb %p] Re-resolution requested from the internal RR policy (%p).",
grpclb_policy, grpclb_policy->rr_policy_.get());
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
new file mode 100644
index 0000000000..4d39c4d504
--- /dev/null
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_H
+
+#include <grpc/support/port_platform.h>
+
+/** Channel arg indicating if a target corresponding to the address is grpclb
+ * loadbalancer. The type of this arg is an integer and the value is treated as
+ * a bool. */
+#define GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER \
+ "grpc.address_is_grpclb_load_balancer"
+/** Channel arg indicating if a target corresponding to the address is a backend
+ * received from a balancer. The type of this arg is an integer and the value is
+ * treated as a bool. */
+#define GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER \
+ "grpc.address_is_backend_from_grpclb_load_balancer"
+
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_H \
+ */
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 9090c34412..76df976698 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -62,31 +62,65 @@ class PickFirst : public LoadBalancingPolicy {
private:
~PickFirst();
+ class PickFirstSubchannelList;
+
+ class PickFirstSubchannelData
+ : public SubchannelData<PickFirstSubchannelList,
+ PickFirstSubchannelData> {
+ public:
+ PickFirstSubchannelData(PickFirstSubchannelList* subchannel_list,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address,
+ grpc_subchannel* subchannel,
+ grpc_combiner* combiner)
+ : SubchannelData(subchannel_list, user_data_vtable, address, subchannel,
+ combiner) {}
+
+ void ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error) override;
+ };
+
+ class PickFirstSubchannelList
+ : public SubchannelList<PickFirstSubchannelList,
+ PickFirstSubchannelData> {
+ public:
+ PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer,
+ const grpc_lb_addresses* addresses,
+ grpc_combiner* combiner,
+ grpc_client_channel_factory* client_channel_factory,
+ const grpc_channel_args& args)
+ : SubchannelList(policy, tracer, addresses, combiner,
+ client_channel_factory, args) {
+ // Need to maintain a ref to the LB policy as long as we maintain
+ // any references to subchannels, since the subchannels'
+ // pollset_sets will include the LB policy's pollset_set.
+ policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
+ }
+
+ ~PickFirstSubchannelList() {
+ PickFirst* p = static_cast<PickFirst*>(policy());
+ p->Unref(DEBUG_LOCATION, "subchannel_list");
+ }
+ };
+
void ShutdownLocked() override;
void StartPickingLocked();
void DestroyUnselectedSubchannelsLocked();
- static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
-
- void SubchannelListRefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
- void SubchannelListUnrefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
-
- /** all our subchannels */
- grpc_lb_subchannel_list* subchannel_list_ = nullptr;
- /** latest pending subchannel list */
- grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr;
- /** selected subchannel in \a subchannel_list */
- grpc_lb_subchannel_data* selected_ = nullptr;
- /** have we started picking? */
+ // All our subchannels.
+ OrphanablePtr<PickFirstSubchannelList> subchannel_list_;
+ // Latest pending subchannel list.
+ OrphanablePtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
+ // Selected subchannel in \a subchannel_list_.
+ PickFirstSubchannelData* selected_ = nullptr;
+ // Have we started picking?
bool started_picking_ = false;
- /** are we shut down? */
+ // Are we shut down?
bool shutdown_ = false;
- /** list of picks that are waiting on connectivity */
+ // List of picks that are waiting on connectivity.
PickState* pending_picks_ = nullptr;
- /** our connectivity state tracker */
+ // Our connectivity state tracker.
grpc_connectivity_state_tracker state_tracker_;
};
@@ -95,7 +129,7 @@ PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) {
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
"pick_first");
if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_DEBUG, "Pick First %p created.", this);
+ gpr_log(GPR_INFO, "Pick First %p created.", this);
}
UpdateLocked(*args.args);
grpc_subchannel_index_ref();
@@ -103,7 +137,7 @@ PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) {
PickFirst::~PickFirst() {
if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_DEBUG, "Destroying Pick First %p", this);
+ gpr_log(GPR_INFO, "Destroying Pick First %p", this);
}
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
@@ -126,7 +160,7 @@ void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
void PickFirst::ShutdownLocked() {
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_DEBUG, "Pick First %p Shutting down", this);
+ gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
}
shutdown_ = true;
PickState* pick;
@@ -137,15 +171,8 @@ void PickFirst::ShutdownLocked() {
}
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "shutdown");
- if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, "pf_shutdown");
- subchannel_list_ = nullptr;
- }
- if (latest_pending_subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(latest_pending_subchannel_list_,
- "pf_shutdown");
- latest_pending_subchannel_list_ = nullptr;
- }
+ subchannel_list_.reset();
+ latest_pending_subchannel_list_.reset();
TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
@@ -192,14 +219,10 @@ void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
void PickFirst::StartPickingLocked() {
started_picking_ = true;
- if (subchannel_list_ != nullptr && subchannel_list_->num_subchannels > 0) {
- subchannel_list_->checking_subchannel = 0;
- for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) {
- if (subchannel_list_->subchannels[i].subchannel != nullptr) {
- SubchannelListRefForConnectivityWatch(
- subchannel_list_, "connectivity_watch+start_picking");
- grpc_lb_subchannel_data_start_connectivity_watch(
- &subchannel_list_->subchannels[i]);
+ if (subchannel_list_ != nullptr) {
+ for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
+ if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
+ subchannel_list_->subchannel(i)->StartConnectivityWatchLocked();
break;
}
}
@@ -215,7 +238,7 @@ void PickFirst::ExitIdleLocked() {
bool PickFirst::PickLocked(PickState* pick) {
// If we have a selected subchannel already, return synchronously.
if (selected_ != nullptr) {
- pick->connected_subchannel = selected_->connected_subchannel;
+ pick->connected_subchannel = selected_->connected_subchannel()->Ref();
return true;
}
// No subchannel selected yet, so handle asynchronously.
@@ -228,11 +251,10 @@ bool PickFirst::PickLocked(PickState* pick) {
}
void PickFirst::DestroyUnselectedSubchannelsLocked() {
- for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) {
- grpc_lb_subchannel_data* sd = &subchannel_list_->subchannels[i];
+ for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
+ PickFirstSubchannelData* sd = subchannel_list_->subchannel(i);
if (selected_ != sd) {
- grpc_lb_subchannel_data_unref_subchannel(sd,
- "selected_different_subchannel");
+ sd->UnrefSubchannelLocked("selected_different_subchannel");
}
}
}
@@ -249,7 +271,7 @@ void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
if (selected_ != nullptr) {
- selected_->connected_subchannel->Ping(on_initiate, on_ack);
+ selected_->connected_subchannel()->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
@@ -258,24 +280,6 @@ void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
}
}
-void PickFirst::SubchannelListRefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- // TODO(roth): We currently track this ref manually. Once the new
- // ClosureRef API is ready and the subchannel_list code has been
- // converted to a C++ API, find a way to hold the RefCountedPtr<>
- // somewhere (maybe in the subchannel_data object) instead of doing
- // this manually.
- auto self = Ref(DEBUG_LOCATION, reason);
- self.release();
- grpc_lb_subchannel_list_ref(subchannel_list, reason);
-}
-
-void PickFirst::SubchannelListUnrefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- Unref(DEBUG_LOCATION, reason);
- grpc_lb_subchannel_list_unref(subchannel_list, reason);
-}
-
void PickFirst::UpdateLocked(const grpc_channel_args& args) {
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
@@ -295,75 +299,67 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
return;
}
const grpc_lb_addresses* addresses =
- (const grpc_lb_addresses*)arg->value.pointer.p;
+ static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p received update with %" PRIuPTR " addresses", this,
addresses->num_addresses);
}
- grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create(
+ auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
this, &grpc_lb_pick_first_trace, addresses, combiner(),
- client_channel_factory(), args, &PickFirst::OnConnectivityChangedLocked);
- if (subchannel_list->num_subchannels == 0) {
+ client_channel_factory(), args);
+ if (subchannel_list->num_subchannels() == 0) {
// Empty update or no valid subchannels. Unsubscribe from all current
// subchannels and put the channel in TRANSIENT_FAILURE.
grpc_connectivity_state_set(
&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"pf_update_empty");
- if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
- "sl_shutdown_empty_update");
- }
- subchannel_list_ = subchannel_list; // Empty list.
+ subchannel_list_ = std::move(subchannel_list); // Empty list.
selected_ = nullptr;
return;
}
if (selected_ == nullptr) {
// We don't yet have a selected subchannel, so replace the current
// subchannel list immediately.
- if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
- "pf_update_before_selected");
+ subchannel_list_ = std::move(subchannel_list);
+ // If we've started picking, start trying to connect to the first
+ // subchannel in the new list.
+ if (started_picking_) {
+ subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
}
- subchannel_list_ = subchannel_list;
} else {
// We do have a selected subchannel.
// Check if it's present in the new list. If so, we're done.
- for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
- grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i];
- if (sd->subchannel == selected_->subchannel) {
+ for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
+ PickFirstSubchannelData* sd = subchannel_list->subchannel(i);
+ if (sd->subchannel() == selected_->subchannel()) {
// The currently selected subchannel is in the update: we are done.
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p found already selected subchannel %p "
"at update index %" PRIuPTR " of %" PRIuPTR "; update done",
- this, selected_->subchannel, i,
- subchannel_list->num_subchannels);
- }
- if (selected_->connected_subchannel != nullptr) {
- sd->connected_subchannel = selected_->connected_subchannel;
+ this, selected_->subchannel(), i,
+ subchannel_list->num_subchannels());
}
- selected_ = sd;
- if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(
- subchannel_list_, "pf_update_includes_selected");
+ // Make sure it's in state READY. It might not be if we grabbed
+ // the combiner while a connectivity state notification
+ // informing us otherwise is pending.
+ // Note that CheckConnectivityStateLocked() also takes a ref to
+ // the connected subchannel.
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (sd->CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) {
+ selected_ = sd;
+ subchannel_list_ = std::move(subchannel_list);
+ DestroyUnselectedSubchannelsLocked();
+ sd->StartConnectivityWatchLocked();
+ // If there was a previously pending update (which may or may
+ // not have contained the currently selected subchannel), drop
+ // it, so that it doesn't override what we've done here.
+ latest_pending_subchannel_list_.reset();
+ return;
}
- subchannel_list_ = subchannel_list;
- DestroyUnselectedSubchannelsLocked();
- SubchannelListRefForConnectivityWatch(
- subchannel_list, "connectivity_watch+replace_selected");
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
- // If there was a previously pending update (which may or may
- // not have contained the currently selected subchannel), drop
- // it, so that it doesn't override what we've done here.
- if (latest_pending_subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(
- latest_pending_subchannel_list_,
- "pf_update_includes_selected+outdated");
- latest_pending_subchannel_list_ = nullptr;
- }
- return;
+ GRPC_ERROR_UNREF(error);
}
}
// Not keeping the previous selected subchannel, so set the latest
@@ -372,88 +368,66 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
// subchannel list.
if (latest_pending_subchannel_list_ != nullptr) {
if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"Pick First %p Shutting down latest pending subchannel list "
"%p, about to be replaced by newer latest %p",
- this, latest_pending_subchannel_list_, subchannel_list);
+ this, latest_pending_subchannel_list_.get(),
+ subchannel_list.get());
}
- grpc_lb_subchannel_list_shutdown_and_unref(
- latest_pending_subchannel_list_, "sl_outdated_dont_smash");
}
- latest_pending_subchannel_list_ = subchannel_list;
- }
- // If we've started picking, start trying to connect to the first
- // subchannel in the new list.
- if (started_picking_) {
- SubchannelListRefForConnectivityWatch(subchannel_list,
- "connectivity_watch+update");
- grpc_lb_subchannel_data_start_connectivity_watch(
- &subchannel_list->subchannels[0]);
+ latest_pending_subchannel_list_ = std::move(subchannel_list);
+ // If we've started picking, start trying to connect to the first
+ // subchannel in the new list.
+ if (started_picking_) {
+ latest_pending_subchannel_list_->subchannel(0)
+ ->StartConnectivityWatchLocked();
+ }
}
}
-void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
- grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg);
- PickFirst* p = static_cast<PickFirst*>(sd->subchannel_list->policy);
- if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "Pick First %p connectivity changed for subchannel %p (%" PRIuPTR
- " of %" PRIuPTR
- "), subchannel_list %p: state=%s p->shutdown_=%d "
- "sd->subchannel_list->shutting_down=%d error=%s",
- p, sd->subchannel, sd->subchannel_list->checking_subchannel,
- sd->subchannel_list->num_subchannels, sd->subchannel_list,
- grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe),
- p->shutdown_, sd->subchannel_list->shutting_down,
- grpc_error_string(error));
- }
- // If the policy is shutting down, unref and return.
- if (p->shutdown_) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "pf_shutdown");
- p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
- "pf_shutdown");
- return;
- }
- // If the subchannel list is shutting down, stop watching.
- if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "pf_sl_shutdown");
- p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
- "pf_sl_shutdown");
- return;
- }
- // If we're still here, the notification must be for a subchannel in
- // either the current or latest pending subchannel lists.
- GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ ||
- sd->subchannel_list == p->latest_pending_subchannel_list_);
- // Update state.
- sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
+void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error) {
+ PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
+ // The notification must be for a subchannel in either the current or
+ // latest pending subchannel lists.
+ GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
+ subchannel_list() == p->latest_pending_subchannel_list_.get());
// Handle updates for the currently selected subchannel.
- if (p->selected_ == sd) {
+ if (p->selected_ == this) {
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "Pick First %p connectivity changed for selected subchannel", p);
+ }
// If the new state is anything other than READY and there is a
// pending update, switch to the pending update.
- if (sd->curr_connectivity_state != GRPC_CHANNEL_READY &&
+ if (connectivity_state != GRPC_CHANNEL_READY &&
p->latest_pending_subchannel_list_ != nullptr) {
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "Pick First %p promoting pending subchannel list %p to "
+ "replace %p",
+ p, p->latest_pending_subchannel_list_.get(),
+ p->subchannel_list_.get());
+ }
p->selected_ = nullptr;
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- p->SubchannelListUnrefForConnectivityWatch(
- sd->subchannel_list, "selected_not_ready+switch_to_update");
- grpc_lb_subchannel_list_shutdown_and_unref(
- p->subchannel_list_, "selected_not_ready+switch_to_update");
- p->subchannel_list_ = p->latest_pending_subchannel_list_;
- p->latest_pending_subchannel_list_ = nullptr;
+ StopConnectivityWatchLocked();
+ p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
grpc_connectivity_state_set(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update");
+ error != GRPC_ERROR_NONE
+ ? GRPC_ERROR_REF(error)
+ : GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "selected subchannel not ready; switching to pending "
+ "update"),
+ "selected_not_ready+switch_to_update");
} else {
// TODO(juanlishen): we re-resolve when the selected subchannel goes to
// TRANSIENT_FAILURE because we used to shut down in this case before
// re-resolution is introduced. But we need to investigate whether we
// really want to take any action instead of waiting for the selected
// subchannel reconnecting.
- GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
- if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
+ if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected channel goes bad, request a re-resolution.
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
@@ -462,19 +436,16 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
// In transient failure. Rely on re-resolution to recover.
p->selected_ = nullptr;
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
- "pf_selected_shutdown");
- grpc_lb_subchannel_data_unref_subchannel(
- sd, "pf_selected_shutdown"); // Unrefs connected subchannel
+ UnrefSubchannelLocked("pf_selected_shutdown");
+ StopConnectivityWatchLocked();
} else {
- grpc_connectivity_state_set(&p->state_tracker_,
- sd->curr_connectivity_state,
+ grpc_connectivity_state_set(&p->state_tracker_, connectivity_state,
GRPC_ERROR_REF(error), "selected_changed");
// Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ RenewConnectivityWatchLocked();
}
}
+ GRPC_ERROR_UNREF(error);
return;
}
// If we get here, there are two possible cases:
@@ -486,26 +457,27 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
// for a subchannel in p->latest_pending_subchannel_list_. The
// goal here is to find a subchannel from the update that we can
// select in place of the current one.
- switch (sd->curr_connectivity_state) {
+ switch (connectivity_state) {
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list_ to
// p->subchannel_list_.
- sd->connected_subchannel =
- grpc_subchannel_get_connected_subchannel(sd->subchannel);
- if (sd->subchannel_list == p->latest_pending_subchannel_list_) {
- GPR_ASSERT(p->subchannel_list_ != nullptr);
- grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_,
- "finish_update");
- p->subchannel_list_ = p->latest_pending_subchannel_list_;
- p->latest_pending_subchannel_list_ = nullptr;
+ if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "Pick First %p promoting pending subchannel list %p to "
+ "replace %p",
+ p, p->latest_pending_subchannel_list_.get(),
+ p->subchannel_list_.get());
+ }
+ p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// Cases 1 and 2.
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "connecting_ready");
- p->selected_ = sd;
+ p->selected_ = this;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p,
- sd->subchannel);
+ subchannel());
}
// Drop all other subchannels, since we are now connected.
p->DestroyUnselectedSubchannelsLocked();
@@ -513,7 +485,8 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
PickState* pick;
while ((pick = p->pending_picks_)) {
p->pending_picks_ = pick->next;
- pick->connected_subchannel = p->selected_->connected_subchannel;
+ pick->connected_subchannel =
+ p->selected_->connected_subchannel()->Ref();
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Servicing pending pick with selected subchannel %p",
@@ -522,45 +495,43 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
// Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ RenewConnectivityWatchLocked();
break;
}
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ StopConnectivityWatchLocked();
+ PickFirstSubchannelData* sd = this;
do {
- sd->subchannel_list->checking_subchannel =
- (sd->subchannel_list->checking_subchannel + 1) %
- sd->subchannel_list->num_subchannels;
- sd = &sd->subchannel_list
- ->subchannels[sd->subchannel_list->checking_subchannel];
- } while (sd->subchannel == nullptr);
+ size_t next_index =
+ (sd->Index() + 1) % subchannel_list()->num_subchannels();
+ sd = subchannel_list()->subchannel(next_index);
+ } while (sd->subchannel() == nullptr);
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
- if (sd->subchannel_list->checking_subchannel == 0 &&
- sd->subchannel_list == p->subchannel_list_) {
+ if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) {
grpc_connectivity_state_set(
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
}
- // Reuses the connectivity refs from the previous watch.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ sd->StartConnectivityWatchLocked();
break;
}
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE: {
// Only update connectivity state in case 1.
- if (sd->subchannel_list == p->subchannel_list_) {
+ if (subchannel_list() == p->subchannel_list_.get()) {
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_REF(error),
"connecting_changed");
}
// Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ RenewConnectivityWatchLocked();
break;
}
case GRPC_CHANNEL_SHUTDOWN:
GPR_UNREACHABLE_CODE(break);
}
+ GRPC_ERROR_UNREF(error);
}
//
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index e534131c02..79e8ad5663 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -73,23 +73,127 @@ class RoundRobin : public LoadBalancingPolicy {
private:
~RoundRobin();
- void ShutdownLocked() override;
+ // Forward declaration.
+ class RoundRobinSubchannelList;
+
+ // Data for a particular subchannel in a subchannel list.
+ // This subclass adds the following functionality:
+ // - Tracks user_data associated with each address, which will be
+ // returned along with picks that select the subchannel.
+ // - Tracks the previous connectivity state of the subchannel, so that
+ // we know how many subchannels are in each state.
+ class RoundRobinSubchannelData
+ : public SubchannelData<RoundRobinSubchannelList,
+ RoundRobinSubchannelData> {
+ public:
+ RoundRobinSubchannelData(RoundRobinSubchannelList* subchannel_list,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address,
+ grpc_subchannel* subchannel,
+ grpc_combiner* combiner)
+ : SubchannelData(subchannel_list, user_data_vtable, address, subchannel,
+ combiner),
+ user_data_vtable_(user_data_vtable),
+ user_data_(user_data_vtable_ != nullptr
+ ? user_data_vtable_->copy(address.user_data)
+ : nullptr) {}
+
+ void UnrefSubchannelLocked(const char* reason) override {
+ SubchannelData::UnrefSubchannelLocked(reason);
+ if (user_data_ != nullptr) {
+ GPR_ASSERT(user_data_vtable_ != nullptr);
+ user_data_vtable_->destroy(user_data_);
+ user_data_ = nullptr;
+ }
+ }
- void StartPickingLocked();
- size_t GetNextReadySubchannelIndexLocked();
- void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index);
- void UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd,
- grpc_error* error);
+ void* user_data() const { return user_data_; }
+
+ grpc_connectivity_state connectivity_state() const {
+ return last_connectivity_state_;
+ }
- static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
+ void UpdateConnectivityStateLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error);
+
+ private:
+ void ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error) override;
+
+ const grpc_lb_user_data_vtable* user_data_vtable_;
+ void* user_data_ = nullptr;
+ grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE;
+ };
+
+ // A list of subchannels.
+ class RoundRobinSubchannelList
+ : public SubchannelList<RoundRobinSubchannelList,
+ RoundRobinSubchannelData> {
+ public:
+ RoundRobinSubchannelList(
+ RoundRobin* policy, TraceFlag* tracer,
+ const grpc_lb_addresses* addresses, grpc_combiner* combiner,
+ grpc_client_channel_factory* client_channel_factory,
+ const grpc_channel_args& args)
+ : SubchannelList(policy, tracer, addresses, combiner,
+ client_channel_factory, args) {
+ // Need to maintain a ref to the LB policy as long as we maintain
+ // any references to subchannels, since the subchannels'
+ // pollset_sets will include the LB policy's pollset_set.
+ policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
+ }
- void SubchannelListRefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
- void SubchannelListUnrefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
+ ~RoundRobinSubchannelList() {
+ GRPC_ERROR_UNREF(last_transient_failure_error_);
+ RoundRobin* p = static_cast<RoundRobin*>(policy());
+ p->Unref(DEBUG_LOCATION, "subchannel_list");
+ }
+
+ // Starts watching the subchannels in this list.
+ void StartWatchingLocked();
+
+ // Updates the counters of subchannels in each state when a
+ // subchannel transitions from old_state to new_state.
+ // transient_failure_error is the error that is reported when
+ // new_state is TRANSIENT_FAILURE.
+ void UpdateStateCountersLocked(grpc_connectivity_state old_state,
+ grpc_connectivity_state new_state,
+ grpc_error* transient_failure_error);
+
+ // If this subchannel list is the RR policy's current subchannel
+ // list, updates the RR policy's connectivity state based on the
+ // subchannel list's state counters.
+ void MaybeUpdateRoundRobinConnectivityStateLocked();
+
+ // Updates the RR policy's overall state based on the counters of
+ // subchannels in each state.
+ void UpdateRoundRobinStateFromSubchannelStateCountsLocked();
+
+ size_t GetNextReadySubchannelIndexLocked();
+ void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index);
+
+ private:
+ size_t num_ready_ = 0;
+ size_t num_connecting_ = 0;
+ size_t num_transient_failure_ = 0;
+ grpc_error* last_transient_failure_error_ = GRPC_ERROR_NONE;
+ size_t last_ready_index_ = -1; // Index into list of last pick.
+ };
+
+ void ShutdownLocked() override;
+
+ void StartPickingLocked();
+ bool DoPickLocked(PickState* pick);
+ void DrainPendingPicksLocked();
/** list of subchannels */
- grpc_lb_subchannel_list* subchannel_list_ = nullptr;
+ OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
+ /** Latest version of the subchannel list.
+ * Subchannel connectivity callbacks will only promote updated subchannel
+ * lists if they equal \a latest_pending_subchannel_list. In other words,
+ * racing callbacks that reference outdated subchannel lists won't perform any
+ * update. */
+ OrphanablePtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
/** have we started picking? */
bool started_picking_ = false;
/** are we shutting down? */
@@ -98,14 +202,6 @@ class RoundRobin : public LoadBalancingPolicy {
PickState* pending_picks_ = nullptr;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker_;
- /** Index into subchannels for last pick. */
- size_t last_ready_subchannel_index_ = 0;
- /** Latest version of the subchannel list.
- * Subchannel connectivity callbacks will only promote updated subchannel
- * lists if they equal \a latest_pending_subchannel_list. In other words,
- * racing callbacks that reference outdated subchannel lists won't perform any
- * update. */
- grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr;
};
RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
@@ -114,15 +210,15 @@ RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
"round_robin");
UpdateLocked(*args.args);
if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_DEBUG, "[RR %p] Created with %" PRIuPTR " subchannels", this,
- subchannel_list_->num_subchannels);
+ gpr_log(GPR_INFO, "[RR %p] Created with %" PRIuPTR " subchannels", this,
+ subchannel_list_->num_subchannels());
}
grpc_subchannel_index_ref();
}
RoundRobin::~RoundRobin() {
if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy", this);
+ gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
}
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
@@ -131,68 +227,6 @@ RoundRobin::~RoundRobin() {
grpc_subchannel_index_unref();
}
-/** Returns the index into p->subchannel_list->subchannels of the next
- * subchannel in READY state, or p->subchannel_list->num_subchannels if no
- * subchannel is READY.
- *
- * Note that this function does *not* update p->last_ready_subchannel_index.
- * The caller must do that if it returns a pick. */
-size_t RoundRobin::GetNextReadySubchannelIndexLocked() {
- GPR_ASSERT(subchannel_list_ != nullptr);
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[RR %p] getting next ready subchannel (out of %" PRIuPTR
- "), "
- "last_ready_subchannel_index=%" PRIuPTR,
- this, subchannel_list_->num_subchannels,
- last_ready_subchannel_index_);
- }
- for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) {
- const size_t index = (i + last_ready_subchannel_index_ + 1) %
- subchannel_list_->num_subchannels;
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(
- GPR_DEBUG,
- "[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR
- ": state=%s",
- this, subchannel_list_->subchannels[index].subchannel,
- subchannel_list_, index,
- grpc_connectivity_state_name(
- subchannel_list_->subchannels[index].curr_connectivity_state));
- }
- if (subchannel_list_->subchannels[index].curr_connectivity_state ==
- GRPC_CHANNEL_READY) {
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "[RR %p] found next ready subchannel (%p) at index %" PRIuPTR
- " of subchannel_list %p",
- this, subchannel_list_->subchannels[index].subchannel, index,
- subchannel_list_);
- }
- return index;
- }
- }
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", this);
- }
- return subchannel_list_->num_subchannels;
-}
-
-// Sets last_ready_subchannel_index_ to last_ready_index.
-void RoundRobin::UpdateLastReadySubchannelIndexLocked(size_t last_ready_index) {
- GPR_ASSERT(last_ready_index < subchannel_list_->num_subchannels);
- last_ready_subchannel_index_ = last_ready_index;
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "[RR %p] setting last_ready_subchannel_index=%" PRIuPTR
- " (SC %p, CSC %p)",
- this, last_ready_index,
- subchannel_list_->subchannels[last_ready_index].subchannel,
- subchannel_list_->subchannels[last_ready_index]
- .connected_subchannel.get());
- }
-}
-
void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
PickState* pick;
while ((pick = pending_picks_) != nullptr) {
@@ -207,7 +241,7 @@ void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
void RoundRobin::ShutdownLocked() {
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_DEBUG, "[RR %p] Shutting down", this);
+ gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
}
shutdown_ = true;
PickState* pick;
@@ -218,16 +252,8 @@ void RoundRobin::ShutdownLocked() {
}
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "rr_shutdown");
- if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
- "sl_shutdown_rr_shutdown");
- subchannel_list_ = nullptr;
- }
- if (latest_pending_subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(
- latest_pending_subchannel_list_, "sl_shutdown_pending_rr_shutdown");
- latest_pending_subchannel_list_ = nullptr;
- }
+ subchannel_list_.reset();
+ latest_pending_subchannel_list_.reset();
TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
@@ -273,70 +299,59 @@ void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
GRPC_ERROR_UNREF(error);
}
-void RoundRobin::SubchannelListRefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- // TODO(roth): We currently track this ref manually. Once the new
- // ClosureRef API is ready and the subchannel_list code has been
- // converted to a C++ API, find a way to hold the RefCountedPtr<>
- // somewhere (maybe in the subchannel_data object) instead of doing
- // this manually.
- auto self = Ref(DEBUG_LOCATION, reason);
- self.release();
- grpc_lb_subchannel_list_ref(subchannel_list, reason);
+void RoundRobin::StartPickingLocked() {
+ started_picking_ = true;
+ subchannel_list_->StartWatchingLocked();
}
-void RoundRobin::SubchannelListUnrefForConnectivityWatch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- Unref(DEBUG_LOCATION, reason);
- grpc_lb_subchannel_list_unref(subchannel_list, reason);
+void RoundRobin::ExitIdleLocked() {
+ if (!started_picking_) {
+ StartPickingLocked();
+ }
}
-void RoundRobin::StartPickingLocked() {
- started_picking_ = true;
- for (size_t i = 0; i < subchannel_list_->num_subchannels; i++) {
- if (subchannel_list_->subchannels[i].subchannel != nullptr) {
- SubchannelListRefForConnectivityWatch(subchannel_list_,
- "connectivity_watch");
- grpc_lb_subchannel_data_start_connectivity_watch(
- &subchannel_list_->subchannels[i]);
+bool RoundRobin::DoPickLocked(PickState* pick) {
+ const size_t next_ready_index =
+ subchannel_list_->GetNextReadySubchannelIndexLocked();
+ if (next_ready_index < subchannel_list_->num_subchannels()) {
+ /* readily available, report right away */
+ RoundRobinSubchannelData* sd =
+ subchannel_list_->subchannel(next_ready_index);
+ GPR_ASSERT(sd->connected_subchannel() != nullptr);
+ pick->connected_subchannel = sd->connected_subchannel()->Ref();
+ if (pick->user_data != nullptr) {
+ *pick->user_data = sd->user_data();
}
+ if (grpc_lb_round_robin_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
+ "index %" PRIuPTR ")",
+ this, sd->subchannel(), pick->connected_subchannel.get(),
+ sd->subchannel_list(), next_ready_index);
+ }
+ /* only advance the last picked pointer if the selection was used */
+ subchannel_list_->UpdateLastReadySubchannelIndexLocked(next_ready_index);
+ return true;
}
+ return false;
}
-void RoundRobin::ExitIdleLocked() {
- if (!started_picking_) {
- StartPickingLocked();
+void RoundRobin::DrainPendingPicksLocked() {
+ PickState* pick;
+ while ((pick = pending_picks_)) {
+ pending_picks_ = pick->next;
+ GPR_ASSERT(DoPickLocked(pick));
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
}
bool RoundRobin::PickLocked(PickState* pick) {
if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_DEBUG, "[RR %p] Trying to pick (shutdown: %d)", this,
- shutdown_);
+ gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", this, shutdown_);
}
GPR_ASSERT(!shutdown_);
if (subchannel_list_ != nullptr) {
- const size_t next_ready_index = GetNextReadySubchannelIndexLocked();
- if (next_ready_index < subchannel_list_->num_subchannels) {
- /* readily available, report right away */
- grpc_lb_subchannel_data* sd =
- &subchannel_list_->subchannels[next_ready_index];
- pick->connected_subchannel = sd->connected_subchannel;
- if (pick->user_data != nullptr) {
- *pick->user_data = sd->user_data;
- }
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(
- GPR_DEBUG,
- "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
- "index %" PRIuPTR ")",
- this, sd->subchannel, pick->connected_subchannel.get(),
- sd->subchannel_list, next_ready_index);
- }
- /* only advance the last picked pointer if the selection was used */
- UpdateLastReadySubchannelIndexLocked(next_ready_index);
- return true;
- }
+ if (DoPickLocked(pick)) return true;
}
/* no pick currently available. Save for later in list of pending picks */
if (!started_picking_) {
@@ -347,36 +362,62 @@ bool RoundRobin::PickLocked(PickState* pick) {
return false;
}
-void UpdateStateCountersLocked(grpc_lb_subchannel_data* sd) {
- grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
- GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
- GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
- if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
- GPR_ASSERT(subchannel_list->num_ready > 0);
- --subchannel_list->num_ready;
- } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- GPR_ASSERT(subchannel_list->num_transient_failures > 0);
- --subchannel_list->num_transient_failures;
- } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
- GPR_ASSERT(subchannel_list->num_idle > 0);
- --subchannel_list->num_idle;
+void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
+ if (num_subchannels() == 0) return;
+ // Check current state of each subchannel synchronously, since any
+ // subchannel already used by some other channel may have a non-IDLE
+ // state.
+ for (size_t i = 0; i < num_subchannels(); ++i) {
+ grpc_error* error = GRPC_ERROR_NONE;
+ grpc_connectivity_state state =
+ subchannel(i)->CheckConnectivityStateLocked(&error);
+ if (state != GRPC_CHANNEL_IDLE) {
+ subchannel(i)->UpdateConnectivityStateLocked(state, error);
+ }
}
- sd->prev_connectivity_state = sd->curr_connectivity_state;
- if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
- ++subchannel_list->num_ready;
- } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- ++subchannel_list->num_transient_failures;
- } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
- ++subchannel_list->num_idle;
+ // Now set the LB policy's state based on the subchannels' states.
+ UpdateRoundRobinStateFromSubchannelStateCountsLocked();
+ // Start connectivity watch for each subchannel.
+ for (size_t i = 0; i < num_subchannels(); i++) {
+ if (subchannel(i)->subchannel() != nullptr) {
+ subchannel(i)->StartConnectivityWatchLocked();
+ }
}
}
-/** Sets the policy's connectivity status based on that of the passed-in \a sd
- * (the grpc_lb_subchannel_data associated with the updated subchannel) and the
- * subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used
- * only if the policy transitions to state TRANSIENT_FAILURE. */
-void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd,
- grpc_error* error) {
+void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked(
+ grpc_connectivity_state old_state, grpc_connectivity_state new_state,
+ grpc_error* transient_failure_error) {
+ GPR_ASSERT(old_state != GRPC_CHANNEL_SHUTDOWN);
+ GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
+ if (old_state == GRPC_CHANNEL_READY) {
+ GPR_ASSERT(num_ready_ > 0);
+ --num_ready_;
+ } else if (old_state == GRPC_CHANNEL_CONNECTING) {
+ GPR_ASSERT(num_connecting_ > 0);
+ --num_connecting_;
+ } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GPR_ASSERT(num_transient_failure_ > 0);
+ --num_transient_failure_;
+ }
+ if (new_state == GRPC_CHANNEL_READY) {
+ ++num_ready_;
+ } else if (new_state == GRPC_CHANNEL_CONNECTING) {
+ ++num_connecting_;
+ } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ ++num_transient_failure_;
+ }
+ GRPC_ERROR_UNREF(last_transient_failure_error_);
+ last_transient_failure_error_ = transient_failure_error;
+}
+
+// Sets the RR policy's connectivity state based on the current
+// subchannel list.
+void RoundRobin::RoundRobinSubchannelList::
+ MaybeUpdateRoundRobinConnectivityStateLocked() {
+ RoundRobin* p = static_cast<RoundRobin*>(policy());
+ // Only set connectivity state if this is the current subchannel list.
+ if (p->subchannel_list_.get() != this) return;
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled).
*
@@ -391,155 +432,151 @@ void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd,
* CHECK: subchannel_list->num_transient_failures ==
* subchannel_list->num_subchannels.
*/
- grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
- GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE);
- if (subchannel_list->num_ready > 0) {
+ if (num_ready_ > 0) {
/* 1) READY */
- grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_READY,
+ grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
- } else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) {
+ } else if (num_connecting_ > 0) {
/* 2) CONNECTING */
- grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_CONNECTING,
+ grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "rr_connecting");
- } else if (subchannel_list->num_transient_failures ==
- subchannel_list->num_subchannels) {
+ } else if (num_transient_failure_ == num_subchannels()) {
/* 3) TRANSIENT_FAILURE */
- grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error),
+ grpc_connectivity_state_set(&p->state_tracker_,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(last_transient_failure_error_),
"rr_exhausted_subchannels");
}
- GRPC_ERROR_UNREF(error);
}
-void RoundRobin::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
- grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg);
- RoundRobin* p = static_cast<RoundRobin*>(sd->subchannel_list->policy);
+void RoundRobin::RoundRobinSubchannelList::
+ UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
+ RoundRobin* p = static_cast<RoundRobin*>(policy());
+ if (num_ready_ > 0) {
+ if (p->subchannel_list_.get() != this) {
+ // Promote this list to p->subchannel_list_.
+ // This list must be p->latest_pending_subchannel_list_, because
+ // any previous update would have been shut down already and
+ // therefore we would not be receiving a notification for them.
+ GPR_ASSERT(p->latest_pending_subchannel_list_.get() == this);
+ GPR_ASSERT(!shutting_down());
+ if (grpc_lb_round_robin_trace.enabled()) {
+ const size_t old_num_subchannels =
+ p->subchannel_list_ != nullptr
+ ? p->subchannel_list_->num_subchannels()
+ : 0;
+ gpr_log(GPR_INFO,
+ "[RR %p] phasing out subchannel list %p (size %" PRIuPTR
+ ") in favor of %p (size %" PRIuPTR ")",
+ p, p->subchannel_list_.get(), old_num_subchannels, this,
+ num_subchannels());
+ }
+ p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
+ }
+ // Drain pending picks.
+ p->DrainPendingPicksLocked();
+ }
+ // Update the RR policy's connectivity state if needed.
+ MaybeUpdateRoundRobinConnectivityStateLocked();
+}
+
+void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error) {
+ RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(
- GPR_DEBUG,
- "[RR %p] connectivity changed for subchannel %p, subchannel_list %p: "
- "prev_state=%s new_state=%s p->shutdown=%d "
- "sd->subchannel_list->shutting_down=%d error=%s",
- p, sd->subchannel, sd->subchannel_list,
- grpc_connectivity_state_name(sd->prev_connectivity_state),
- grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe),
- p->shutdown_, sd->subchannel_list->shutting_down,
- grpc_error_string(error));
- }
- GPR_ASSERT(sd->subchannel != nullptr);
- // If the policy is shutting down, unref and return.
- if (p->shutdown_) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "rr_shutdown");
- p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
- "rr_shutdown");
- return;
+ GPR_INFO,
+ "[RR %p] connectivity changed for subchannel %p, subchannel_list %p "
+ "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
+ p, subchannel(), subchannel_list(), Index(),
+ subchannel_list()->num_subchannels(),
+ grpc_connectivity_state_name(last_connectivity_state_),
+ grpc_connectivity_state_name(connectivity_state));
+ }
+ subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_,
+ connectivity_state, error);
+ last_connectivity_state_ = connectivity_state;
+}
+
+void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error) {
+ RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
+ GPR_ASSERT(subchannel() != nullptr);
+ // If the new state is TRANSIENT_FAILURE, re-resolve.
+ // Only do this if we've started watching, not at startup time.
+ // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
+ // when the subchannel list was created, we'd wind up in a constant
+ // loop of re-resolution.
+ if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ if (grpc_lb_round_robin_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
+ "Requesting re-resolution",
+ p, subchannel());
+ }
+ p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE);
}
- // If the subchannel list is shutting down, stop watching.
- if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "rr_sl_shutdown");
- p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
- "rr_sl_shutdown");
- return;
+ // Update state counters.
+ UpdateConnectivityStateLocked(connectivity_state, error);
+ // Update overall state and renew notification.
+ subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked();
+ RenewConnectivityWatchLocked();
+}
+
+/** Returns the index into p->subchannel_list->subchannels of the next
+ * subchannel in READY state, or p->subchannel_list->num_subchannels if no
+ * subchannel is READY.
+ *
+ * Note that this function does *not* update p->last_ready_subchannel_index.
+ * The caller must do that if it returns a pick. */
+size_t
+RoundRobin::RoundRobinSubchannelList::GetNextReadySubchannelIndexLocked() {
+ if (grpc_lb_round_robin_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[RR %p] getting next ready subchannel (out of %" PRIuPTR
+ "), last_ready_index=%" PRIuPTR,
+ policy(), num_subchannels(), last_ready_index_);
}
- // If we're still here, the notification must be for a subchannel in
- // either the current or latest pending subchannel lists.
- GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ ||
- sd->subchannel_list == p->latest_pending_subchannel_list_);
- GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN);
- // Now that we're inside the combiner, copy the pending connectivity
- // state (which was set by the connectivity state watcher) to
- // curr_connectivity_state, which is what we use inside of the combiner.
- sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
- // If the sd's new state is TRANSIENT_FAILURE, unref the *connected*
- // subchannel, if any.
- switch (sd->curr_connectivity_state) {
- case GRPC_CHANNEL_TRANSIENT_FAILURE: {
- sd->connected_subchannel.reset();
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
- "Requesting re-resolution",
- p, sd->subchannel);
- }
- p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE);
- break;
+ for (size_t i = 0; i < num_subchannels(); ++i) {
+ const size_t index = (i + last_ready_index_ + 1) % num_subchannels();
+ if (grpc_lb_round_robin_trace.enabled()) {
+ gpr_log(
+ GPR_INFO,
+ "[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR
+ ": state=%s",
+ policy(), subchannel(index)->subchannel(), this, index,
+ grpc_connectivity_state_name(
+ subchannel(index)->connectivity_state()));
}
- case GRPC_CHANNEL_READY: {
- if (sd->connected_subchannel == nullptr) {
- sd->connected_subchannel =
- grpc_subchannel_get_connected_subchannel(sd->subchannel);
- }
- if (sd->subchannel_list != p->subchannel_list_) {
- // promote sd->subchannel_list to p->subchannel_list_.
- // sd->subchannel_list must be equal to
- // p->latest_pending_subchannel_list_ because we have already filtered
- // for sds belonging to outdated subchannel lists.
- GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list_);
- GPR_ASSERT(!sd->subchannel_list->shutting_down);
- if (grpc_lb_round_robin_trace.enabled()) {
- const size_t num_subchannels =
- p->subchannel_list_ != nullptr
- ? p->subchannel_list_->num_subchannels
- : 0;
- gpr_log(GPR_DEBUG,
- "[RR %p] phasing out subchannel list %p (size %" PRIuPTR
- ") in favor of %p (size %" PRIuPTR ")",
- p, p->subchannel_list_, num_subchannels, sd->subchannel_list,
- num_subchannels);
- }
- if (p->subchannel_list_ != nullptr) {
- // dispose of the current subchannel_list
- grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_,
- "sl_phase_out_shutdown");
- }
- p->subchannel_list_ = p->latest_pending_subchannel_list_;
- p->latest_pending_subchannel_list_ = nullptr;
- }
- /* at this point we know there's at least one suitable subchannel. Go
- * ahead and pick one and notify the pending suitors in
- * p->pending_picks. This preemptively replicates rr_pick()'s actions. */
- const size_t next_ready_index = p->GetNextReadySubchannelIndexLocked();
- GPR_ASSERT(next_ready_index < p->subchannel_list_->num_subchannels);
- grpc_lb_subchannel_data* selected =
- &p->subchannel_list_->subchannels[next_ready_index];
- if (p->pending_picks_ != nullptr) {
- // if the selected subchannel is going to be used for the pending
- // picks, update the last picked pointer
- p->UpdateLastReadySubchannelIndexLocked(next_ready_index);
- }
- PickState* pick;
- while ((pick = p->pending_picks_)) {
- p->pending_picks_ = pick->next;
- pick->connected_subchannel = selected->connected_subchannel;
- if (pick->user_data != nullptr) {
- *pick->user_data = selected->user_data;
- }
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "[RR %p] Fulfilling pending pick. Target <-- subchannel %p "
- "(subchannel_list %p, index %" PRIuPTR ")",
- p, selected->subchannel, p->subchannel_list_,
- next_ready_index);
- }
- GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
+ if (subchannel(index)->connectivity_state() == GRPC_CHANNEL_READY) {
+ if (grpc_lb_round_robin_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[RR %p] found next ready subchannel (%p) at index %" PRIuPTR
+ " of subchannel_list %p",
+ policy(), subchannel(index)->subchannel(), index, this);
}
- break;
+ return index;
}
- case GRPC_CHANNEL_SHUTDOWN:
- GPR_UNREACHABLE_CODE(return );
- case GRPC_CHANNEL_CONNECTING:
- case GRPC_CHANNEL_IDLE:; // fallthrough
}
- // Update state counters.
- UpdateStateCountersLocked(sd);
- // Only update connectivity based on the selected subchannel list.
- if (sd->subchannel_list == p->subchannel_list_) {
- p->UpdateConnectivityStatusLocked(sd, GRPC_ERROR_REF(error));
+ if (grpc_lb_round_robin_trace.enabled()) {
+ gpr_log(GPR_INFO, "[RR %p] no subchannels in ready state", this);
+ }
+ return num_subchannels();
+}
+
+// Sets last_ready_index_ to last_ready_index.
+void RoundRobin::RoundRobinSubchannelList::UpdateLastReadySubchannelIndexLocked(
+ size_t last_ready_index) {
+ GPR_ASSERT(last_ready_index < num_subchannels());
+ last_ready_index_ = last_ready_index;
+ if (grpc_lb_round_robin_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[RR %p] setting last_ready_subchannel_index=%" PRIuPTR
+ " (SC %p, CSC %p)",
+ policy(), last_ready_index,
+ subchannel(last_ready_index)->subchannel(),
+ subchannel(last_ready_index)->connected_subchannel());
}
- // Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
grpc_connectivity_state RoundRobin::CheckConnectivityLocked(
@@ -555,11 +592,12 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
void RoundRobin::PingOneLocked(grpc_closure* on_initiate,
grpc_closure* on_ack) {
- const size_t next_ready_index = GetNextReadySubchannelIndexLocked();
- if (next_ready_index < subchannel_list_->num_subchannels) {
- grpc_lb_subchannel_data* selected =
- &subchannel_list_->subchannels[next_ready_index];
- selected->connected_subchannel->Ping(on_initiate, on_ack);
+ const size_t next_ready_index =
+ subchannel_list_->GetNextReadySubchannelIndexLocked();
+ if (next_ready_index < subchannel_list_->num_subchannels()) {
+ RoundRobinSubchannelData* selected =
+ subchannel_list_->subchannel(next_ready_index);
+ selected->connected_subchannel()->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Round Robin not connected"));
@@ -582,80 +620,37 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
}
return;
}
- grpc_lb_addresses* addresses = (grpc_lb_addresses*)arg->value.pointer.p;
+ grpc_lb_addresses* addresses =
+ static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses",
+ gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
this, addresses->num_addresses);
}
- grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create(
- this, &grpc_lb_round_robin_trace, addresses, combiner(),
- client_channel_factory(), args, &RoundRobin::OnConnectivityChangedLocked);
- if (subchannel_list->num_subchannels == 0) {
- grpc_connectivity_state_set(
- &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
- "rr_update_empty");
- if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
- "sl_shutdown_empty_update");
+ // Replace latest_pending_subchannel_list_.
+ if (latest_pending_subchannel_list_ != nullptr) {
+ if (grpc_lb_round_robin_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[RR %p] Shutting down previous pending subchannel list %p", this,
+ latest_pending_subchannel_list_.get());
}
- subchannel_list_ = subchannel_list; // empty list
- return;
}
- if (started_picking_) {
- for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
- const grpc_connectivity_state subchannel_state =
- grpc_subchannel_check_connectivity(
- subchannel_list->subchannels[i].subchannel, nullptr);
- // Override the default setting of IDLE for connectivity notification
- // purposes if the subchannel is already in transient failure. Otherwise
- // we'd be immediately notified of the IDLE-TRANSIENT_FAILURE
- // discrepancy, attempt to re-resolve and end up here again.
- // TODO(roth): As part of C++-ifying the subchannel_list API, design a
- // better API for notifying the LB policy of subchannel states, which can
- // be used both for the subchannel's initial state and for subsequent
- // state changes. This will allow us to handle this more generally instead
- // of special-casing TRANSIENT_FAILURE (e.g., we can also distribute any
- // pending picks across all READY subchannels rather than sending them all
- // to the first one).
- if (subchannel_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- subchannel_list->subchannels[i].pending_connectivity_state_unsafe =
- subchannel_list->subchannels[i].curr_connectivity_state =
- subchannel_list->subchannels[i].prev_connectivity_state =
- subchannel_state;
- --subchannel_list->num_idle;
- ++subchannel_list->num_transient_failures;
- }
- }
- if (latest_pending_subchannel_list_ != nullptr) {
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "[RR %p] Shutting down latest pending subchannel list %p, "
- "about to be replaced by newer latest %p",
- this, latest_pending_subchannel_list_, subchannel_list);
- }
- grpc_lb_subchannel_list_shutdown_and_unref(
- latest_pending_subchannel_list_, "sl_outdated");
- }
- latest_pending_subchannel_list_ = subchannel_list;
- for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
- /* Watch every new subchannel. A subchannel list becomes active the
- * moment one of its subchannels is READY. At that moment, we swap
- * p->subchannel_list for sd->subchannel_list, provided the subchannel
- * list is still valid (ie, isn't shutting down) */
- SubchannelListRefForConnectivityWatch(subchannel_list,
- "connectivity_watch");
- grpc_lb_subchannel_data_start_connectivity_watch(
- &subchannel_list->subchannels[i]);
+ latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
+ this, &grpc_lb_round_robin_trace, addresses, combiner(),
+ client_channel_factory(), args);
+ // If we haven't started picking yet or the new list is empty,
+ // immediately promote the new list to the current list.
+ if (!started_picking_ ||
+ latest_pending_subchannel_list_->num_subchannels() == 0) {
+ if (latest_pending_subchannel_list_->num_subchannels() == 0) {
+ grpc_connectivity_state_set(
+ &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
+ "rr_update_empty");
}
+ subchannel_list_ = std::move(latest_pending_subchannel_list_);
} else {
- // The policy isn't picking yet. Save the update for later, disposing of
- // previous version if any.
- if (subchannel_list_ != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(
- subchannel_list_, "rr_update_before_started_picking");
- }
- subchannel_list_ = subchannel_list;
+ // If we've started picking, start watching the new list.
+ latest_pending_subchannel_list_->StartWatchingLocked();
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
deleted file mode 100644
index 79cb64c6c6..0000000000
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#include <string.h>
-
-#include <grpc/support/alloc.h>
-
-#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
-#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/iomgr/combiner.h"
-#include "src/core/lib/iomgr/sockaddr_utils.h"
-#include "src/core/lib/transport/connectivity_state.h"
-
-void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd,
- const char* reason) {
- if (sd->subchannel != nullptr) {
- if (sd->subchannel_list->tracer->enabled()) {
- gpr_log(GPR_DEBUG,
- "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
- " (subchannel %p): unreffing subchannel",
- sd->subchannel_list->tracer->name(), sd->subchannel_list->policy,
- sd->subchannel_list,
- static_cast<size_t>(sd - sd->subchannel_list->subchannels),
- sd->subchannel_list->num_subchannels, sd->subchannel);
- }
- GRPC_SUBCHANNEL_UNREF(sd->subchannel, reason);
- sd->subchannel = nullptr;
- sd->connected_subchannel.reset();
- if (sd->user_data != nullptr) {
- GPR_ASSERT(sd->user_data_vtable != nullptr);
- sd->user_data_vtable->destroy(sd->user_data);
- sd->user_data = nullptr;
- }
- }
-}
-
-void grpc_lb_subchannel_data_start_connectivity_watch(
- grpc_lb_subchannel_data* sd) {
- if (sd->subchannel_list->tracer->enabled()) {
- gpr_log(
- GPR_DEBUG,
- "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
- " (subchannel %p): requesting connectivity change "
- "notification (from %s)",
- sd->subchannel_list->tracer->name(), sd->subchannel_list->policy,
- sd->subchannel_list,
- static_cast<size_t>(sd - sd->subchannel_list->subchannels),
- sd->subchannel_list->num_subchannels, sd->subchannel,
- grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe));
- }
- sd->connectivity_notification_pending = true;
- grpc_subchannel_notify_on_state_change(
- sd->subchannel, sd->subchannel_list->policy->interested_parties(),
- &sd->pending_connectivity_state_unsafe,
- &sd->connectivity_changed_closure);
-}
-
-void grpc_lb_subchannel_data_stop_connectivity_watch(
- grpc_lb_subchannel_data* sd) {
- if (sd->subchannel_list->tracer->enabled()) {
- gpr_log(GPR_DEBUG,
- "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
- " (subchannel %p): stopping connectivity watch",
- sd->subchannel_list->tracer->name(), sd->subchannel_list->policy,
- sd->subchannel_list,
- static_cast<size_t>(sd - sd->subchannel_list->subchannels),
- sd->subchannel_list->num_subchannels, sd->subchannel);
- }
- GPR_ASSERT(sd->connectivity_notification_pending);
- sd->connectivity_notification_pending = false;
-}
-
-grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
- grpc_core::LoadBalancingPolicy* p, grpc_core::TraceFlag* tracer,
- const grpc_lb_addresses* addresses, grpc_combiner* combiner,
- grpc_client_channel_factory* client_channel_factory,
- const grpc_channel_args& args, grpc_iomgr_cb_func connectivity_changed_cb) {
- grpc_lb_subchannel_list* subchannel_list =
- static_cast<grpc_lb_subchannel_list*>(
- gpr_zalloc(sizeof(*subchannel_list)));
- if (tracer->enabled()) {
- gpr_log(GPR_DEBUG,
- "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
- tracer->name(), p, subchannel_list, addresses->num_addresses);
- }
- subchannel_list->policy = p;
- subchannel_list->tracer = tracer;
- gpr_ref_init(&subchannel_list->refcount, 1);
- subchannel_list->subchannels = static_cast<grpc_lb_subchannel_data*>(
- gpr_zalloc(sizeof(grpc_lb_subchannel_data) * addresses->num_addresses));
- // We need to remove the LB addresses in order to be able to compare the
- // subchannel keys of subchannels from a different batch of addresses.
- static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
- GRPC_ARG_LB_ADDRESSES};
- // Create a subchannel for each address.
- grpc_subchannel_args sc_args;
- size_t subchannel_index = 0;
- for (size_t i = 0; i < addresses->num_addresses; i++) {
- // If there were any balancer, we would have chosen grpclb policy instead.
- GPR_ASSERT(!addresses->addresses[i].is_balancer);
- memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- grpc_arg addr_arg =
- grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
- grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
- &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1);
- gpr_free(addr_arg.value.string);
- sc_args.args = new_args;
- grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel(
- client_channel_factory, &sc_args);
- grpc_channel_args_destroy(new_args);
- if (subchannel == nullptr) {
- // Subchannel could not be created.
- if (tracer->enabled()) {
- char* address_uri =
- grpc_sockaddr_to_uri(&addresses->addresses[i].address);
- gpr_log(GPR_DEBUG,
- "[%s %p] could not create subchannel for address uri %s, "
- "ignoring",
- tracer->name(), subchannel_list->policy, address_uri);
- gpr_free(address_uri);
- }
- continue;
- }
- if (tracer->enabled()) {
- char* address_uri =
- grpc_sockaddr_to_uri(&addresses->addresses[i].address);
- gpr_log(GPR_DEBUG,
- "[%s %p] subchannel list %p index %" PRIuPTR
- ": Created subchannel %p for address uri %s",
- tracer->name(), p, subchannel_list, subchannel_index, subchannel,
- address_uri);
- gpr_free(address_uri);
- }
- grpc_lb_subchannel_data* sd =
- &subchannel_list->subchannels[subchannel_index++];
- sd->subchannel_list = subchannel_list;
- sd->subchannel = subchannel;
- GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure,
- connectivity_changed_cb, sd,
- grpc_combiner_scheduler(combiner));
- // We assume that the current state is IDLE. If not, we'll get a
- // callback telling us that.
- sd->prev_connectivity_state = GRPC_CHANNEL_IDLE;
- sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
- sd->pending_connectivity_state_unsafe = GRPC_CHANNEL_IDLE;
- sd->user_data_vtable = addresses->user_data_vtable;
- if (sd->user_data_vtable != nullptr) {
- sd->user_data =
- sd->user_data_vtable->copy(addresses->addresses[i].user_data);
- }
- }
- subchannel_list->num_subchannels = subchannel_index;
- subchannel_list->num_idle = subchannel_index;
- return subchannel_list;
-}
-
-static void subchannel_list_destroy(grpc_lb_subchannel_list* subchannel_list) {
- if (subchannel_list->tracer->enabled()) {
- gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p",
- subchannel_list->tracer->name(), subchannel_list->policy,
- subchannel_list);
- }
- for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
- grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i];
- grpc_lb_subchannel_data_unref_subchannel(sd, "subchannel_list_destroy");
- }
- gpr_free(subchannel_list->subchannels);
- gpr_free(subchannel_list);
-}
-
-void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list,
- const char* reason) {
- gpr_ref_non_zero(&subchannel_list->refcount);
- if (subchannel_list->tracer->enabled()) {
- const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
- gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p REF %lu->%lu (%s)",
- subchannel_list->tracer->name(), subchannel_list->policy,
- subchannel_list, static_cast<unsigned long>(count - 1),
- static_cast<unsigned long>(count), reason);
- }
-}
-
-void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list,
- const char* reason) {
- const bool done = gpr_unref(&subchannel_list->refcount);
- if (subchannel_list->tracer->enabled()) {
- const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
- gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p UNREF %lu->%lu (%s)",
- subchannel_list->tracer->name(), subchannel_list->policy,
- subchannel_list, static_cast<unsigned long>(count + 1),
- static_cast<unsigned long>(count), reason);
- }
- if (done) {
- subchannel_list_destroy(subchannel_list);
- }
-}
-
-static void subchannel_data_cancel_connectivity_watch(
- grpc_lb_subchannel_data* sd, const char* reason) {
- if (sd->subchannel_list->tracer->enabled()) {
- gpr_log(GPR_DEBUG,
- "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
- " (subchannel %p): canceling connectivity watch (%s)",
- sd->subchannel_list->tracer->name(), sd->subchannel_list->policy,
- sd->subchannel_list,
- static_cast<size_t>(sd - sd->subchannel_list->subchannels),
- sd->subchannel_list->num_subchannels, sd->subchannel, reason);
- }
- grpc_subchannel_notify_on_state_change(sd->subchannel, nullptr, nullptr,
- &sd->connectivity_changed_closure);
-}
-
-void grpc_lb_subchannel_list_shutdown_and_unref(
- grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- if (subchannel_list->tracer->enabled()) {
- gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)",
- subchannel_list->tracer->name(), subchannel_list->policy,
- subchannel_list, reason);
- }
- GPR_ASSERT(!subchannel_list->shutting_down);
- subchannel_list->shutting_down = true;
- for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
- grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i];
- // If there's a pending notification for this subchannel, cancel it;
- // the callback is responsible for unreffing the subchannel.
- // Otherwise, unref the subchannel directly.
- if (sd->connectivity_notification_pending) {
- subchannel_data_cancel_connectivity_watch(sd, reason);
- } else if (sd->subchannel != nullptr) {
- grpc_lb_subchannel_data_unref_subchannel(sd, reason);
- }
- }
- grpc_lb_subchannel_list_unref(subchannel_list, reason);
-}
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 6889d596ac..7e2046bcdc 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -21,116 +21,516 @@
#include <grpc/support/port_platform.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gprpp/abstract.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
+#include "src/core/lib/gprpp/orphanable.h"
+#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
-// TODO(roth): This code is intended to be shared between pick_first and
-// round_robin. However, the interface needs more work to provide clean
-// encapsulation. For example, the structs here have some fields that are
-// only used in one of the two (e.g., the state counters in
-// grpc_lb_subchannel_list and the prev_connectivity_state field in
-// grpc_lb_subchannel_data are only used in round_robin, and the
-// checking_subchannel field in grpc_lb_subchannel_list is only used by
-// pick_first). Also, there is probably some code duplication between the
-// connectivity state notification callback code in both pick_first and
-// round_robin that could be refactored and moved here. In a future PR,
-// need to clean this up.
-
-typedef struct grpc_lb_subchannel_list grpc_lb_subchannel_list;
-
-typedef struct {
- /** backpointer to owning subchannel list */
- grpc_lb_subchannel_list* subchannel_list;
- /** subchannel itself */
- grpc_subchannel* subchannel;
- grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
- /** Is a connectivity notification pending? */
- bool connectivity_notification_pending;
- /** notification that connectivity has changed on subchannel */
- grpc_closure connectivity_changed_closure;
- /** previous and current connectivity states. Updated by \a
- * \a connectivity_changed_closure based on
- * \a pending_connectivity_state_unsafe. */
- grpc_connectivity_state prev_connectivity_state;
- grpc_connectivity_state curr_connectivity_state;
- /** connectivity state to be updated by
- * grpc_subchannel_notify_on_state_change(), not guarded by
- * the combiner. To be copied to \a curr_connectivity_state by
- * \a connectivity_changed_closure. */
- grpc_connectivity_state pending_connectivity_state_unsafe;
- /** the subchannel's target user data */
- void* user_data;
- /** vtable to operate over \a user_data */
- const grpc_lb_user_data_vtable* user_data_vtable;
-} grpc_lb_subchannel_data;
-
-/// Unrefs the subchannel contained in sd.
-void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd,
- const char* reason);
-
-/// Starts watching the connectivity state of the subchannel.
-/// The connectivity_changed_cb callback must invoke either
-/// grpc_lb_subchannel_data_stop_connectivity_watch() or again call
-/// grpc_lb_subchannel_data_start_connectivity_watch().
-void grpc_lb_subchannel_data_start_connectivity_watch(
- grpc_lb_subchannel_data* sd);
-
-/// Stops watching the connectivity state of the subchannel.
-void grpc_lb_subchannel_data_stop_connectivity_watch(
- grpc_lb_subchannel_data* sd);
-
-struct grpc_lb_subchannel_list {
- /** backpointer to owning policy */
- grpc_core::LoadBalancingPolicy* policy;
-
- grpc_core::TraceFlag* tracer;
-
- /** all our subchannels */
- size_t num_subchannels;
- grpc_lb_subchannel_data* subchannels;
-
- /** Index into subchannels of the one we're currently checking.
- * Used when connecting to subchannels serially instead of in parallel. */
- // TODO(roth): When we have time, we can probably make this go away
- // and compute the index dynamically by subtracting
- // subchannel_list->subchannels from the subchannel_data pointer.
- size_t checking_subchannel;
-
- /** how many subchannels are in state READY */
- size_t num_ready;
- /** how many subchannels are in state TRANSIENT_FAILURE */
- size_t num_transient_failures;
- /** how many subchannels are in state IDLE */
- size_t num_idle;
-
- /** There will be one ref for each entry in subchannels for which there is a
- * pending connectivity state watcher callback. */
- gpr_refcount refcount;
-
- /** Is this list shutting down? This may be true due to the shutdown of the
- * policy itself or because a newer update has arrived while this one hadn't
- * finished processing. */
- bool shutting_down;
+// Code for maintaining a list of subchannels within an LB policy.
+//
+// To use this, callers must create their own subclasses, like so:
+/*
+
+class MySubchannelList; // Forward declaration.
+
+class MySubchannelData
+ : public SubchannelData<MySubchannelList, MySubchannelData> {
+ public:
+ void ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state, grpc_error* error) override {
+ // ...code to handle connectivity changes...
+ }
+};
+
+class MySubchannelList
+ : public SubchannelList<MySubchannelList, MySubchannelData> {
};
-grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
- grpc_core::LoadBalancingPolicy* p, grpc_core::TraceFlag* tracer,
+*/
+// All methods with a Locked() suffix must be called from within the
+// client_channel combiner.
+
+namespace grpc_core {
+
+// Stores data for a particular subchannel in a subchannel list.
+// Callers must create a subclass that implements the
+// ProcessConnectivityChangeLocked() method.
+template <typename SubchannelListType, typename SubchannelDataType>
+class SubchannelData {
+ public:
+ // Returns a pointer to the subchannel list containing this object.
+ SubchannelListType* subchannel_list() const { return subchannel_list_; }
+
+ // Returns the index into the subchannel list of this object.
+ size_t Index() const {
+ return static_cast<size_t>(static_cast<const SubchannelDataType*>(this) -
+ subchannel_list_->subchannel(0));
+ }
+
+ // Returns a pointer to the subchannel.
+ grpc_subchannel* subchannel() const { return subchannel_; }
+
+ // Returns the connected subchannel. Will be null if the subchannel
+ // is not connected.
+ ConnectedSubchannel* connected_subchannel() const {
+ return connected_subchannel_.get();
+ }
+
+ // Synchronously checks the subchannel's connectivity state.
+ // Must not be called while there is a connectivity notification
+ // pending (i.e., between calling StartConnectivityWatchLocked() or
+ // RenewConnectivityWatchLocked() and the resulting invocation of
+ // ProcessConnectivityChangeLocked()).
+ grpc_connectivity_state CheckConnectivityStateLocked(grpc_error** error) {
+ GPR_ASSERT(!connectivity_notification_pending_);
+ pending_connectivity_state_unsafe_ =
+ grpc_subchannel_check_connectivity(subchannel(), error);
+ UpdateConnectedSubchannelLocked();
+ return pending_connectivity_state_unsafe_;
+ }
+
+ // Unrefs the subchannel. May be used if an individual subchannel is
+ // no longer needed even though the subchannel list as a whole is not
+ // being unreffed.
+ virtual void UnrefSubchannelLocked(const char* reason);
+
+ // Starts watching the connectivity state of the subchannel.
+ // ProcessConnectivityChangeLocked() will be called when the
+ // connectivity state changes.
+ void StartConnectivityWatchLocked();
+
+ // Renews watching the connectivity state of the subchannel.
+ void RenewConnectivityWatchLocked();
+
+ // Stops watching the connectivity state of the subchannel.
+ void StopConnectivityWatchLocked();
+
+ // Cancels watching the connectivity state of the subchannel.
+ // Must be called only while there is a connectivity notification
+ // pending (i.e., between calling StartConnectivityWatchLocked() or
+ // RenewConnectivityWatchLocked() and the resulting invocation of
+ // ProcessConnectivityChangeLocked()).
+ // From within ProcessConnectivityChangeLocked(), use
+ // StopConnectivityWatchLocked() instead.
+ void CancelConnectivityWatchLocked(const char* reason);
+
+ // Cancels any pending connectivity watch and unrefs the subchannel.
+ void ShutdownLocked();
+
+ GRPC_ABSTRACT_BASE_CLASS
+
+ protected:
+ SubchannelData(SubchannelListType* subchannel_list,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address, grpc_subchannel* subchannel,
+ grpc_combiner* combiner);
+
+ virtual ~SubchannelData();
+
+ // After StartConnectivityWatchLocked() or RenewConnectivityWatchLocked()
+ // is called, this method will be invoked when the subchannel's connectivity
+ // state changes.
+ // Implementations must invoke either RenewConnectivityWatchLocked() or
+ // StopConnectivityWatchLocked() before returning.
+ virtual void ProcessConnectivityChangeLocked(
+ grpc_connectivity_state connectivity_state,
+ grpc_error* error) GRPC_ABSTRACT;
+
+ private:
+ // Updates connected_subchannel_ based on pending_connectivity_state_unsafe_.
+ // Returns true if the connectivity state should be reported.
+ bool UpdateConnectedSubchannelLocked();
+
+ static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
+
+ // Backpointer to owning subchannel list. Not owned.
+ SubchannelListType* subchannel_list_;
+
+ // The subchannel and connected subchannel.
+ grpc_subchannel* subchannel_;
+ RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
+
+ // Notification that connectivity has changed on subchannel.
+ grpc_closure connectivity_changed_closure_;
+ // Is a connectivity notification pending?
+ bool connectivity_notification_pending_ = false;
+ // Connectivity state to be updated by
+ // grpc_subchannel_notify_on_state_change(), not guarded by
+ // the combiner.
+ grpc_connectivity_state pending_connectivity_state_unsafe_;
+};
+
+// A list of subchannels.
+template <typename SubchannelListType, typename SubchannelDataType>
+class SubchannelList
+ : public InternallyRefCountedWithTracing<SubchannelListType> {
+ public:
+ typedef InlinedVector<SubchannelDataType, 10> SubchannelVector;
+
+ // The number of subchannels in the list.
+ size_t num_subchannels() const { return subchannels_.size(); }
+
+ // The data for the subchannel at a particular index.
+ SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; }
+
+ // Returns true if the subchannel list is shutting down.
+ bool shutting_down() const { return shutting_down_; }
+
+ // Accessors.
+ LoadBalancingPolicy* policy() const { return policy_; }
+ TraceFlag* tracer() const { return tracer_; }
+
+ // Note: Caller must ensure that this is invoked inside of the combiner.
+ void Orphan() override {
+ ShutdownLocked();
+ InternallyRefCountedWithTracing<SubchannelListType>::Unref(DEBUG_LOCATION,
+ "shutdown");
+ }
+
+ GRPC_ABSTRACT_BASE_CLASS
+
+ protected:
+ SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer,
+ const grpc_lb_addresses* addresses, grpc_combiner* combiner,
+ grpc_client_channel_factory* client_channel_factory,
+ const grpc_channel_args& args);
+
+ virtual ~SubchannelList();
+
+ private:
+ // So New() can call our private ctor.
+ template <typename T, typename... Args>
+ friend T* New(Args&&... args);
+
+ // For accessing Ref() and Unref().
+ friend class SubchannelData<SubchannelListType, SubchannelDataType>;
+
+ void ShutdownLocked();
+
+ // Backpointer to owning policy.
+ LoadBalancingPolicy* policy_;
+
+ TraceFlag* tracer_;
+
+ grpc_combiner* combiner_;
+
+ // The list of subchannels.
+ SubchannelVector subchannels_;
+
+ // Is this list shutting down? This may be true due to the shutdown of the
+ // policy itself or because a newer update has arrived while this one hadn't
+ // finished processing.
+ bool shutting_down_ = false;
+};
+
+//
+// implementation -- no user-servicable parts below
+//
+
+//
+// SubchannelData
+//
+
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
+ SubchannelListType* subchannel_list,
+ const grpc_lb_user_data_vtable* user_data_vtable,
+ const grpc_lb_address& address, grpc_subchannel* subchannel,
+ grpc_combiner* combiner)
+ : subchannel_list_(subchannel_list),
+ subchannel_(subchannel),
+ // We assume that the current state is IDLE. If not, we'll get a
+ // callback telling us that.
+ pending_connectivity_state_unsafe_(GRPC_CHANNEL_IDLE) {
+ GRPC_CLOSURE_INIT(
+ &connectivity_changed_closure_,
+ (&SubchannelData<SubchannelListType,
+ SubchannelDataType>::OnConnectivityChangedLocked),
+ this, grpc_combiner_scheduler(combiner));
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelData<SubchannelListType, SubchannelDataType>::~SubchannelData() {
+ UnrefSubchannelLocked("subchannel_data_destroy");
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType, SubchannelDataType>::
+ UnrefSubchannelLocked(const char* reason) {
+ if (subchannel_ != nullptr) {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(GPR_INFO,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): unreffing subchannel",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(), subchannel_list_->num_subchannels(),
+ subchannel_);
+ }
+ GRPC_SUBCHANNEL_UNREF(subchannel_, reason);
+ subchannel_ = nullptr;
+ connected_subchannel_.reset();
+ }
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+ SubchannelDataType>::StartConnectivityWatchLocked() {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(GPR_INFO,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): starting watch: requesting connectivity change "
+ "notification (from %s)",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(), subchannel_list_->num_subchannels(),
+ subchannel_,
+ grpc_connectivity_state_name(pending_connectivity_state_unsafe_));
+ }
+ GPR_ASSERT(!connectivity_notification_pending_);
+ connectivity_notification_pending_ = true;
+ subchannel_list()->Ref(DEBUG_LOCATION, "connectivity_watch").release();
+ grpc_subchannel_notify_on_state_change(
+ subchannel_, subchannel_list_->policy()->interested_parties(),
+ &pending_connectivity_state_unsafe_, &connectivity_changed_closure_);
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+ SubchannelDataType>::RenewConnectivityWatchLocked() {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(GPR_INFO,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): renewing watch: requesting connectivity change "
+ "notification (from %s)",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(), subchannel_list_->num_subchannels(),
+ subchannel_,
+ grpc_connectivity_state_name(pending_connectivity_state_unsafe_));
+ }
+ GPR_ASSERT(connectivity_notification_pending_);
+ grpc_subchannel_notify_on_state_change(
+ subchannel_, subchannel_list_->policy()->interested_parties(),
+ &pending_connectivity_state_unsafe_, &connectivity_changed_closure_);
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType,
+ SubchannelDataType>::StopConnectivityWatchLocked() {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(GPR_INFO,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): stopping connectivity watch",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(), subchannel_list_->num_subchannels(),
+ subchannel_);
+ }
+ GPR_ASSERT(connectivity_notification_pending_);
+ connectivity_notification_pending_ = false;
+ subchannel_list()->Unref(DEBUG_LOCATION, "connectivity_watch");
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType, SubchannelDataType>::
+ CancelConnectivityWatchLocked(const char* reason) {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(GPR_INFO,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): canceling connectivity watch (%s)",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(), subchannel_list_->num_subchannels(),
+ subchannel_, reason);
+ }
+ GPR_ASSERT(connectivity_notification_pending_);
+ grpc_subchannel_notify_on_state_change(subchannel_, nullptr, nullptr,
+ &connectivity_changed_closure_);
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+bool SubchannelData<SubchannelListType,
+ SubchannelDataType>::UpdateConnectedSubchannelLocked() {
+ // If the subchannel is READY, take a ref to the connected subchannel.
+ if (pending_connectivity_state_unsafe_ == GRPC_CHANNEL_READY) {
+ connected_subchannel_ =
+ grpc_subchannel_get_connected_subchannel(subchannel_);
+ // If the subchannel became disconnected between the time that READY
+ // was reported and the time we got here (e.g., between when a
+ // notification callback is scheduled and when it was actually run in
+ // the combiner), then the connected subchannel may have disappeared out
+ // from under us. In that case, we don't actually want to consider the
+ // subchannel to be in state READY. Instead, we use IDLE as the
+ // basis for any future connectivity watch; this is the one state that
+ // the subchannel will never transition back into, so this ensures
+ // that we will get a notification for the next state, even if that state
+ // is READY again (e.g., if the subchannel has transitioned back to
+ // READY before the next watch gets requested).
+ if (connected_subchannel_ == nullptr) {
+ if (subchannel_list_->tracer()->enabled()) {
+ gpr_log(GPR_INFO,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): state is READY but connected subchannel is "
+ "null; moving to state IDLE",
+ subchannel_list_->tracer()->name(), subchannel_list_->policy(),
+ subchannel_list_, Index(), subchannel_list_->num_subchannels(),
+ subchannel_);
+ }
+ pending_connectivity_state_unsafe_ = GRPC_CHANNEL_IDLE;
+ return false;
+ }
+ } else {
+ // For any state other than READY, unref the connected subchannel.
+ connected_subchannel_.reset();
+ }
+ return true;
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType, SubchannelDataType>::
+ OnConnectivityChangedLocked(void* arg, grpc_error* error) {
+ SubchannelData* sd = static_cast<SubchannelData*>(arg);
+ if (sd->subchannel_list_->tracer()->enabled()) {
+ gpr_log(
+ GPR_INFO,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): connectivity changed: state=%s, error=%s, "
+ "shutting_down=%d",
+ sd->subchannel_list_->tracer()->name(), sd->subchannel_list_->policy(),
+ sd->subchannel_list_, sd->Index(),
+ sd->subchannel_list_->num_subchannels(), sd->subchannel_,
+ grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe_),
+ grpc_error_string(error), sd->subchannel_list_->shutting_down());
+ }
+ // If shutting down, unref subchannel and stop watching.
+ if (sd->subchannel_list_->shutting_down() || error == GRPC_ERROR_CANCELLED) {
+ sd->UnrefSubchannelLocked("connectivity_shutdown");
+ sd->StopConnectivityWatchLocked();
+ return;
+ }
+ // Get or release ref to connected subchannel.
+ if (!sd->UpdateConnectedSubchannelLocked()) {
+ // We don't want to report this connectivity state, so renew the watch.
+ sd->RenewConnectivityWatchLocked();
+ return;
+ }
+ // Call the subclass's ProcessConnectivityChangeLocked() method.
+ sd->ProcessConnectivityChangeLocked(sd->pending_connectivity_state_unsafe_,
+ GRPC_ERROR_REF(error));
+}
+
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
+ // If there's a pending notification for this subchannel, cancel it;
+ // the callback is responsible for unreffing the subchannel.
+ // Otherwise, unref the subchannel directly.
+ if (connectivity_notification_pending_) {
+ CancelConnectivityWatchLocked("shutdown");
+ } else if (subchannel_ != nullptr) {
+ UnrefSubchannelLocked("shutdown");
+ }
+}
+
+//
+// SubchannelList
+//
+
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
+ LoadBalancingPolicy* policy, TraceFlag* tracer,
const grpc_lb_addresses* addresses, grpc_combiner* combiner,
grpc_client_channel_factory* client_channel_factory,
- const grpc_channel_args& args, grpc_iomgr_cb_func connectivity_changed_cb);
+ const grpc_channel_args& args)
+ : InternallyRefCountedWithTracing<SubchannelListType>(tracer),
+ policy_(policy),
+ tracer_(tracer),
+ combiner_(GRPC_COMBINER_REF(combiner, "subchannel_list")) {
+ if (tracer_->enabled()) {
+ gpr_log(GPR_INFO,
+ "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
+ tracer_->name(), policy, this, addresses->num_addresses);
+ }
+ subchannels_.reserve(addresses->num_addresses);
+ // We need to remove the LB addresses in order to be able to compare the
+ // subchannel keys of subchannels from a different batch of addresses.
+ static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
+ GRPC_ARG_LB_ADDRESSES};
+ // Create a subchannel for each address.
+ grpc_subchannel_args sc_args;
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
+ // If there were any balancer, we would have chosen grpclb policy instead.
+ GPR_ASSERT(!addresses->addresses[i].is_balancer);
+ memset(&sc_args, 0, sizeof(grpc_subchannel_args));
+ grpc_arg addr_arg =
+ grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
+ grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
+ &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1);
+ gpr_free(addr_arg.value.string);
+ sc_args.args = new_args;
+ grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel(
+ client_channel_factory, &sc_args);
+ grpc_channel_args_destroy(new_args);
+ if (subchannel == nullptr) {
+ // Subchannel could not be created.
+ if (tracer_->enabled()) {
+ char* address_uri =
+ grpc_sockaddr_to_uri(&addresses->addresses[i].address);
+ gpr_log(GPR_INFO,
+ "[%s %p] could not create subchannel for address uri %s, "
+ "ignoring",
+ tracer_->name(), policy_, address_uri);
+ gpr_free(address_uri);
+ }
+ continue;
+ }
+ if (tracer_->enabled()) {
+ char* address_uri =
+ grpc_sockaddr_to_uri(&addresses->addresses[i].address);
+ gpr_log(GPR_INFO,
+ "[%s %p] subchannel list %p index %" PRIuPTR
+ ": Created subchannel %p for address uri %s",
+ tracer_->name(), policy_, this, subchannels_.size(), subchannel,
+ address_uri);
+ gpr_free(address_uri);
+ }
+ subchannels_.emplace_back(static_cast<SubchannelListType*>(this),
+ addresses->user_data_vtable,
+ addresses->addresses[i], subchannel, combiner);
+ }
+}
-void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list,
- const char* reason);
+template <typename SubchannelListType, typename SubchannelDataType>
+SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() {
+ if (tracer_->enabled()) {
+ gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_->name(),
+ policy_, this);
+ }
+ GRPC_COMBINER_UNREF(combiner_, "subchannel_list");
+}
-void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list,
- const char* reason);
+template <typename SubchannelListType, typename SubchannelDataType>
+void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
+ if (tracer_->enabled()) {
+ gpr_log(GPR_INFO, "[%s %p] Shutting down subchannel_list %p",
+ tracer_->name(), policy_, this);
+ }
+ GPR_ASSERT(!shutting_down_);
+ shutting_down_ = true;
+ for (size_t i = 0; i < subchannels_.size(); i++) {
+ SubchannelDataType* sd = &subchannels_[i];
+ sd->ShutdownLocked();
+ }
+}
-/// Mark subchannel_list as discarded. Unsubscribes all its subchannels. The
-/// connectivity state notification callback will ultimately unref it.
-void grpc_lb_subchannel_list_shutdown_and_unref(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
+} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.cc b/src/core/ext/filters/client_channel/lb_policy_factory.cc
index 80646a10cc..7c8cba55b7 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.cc
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.cc
@@ -66,7 +66,7 @@ void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
if (user_data != nullptr) GPR_ASSERT(addresses->user_data_vtable != nullptr);
grpc_lb_address* target = &addresses->addresses[index];
memcpy(target->address.addr, address, address_len);
- target->address.len = address_len;
+ target->address.len = static_cast<socklen_t>(address_len);
target->is_balancer = is_balancer;
target->balancer_name = gpr_strdup(balancer_name);
target->user_data = user_data;
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h
index b8bbd32072..6440258158 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.h
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.h
@@ -21,7 +21,6 @@
#include <grpc/support/port_platform.h>
-#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/ext/filters/client_channel/client_channel_factory.h"
diff --git a/src/core/ext/filters/client_channel/lb_policy_registry.h b/src/core/ext/filters/client_channel/lb_policy_registry.h
index 2283d848bd..2e9bb061ed 100644
--- a/src/core/ext/filters/client_channel/lb_policy_registry.h
+++ b/src/core/ext/filters/client_channel/lb_policy_registry.h
@@ -24,7 +24,6 @@
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/orphanable.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc_core {
diff --git a/src/core/ext/filters/client_channel/method_params.cc b/src/core/ext/filters/client_channel/method_params.cc
index 374b87e170..1f116bb67d 100644
--- a/src/core/ext/filters/client_channel/method_params.cc
+++ b/src/core/ext/filters/client_channel/method_params.cc
@@ -26,7 +26,7 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/filters/client_channel/method_params.h"
-#include "src/core/ext/filters/client_channel/status_util.h"
+#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/memory.h"
diff --git a/src/core/ext/filters/client_channel/method_params.h b/src/core/ext/filters/client_channel/method_params.h
index 48ece29867..a31d360f17 100644
--- a/src/core/ext/filters/client_channel/method_params.h
+++ b/src/core/ext/filters/client_channel/method_params.h
@@ -21,7 +21,7 @@
#include <grpc/support/port_platform.h>
-#include "src/core/ext/filters/client_channel/status_util.h"
+#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/exec_ctx.h" // for grpc_millis
@@ -60,6 +60,10 @@ class ClientChannelMethodParams : public RefCounted<ClientChannelMethodParams> {
template <typename T, typename... Args>
friend T* grpc_core::New(Args&&... args);
+ // So Delete() can call our private dtor.
+ template <typename T>
+ friend void grpc_core::Delete(T*);
+
ClientChannelMethodParams() {}
virtual ~ClientChannelMethodParams() {}
diff --git a/src/core/ext/filters/client_channel/parse_address.cc b/src/core/ext/filters/client_channel/parse_address.cc
index e78dc99e0b..b3900114ad 100644
--- a/src/core/ext/filters/client_channel/parse_address.cc
+++ b/src/core/ext/filters/client_channel/parse_address.cc
@@ -20,6 +20,7 @@
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/socket_utils.h"
#include <stdio.h>
#include <string.h>
@@ -49,7 +50,7 @@ bool grpc_parse_unix(const grpc_uri* uri,
if (path_len == maxlen) return false;
un->sun_family = AF_UNIX;
strcpy(un->sun_path, uri->path);
- resolved_addr->len = sizeof(*un);
+ resolved_addr->len = static_cast<socklen_t>(sizeof(*un));
return true;
}
@@ -71,10 +72,10 @@ bool grpc_parse_ipv4_hostport(const char* hostport, grpc_resolved_address* addr,
if (!gpr_split_host_port(hostport, &host, &port)) return false;
// Parse IP address.
memset(addr, 0, sizeof(*addr));
- addr->len = sizeof(struct sockaddr_in);
- struct sockaddr_in* in = reinterpret_cast<struct sockaddr_in*>(addr->addr);
- in->sin_family = AF_INET;
- if (inet_pton(AF_INET, host, &in->sin_addr) == 0) {
+ addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
+ grpc_sockaddr_in* in = reinterpret_cast<grpc_sockaddr_in*>(addr->addr);
+ in->sin_family = GRPC_AF_INET;
+ if (grpc_inet_pton(GRPC_AF_INET, host, &in->sin_addr) == 0) {
if (log_errors) gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host);
goto done;
}
@@ -88,7 +89,7 @@ bool grpc_parse_ipv4_hostport(const char* hostport, grpc_resolved_address* addr,
if (log_errors) gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port);
goto done;
}
- in->sin_port = htons(static_cast<uint16_t>(port_num));
+ in->sin_port = grpc_htons(static_cast<uint16_t>(port_num));
success = true;
done:
gpr_free(host);
@@ -117,19 +118,20 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr,
if (!gpr_split_host_port(hostport, &host, &port)) return false;
// Parse IP address.
memset(addr, 0, sizeof(*addr));
- addr->len = sizeof(struct sockaddr_in6);
- struct sockaddr_in6* in6 = reinterpret_cast<struct sockaddr_in6*>(addr->addr);
- in6->sin6_family = AF_INET6;
+ addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
+ grpc_sockaddr_in6* in6 = reinterpret_cast<grpc_sockaddr_in6*>(addr->addr);
+ in6->sin6_family = GRPC_AF_INET6;
// Handle the RFC6874 syntax for IPv6 zone identifiers.
char* host_end = static_cast<char*>(gpr_memrchr(host, '%', strlen(host)));
if (host_end != nullptr) {
GPR_ASSERT(host_end >= host);
- char host_without_scope[INET6_ADDRSTRLEN];
+ char host_without_scope[GRPC_INET6_ADDRSTRLEN];
size_t host_without_scope_len = static_cast<size_t>(host_end - host);
uint32_t sin6_scope_id = 0;
strncpy(host_without_scope, host, host_without_scope_len);
host_without_scope[host_without_scope_len] = '\0';
- if (inet_pton(AF_INET6, host_without_scope, &in6->sin6_addr) == 0) {
+ if (grpc_inet_pton(GRPC_AF_INET6, host_without_scope, &in6->sin6_addr) ==
+ 0) {
gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host_without_scope);
goto done;
}
@@ -142,7 +144,7 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr,
// Handle "sin6_scope_id" being type "u_long". See grpc issue #10027.
in6->sin6_scope_id = sin6_scope_id;
} else {
- if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) {
+ if (grpc_inet_pton(GRPC_AF_INET6, host, &in6->sin6_addr) == 0) {
gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host);
goto done;
}
@@ -157,7 +159,7 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr,
if (log_errors) gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port);
goto done;
}
- in6->sin6_port = htons(static_cast<uint16_t>(port_num));
+ in6->sin6_port = grpc_htons(static_cast<uint16_t>(port_num));
success = true;
done:
gpr_free(host);
diff --git a/src/core/ext/filters/client_channel/resolver.h b/src/core/ext/filters/client_channel/resolver.h
index 1685a6c803..02380314dd 100644
--- a/src/core/ext/filters/client_channel/resolver.h
+++ b/src/core/ext/filters/client_channel/resolver.h
@@ -53,8 +53,12 @@ class Resolver : public InternallyRefCountedWithTracing<Resolver> {
/// Requests a callback when a new result becomes available.
/// When the new result is available, sets \a *result to the new result
/// and schedules \a on_complete for execution.
+ /// Upon transient failure, sets \a *result to nullptr and schedules
+ /// \a on_complete with no error.
/// If resolution is fatally broken, sets \a *result to nullptr and
/// schedules \a on_complete with an error.
+ /// TODO(roth): When we have time, improve the way this API represents
+ /// transient failure vs. shutdown.
///
/// Note that the client channel will almost always have a request
/// to \a NextLocked() pending. When it gets the callback, it will
@@ -101,6 +105,10 @@ class Resolver : public InternallyRefCountedWithTracing<Resolver> {
GRPC_ABSTRACT_BASE_CLASS
protected:
+ // So Delete() can access our protected dtor.
+ template <typename T>
+ friend void Delete(T*);
+
/// Does NOT take ownership of the reference to \a combiner.
// TODO(roth): Once we have a C++-like interface for combiners, this
// API should change to take a RefCountedPtr<>, so that we always take
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index aa93e5d8de..c3c62b60bf 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -28,6 +28,8 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
+#include <address_sorting/address_sorting.h>
+
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
@@ -133,6 +135,7 @@ AresDnsResolver::AresDnsResolver(const ResolverArgs& args)
if (path[0] == '/') ++path;
name_to_resolve_ = gpr_strdup(path);
// Get DNS server from URI authority.
+ dns_server_ = nullptr;
if (0 != strcmp(args.uri->authority, "")) {
dns_server_ = gpr_strdup(args.uri->authority);
}
@@ -360,6 +363,15 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
}
void AresDnsResolver::MaybeStartResolvingLocked() {
+ // If there is an existing timer, the time it fires is the earliest time we
+ // can start the next resolution.
+ if (have_next_resolution_timer_) {
+ // TODO(dgq): remove the following two lines once Pick First stops
+ // discarding subchannels after selecting.
+ ++resolved_version_;
+ MaybeFinishNextLocked();
+ return;
+ }
if (last_resolution_timestamp_ >= 0) {
const grpc_millis earliest_next_resolution =
last_resolution_timestamp_ + min_time_between_resolutions_;
@@ -372,17 +384,15 @@ void AresDnsResolver::MaybeStartResolvingLocked() {
"In cooldown from last resolution (from %" PRIdPTR
" ms ago). Will resolve again in %" PRIdPTR " ms",
last_resolution_ago, ms_until_next_resolution);
- if (!have_next_resolution_timer_) {
- have_next_resolution_timer_ = true;
- // TODO(roth): We currently deal with this ref manually. Once the
- // new closure API is done, find a way to track this ref with the timer
- // callback as part of the type system.
- RefCountedPtr<Resolver> self =
- Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown");
- self.release();
- grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
- &on_next_resolution_);
- }
+ have_next_resolution_timer_ = true;
+ // TODO(roth): We currently deal with this ref manually. Once the
+ // new closure API is done, find a way to track this ref with the timer
+ // callback as part of the type system.
+ RefCountedPtr<Resolver> self =
+ Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown");
+ self.release();
+ grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
+ &on_next_resolution_);
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
@@ -394,6 +404,7 @@ void AresDnsResolver::MaybeStartResolvingLocked() {
}
void AresDnsResolver::StartResolvingLocked() {
+ gpr_log(GPR_DEBUG, "Start resolving.");
// TODO(roth): We currently deal with this ref manually. Once the
// new closure API is done, find a way to track this ref with the timer
// callback as part of the type system.
@@ -440,17 +451,32 @@ class AresDnsResolverFactory : public ResolverFactory {
} // namespace grpc_core
+extern grpc_address_resolver_vtable* grpc_resolve_address_impl;
+static grpc_address_resolver_vtable* default_resolver;
+
+static grpc_error* blocking_resolve_address_ares(
+ const char* name, const char* default_port,
+ grpc_resolved_addresses** addresses) {
+ return default_resolver->blocking_resolve_address(name, default_port,
+ addresses);
+}
+
+static grpc_address_resolver_vtable ares_resolver = {
+ grpc_resolve_address_ares, blocking_resolve_address_ares};
+
void grpc_resolver_dns_ares_init() {
char* resolver_env = gpr_getenv("GRPC_DNS_RESOLVER");
/* TODO(zyc): Turn on c-ares based resolver by default after the address
sorter and the CNAME support are added. */
if (resolver_env != nullptr && gpr_stricmp(resolver_env, "ares") == 0) {
+ address_sorting_init();
grpc_error* error = grpc_ares_init();
if (error != GRPC_ERROR_NONE) {
GRPC_LOG_IF_ERROR("ares_library_init() failed", error);
return;
}
- grpc_resolve_address = grpc_resolve_address_ares;
+ default_resolver = grpc_resolve_address_impl;
+ grpc_set_resolver_impl(&ares_resolver);
grpc_core::ResolverRegistry::Builder::RegisterResolverFactory(
grpc_core::UniquePtr<grpc_core::ResolverFactory>(
grpc_core::New<grpc_core::AresDnsResolverFactory>()));
@@ -461,6 +487,7 @@ void grpc_resolver_dns_ares_init() {
void grpc_resolver_dns_ares_shutdown() {
char* resolver_env = gpr_getenv("GRPC_DNS_RESOLVER");
if (resolver_env != nullptr && gpr_stricmp(resolver_env, "ares") == 0) {
+ address_sorting_shutdown();
grpc_ares_cleanup();
}
gpr_free(resolver_env);
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
index 0bc13e35f4..6239549534 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
@@ -22,7 +22,6 @@
#include <grpc/support/port_platform.h>
#include <ares.h>
-#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset_set.h"
typedef struct grpc_ares_ev_driver grpc_ares_ev_driver;
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
index 71b06eb87e..e86ab5a37e 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
@@ -33,6 +33,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
+#include <address_sorting/address_sorting.h>
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
#include "src/core/lib/gpr/host_port.h"
@@ -46,6 +47,9 @@
static gpr_once g_basic_init = GPR_ONCE_INIT;
static gpr_mu g_init_mu;
+grpc_core::TraceFlag grpc_trace_cares_address_sorting(false,
+ "cares_address_sorting");
+
struct grpc_ares_request {
/** indicates the DNS server to use, if specified */
struct ares_addr_port_node dns_server_addr;
@@ -96,11 +100,63 @@ static void grpc_ares_request_ref(grpc_ares_request* r) {
gpr_ref(&r->pending_queries);
}
+static void log_address_sorting_list(grpc_lb_addresses* lb_addrs,
+ const char* input_output_str) {
+ for (size_t i = 0; i < lb_addrs->num_addresses; i++) {
+ char* addr_str;
+ if (grpc_sockaddr_to_string(&addr_str, &lb_addrs->addresses[i].address,
+ true)) {
+ gpr_log(GPR_DEBUG, "c-ares address sorting: %s[%" PRIuPTR "]=%s",
+ input_output_str, i, addr_str);
+ gpr_free(addr_str);
+ } else {
+ gpr_log(GPR_DEBUG,
+ "c-ares address sorting: %s[%" PRIuPTR "]=<unprintable>",
+ input_output_str, i);
+ }
+ }
+}
+
+void grpc_cares_wrapper_address_sorting_sort(grpc_lb_addresses* lb_addrs) {
+ if (grpc_trace_cares_address_sorting.enabled()) {
+ log_address_sorting_list(lb_addrs, "input");
+ }
+ address_sorting_sortable* sortables = (address_sorting_sortable*)gpr_zalloc(
+ sizeof(address_sorting_sortable) * lb_addrs->num_addresses);
+ for (size_t i = 0; i < lb_addrs->num_addresses; i++) {
+ sortables[i].user_data = &lb_addrs->addresses[i];
+ memcpy(&sortables[i].dest_addr.addr, &lb_addrs->addresses[i].address.addr,
+ lb_addrs->addresses[i].address.len);
+ sortables[i].dest_addr.len = lb_addrs->addresses[i].address.len;
+ }
+ address_sorting_rfc_6724_sort(sortables, lb_addrs->num_addresses);
+ grpc_lb_address* sorted_lb_addrs = (grpc_lb_address*)gpr_zalloc(
+ sizeof(grpc_lb_address) * lb_addrs->num_addresses);
+ for (size_t i = 0; i < lb_addrs->num_addresses; i++) {
+ sorted_lb_addrs[i] = *(grpc_lb_address*)sortables[i].user_data;
+ }
+ gpr_free(sortables);
+ gpr_free(lb_addrs->addresses);
+ lb_addrs->addresses = sorted_lb_addrs;
+ if (grpc_trace_cares_address_sorting.enabled()) {
+ log_address_sorting_list(lb_addrs, "output");
+ }
+}
+
+/* Allow tests to access grpc_ares_wrapper_address_sorting_sort */
+void grpc_cares_wrapper_test_only_address_sorting_sort(
+ grpc_lb_addresses* lb_addrs) {
+ grpc_cares_wrapper_address_sorting_sort(lb_addrs);
+}
+
static void grpc_ares_request_unref(grpc_ares_request* r) {
/* If there are no pending queries, invoke on_done callback and destroy the
request */
if (gpr_unref(&r->pending_queries)) {
- /* TODO(zyc): Sort results with RFC6724 before invoking on_done. */
+ grpc_lb_addresses* lb_addrs = *(r->lb_addrs_out);
+ if (lb_addrs != nullptr) {
+ grpc_cares_wrapper_address_sorting_sort(lb_addrs);
+ }
GRPC_CLOSURE_SCHED(r->on_done, r->error);
gpr_mu_destroy(&r->mu);
grpc_ares_ev_driver_destroy(r->ev_driver);
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
index bda9cd1729..2d84a038d6 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
@@ -22,11 +22,12 @@
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/resolve_address.h"
+extern grpc_core::TraceFlag grpc_trace_cares_address_sorting;
+
typedef struct grpc_ares_request grpc_ares_request;
/* Asynchronously resolve \a name. Use \a default_port if a port isn't
@@ -65,5 +66,9 @@ grpc_error* grpc_ares_init(void);
it has been called the same number of times as grpc_ares_init(). */
void grpc_ares_cleanup(void);
+/* Exposed only for testing */
+void grpc_cares_wrapper_test_only_address_sorting_sort(
+ grpc_lb_addresses* lb_addrs);
+
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H \
*/
diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
index fbab136421..e7842a7951 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
@@ -236,6 +236,15 @@ void NativeDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
}
void NativeDnsResolver::MaybeStartResolvingLocked() {
+ // If there is an existing timer, the time it fires is the earliest time we
+ // can start the next resolution.
+ if (have_next_resolution_timer_) {
+ // TODO(dgq): remove the following two lines once Pick First stops
+ // discarding subchannels after selecting.
+ ++resolved_version_;
+ MaybeFinishNextLocked();
+ return;
+ }
if (last_resolution_timestamp_ >= 0) {
const grpc_millis earliest_next_resolution =
last_resolution_timestamp_ + min_time_between_resolutions_;
@@ -248,17 +257,15 @@ void NativeDnsResolver::MaybeStartResolvingLocked() {
"In cooldown from last resolution (from %" PRIdPTR
" ms ago). Will resolve again in %" PRIdPTR " ms",
last_resolution_ago, ms_until_next_resolution);
- if (!have_next_resolution_timer_) {
- have_next_resolution_timer_ = true;
- // TODO(roth): We currently deal with this ref manually. Once the
- // new closure API is done, find a way to track this ref with the timer
- // callback as part of the type system.
- RefCountedPtr<Resolver> self =
- Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown");
- self.release();
- grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
- &on_next_resolution_);
- }
+ have_next_resolution_timer_ = true;
+ // TODO(roth): We currently deal with this ref manually. Once the
+ // new closure API is done, find a way to track this ref with the timer
+ // callback as part of the type system.
+ RefCountedPtr<Resolver> self =
+ Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown");
+ self.release();
+ grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
+ &on_next_resolution_);
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
@@ -270,6 +277,7 @@ void NativeDnsResolver::MaybeStartResolvingLocked() {
}
void NativeDnsResolver::StartResolvingLocked() {
+ gpr_log(GPR_DEBUG, "Start resolving.");
// TODO(roth): We currently deal with this ref manually. Once the
// new closure API is done, find a way to track this ref with the timer
// callback as part of the type system.
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
index 4d8958f519..99a33f2277 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
@@ -82,6 +82,8 @@ class FakeResolver : public Resolver {
grpc_closure* next_completion_ = nullptr;
// target result address for next completion
grpc_channel_args** target_result_ = nullptr;
+ // if true, return failure
+ bool return_failure_ = false;
};
FakeResolver::FakeResolver(const ResolverArgs& args) : Resolver(args.combiner) {
@@ -121,12 +123,16 @@ void FakeResolver::RequestReresolutionLocked() {
}
void FakeResolver::MaybeFinishNextLocked() {
- if (next_completion_ != nullptr && next_results_ != nullptr) {
- *target_result_ = grpc_channel_args_union(next_results_, channel_args_);
+ if (next_completion_ != nullptr &&
+ (next_results_ != nullptr || return_failure_)) {
+ *target_result_ =
+ return_failure_ ? nullptr
+ : grpc_channel_args_union(next_results_, channel_args_);
grpc_channel_args_destroy(next_results_);
next_results_ = nullptr;
GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_NONE);
next_completion_ = nullptr;
+ return_failure_ = false;
}
}
@@ -197,6 +203,26 @@ void FakeResolverResponseGenerator::SetReresolutionResponse(
GRPC_ERROR_NONE);
}
+void FakeResolverResponseGenerator::SetFailureLocked(void* arg,
+ grpc_error* error) {
+ SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg);
+ FakeResolver* resolver = closure_arg->generator->resolver_;
+ resolver->return_failure_ = true;
+ resolver->MaybeFinishNextLocked();
+ Delete(closure_arg);
+}
+
+void FakeResolverResponseGenerator::SetFailure() {
+ GPR_ASSERT(resolver_ != nullptr);
+ SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
+ closure_arg->generator = this;
+ GRPC_CLOSURE_SCHED(
+ GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetFailureLocked,
+ closure_arg,
+ grpc_combiner_scheduler(resolver_->combiner())),
+ GRPC_ERROR_NONE);
+}
+
namespace {
static void* response_generator_arg_copy(void* p) {
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
index 858f35851d..e5175f9b7b 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
@@ -56,6 +56,10 @@ class FakeResolverResponseGenerator
// resolver will return the last value set via \a SetResponse().
void SetReresolutionResponse(grpc_channel_args* response);
+ // Tells the resolver to return a transient failure (signalled by
+ // returning a null result with no error).
+ void SetFailure();
+
// Returns a channel arg containing \a generator.
static grpc_arg MakeChannelArg(FakeResolverResponseGenerator* generator);
@@ -68,6 +72,7 @@ class FakeResolverResponseGenerator
static void SetResponseLocked(void* arg, grpc_error* error);
static void SetReresolutionResponseLocked(void* arg, grpc_error* error);
+ static void SetFailureLocked(void* arg, grpc_error* error);
FakeResolver* resolver_ = nullptr; // Do not own.
};
diff --git a/src/core/ext/filters/client_channel/retry_throttle.cc b/src/core/ext/filters/client_channel/retry_throttle.cc
index 45de6667c8..bdeb7e4cac 100644
--- a/src/core/ext/filters/client_channel/retry_throttle.cc
+++ b/src/core/ext/filters/client_channel/retry_throttle.cc
@@ -30,184 +30,162 @@
#include "src/core/lib/avl/avl.h"
+namespace grpc_core {
+namespace internal {
+
//
-// server_retry_throttle_data
+// ServerRetryThrottleData
//
-struct grpc_server_retry_throttle_data {
- gpr_refcount refs;
- int max_milli_tokens;
- int milli_token_ratio;
- gpr_atm milli_tokens;
- // A pointer to the replacement for this grpc_server_retry_throttle_data
- // entry. If non-nullptr, then this entry is stale and must not be used.
- // We hold a reference to the replacement.
- gpr_atm replacement;
-};
-
-static void get_replacement_throttle_data_if_needed(
- grpc_server_retry_throttle_data** throttle_data) {
+ServerRetryThrottleData::ServerRetryThrottleData(
+ intptr_t max_milli_tokens, intptr_t milli_token_ratio,
+ ServerRetryThrottleData* old_throttle_data)
+ : max_milli_tokens_(max_milli_tokens),
+ milli_token_ratio_(milli_token_ratio) {
+ intptr_t initial_milli_tokens = max_milli_tokens;
+ // If there was a pre-existing entry for this server name, initialize
+ // the token count by scaling proportionately to the old data. This
+ // ensures that if we're already throttling retries on the old scale,
+ // we will start out doing the same thing on the new one.
+ if (old_throttle_data != nullptr) {
+ double token_fraction =
+ static_cast<intptr_t>(
+ gpr_atm_acq_load(&old_throttle_data->milli_tokens_)) /
+ static_cast<double>(old_throttle_data->max_milli_tokens_);
+ initial_milli_tokens =
+ static_cast<intptr_t>(token_fraction * max_milli_tokens);
+ }
+ gpr_atm_rel_store(&milli_tokens_, static_cast<gpr_atm>(initial_milli_tokens));
+ // If there was a pre-existing entry, mark it as stale and give it a
+ // pointer to the new entry, which is its replacement.
+ if (old_throttle_data != nullptr) {
+ Ref().release(); // Ref held by pre-existing entry.
+ gpr_atm_rel_store(&old_throttle_data->replacement_,
+ reinterpret_cast<gpr_atm>(this));
+ }
+}
+
+ServerRetryThrottleData::~ServerRetryThrottleData() {
+ ServerRetryThrottleData* replacement =
+ reinterpret_cast<ServerRetryThrottleData*>(
+ gpr_atm_acq_load(&replacement_));
+ if (replacement != nullptr) {
+ replacement->Unref();
+ }
+}
+
+void ServerRetryThrottleData::GetReplacementThrottleDataIfNeeded(
+ ServerRetryThrottleData** throttle_data) {
while (true) {
- grpc_server_retry_throttle_data* new_throttle_data =
- (grpc_server_retry_throttle_data*)gpr_atm_acq_load(
- &(*throttle_data)->replacement);
+ ServerRetryThrottleData* new_throttle_data =
+ reinterpret_cast<ServerRetryThrottleData*>(
+ gpr_atm_acq_load(&(*throttle_data)->replacement_));
if (new_throttle_data == nullptr) return;
*throttle_data = new_throttle_data;
}
}
-bool grpc_server_retry_throttle_data_record_failure(
- grpc_server_retry_throttle_data* throttle_data) {
- if (throttle_data == nullptr) return true;
+bool ServerRetryThrottleData::RecordFailure() {
// First, check if we are stale and need to be replaced.
- get_replacement_throttle_data_if_needed(&throttle_data);
+ ServerRetryThrottleData* throttle_data = this;
+ GetReplacementThrottleDataIfNeeded(&throttle_data);
// We decrement milli_tokens by 1000 (1 token) for each failure.
- const int new_value = static_cast<int>(gpr_atm_no_barrier_clamped_add(
- &throttle_data->milli_tokens, static_cast<gpr_atm>(-1000),
- static_cast<gpr_atm>(0),
- static_cast<gpr_atm>(throttle_data->max_milli_tokens)));
+ const intptr_t new_value =
+ static_cast<intptr_t>(gpr_atm_no_barrier_clamped_add(
+ &throttle_data->milli_tokens_, static_cast<gpr_atm>(-1000),
+ static_cast<gpr_atm>(0),
+ static_cast<gpr_atm>(throttle_data->max_milli_tokens_)));
// Retries are allowed as long as the new value is above the threshold
// (max_milli_tokens / 2).
- return new_value > throttle_data->max_milli_tokens / 2;
+ return new_value > throttle_data->max_milli_tokens_ / 2;
}
-void grpc_server_retry_throttle_data_record_success(
- grpc_server_retry_throttle_data* throttle_data) {
- if (throttle_data == nullptr) return;
+void ServerRetryThrottleData::RecordSuccess() {
// First, check if we are stale and need to be replaced.
- get_replacement_throttle_data_if_needed(&throttle_data);
+ ServerRetryThrottleData* throttle_data = this;
+ GetReplacementThrottleDataIfNeeded(&throttle_data);
// We increment milli_tokens by milli_token_ratio for each success.
gpr_atm_no_barrier_clamped_add(
- &throttle_data->milli_tokens,
- static_cast<gpr_atm>(throttle_data->milli_token_ratio),
+ &throttle_data->milli_tokens_,
+ static_cast<gpr_atm>(throttle_data->milli_token_ratio_),
static_cast<gpr_atm>(0),
- static_cast<gpr_atm>(throttle_data->max_milli_tokens));
-}
-
-grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref(
- grpc_server_retry_throttle_data* throttle_data) {
- gpr_ref(&throttle_data->refs);
- return throttle_data;
-}
-
-void grpc_server_retry_throttle_data_unref(
- grpc_server_retry_throttle_data* throttle_data) {
- if (gpr_unref(&throttle_data->refs)) {
- grpc_server_retry_throttle_data* replacement =
- (grpc_server_retry_throttle_data*)gpr_atm_acq_load(
- &throttle_data->replacement);
- if (replacement != nullptr) {
- grpc_server_retry_throttle_data_unref(replacement);
- }
- gpr_free(throttle_data);
- }
-}
-
-static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create(
- int max_milli_tokens, int milli_token_ratio,
- grpc_server_retry_throttle_data* old_throttle_data) {
- grpc_server_retry_throttle_data* throttle_data =
- static_cast<grpc_server_retry_throttle_data*>(
- gpr_malloc(sizeof(*throttle_data)));
- memset(throttle_data, 0, sizeof(*throttle_data));
- gpr_ref_init(&throttle_data->refs, 1);
- throttle_data->max_milli_tokens = max_milli_tokens;
- throttle_data->milli_token_ratio = milli_token_ratio;
- int initial_milli_tokens = max_milli_tokens;
- // If there was a pre-existing entry for this server name, initialize
- // the token count by scaling proportionately to the old data. This
- // ensures that if we're already throttling retries on the old scale,
- // we will start out doing the same thing on the new one.
- if (old_throttle_data != nullptr) {
- double token_fraction =
- static_cast<int>(gpr_atm_acq_load(&old_throttle_data->milli_tokens)) /
- static_cast<double>(old_throttle_data->max_milli_tokens);
- initial_milli_tokens = static_cast<int>(token_fraction * max_milli_tokens);
- }
- gpr_atm_rel_store(&throttle_data->milli_tokens,
- (gpr_atm)initial_milli_tokens);
- // If there was a pre-existing entry, mark it as stale and give it a
- // pointer to the new entry, which is its replacement.
- if (old_throttle_data != nullptr) {
- grpc_server_retry_throttle_data_ref(throttle_data);
- gpr_atm_rel_store(&old_throttle_data->replacement, (gpr_atm)throttle_data);
- }
- return throttle_data;
+ static_cast<gpr_atm>(throttle_data->max_milli_tokens_));
}
//
// avl vtable for string -> server_retry_throttle_data map
//
-static void* copy_server_name(void* key, void* unused) {
+namespace {
+
+void* copy_server_name(void* key, void* unused) {
return gpr_strdup(static_cast<const char*>(key));
}
-static long compare_server_name(void* key1, void* key2, void* unused) {
+long compare_server_name(void* key1, void* key2, void* unused) {
return strcmp(static_cast<const char*>(key1), static_cast<const char*>(key2));
}
-static void destroy_server_retry_throttle_data(void* value, void* unused) {
- grpc_server_retry_throttle_data* throttle_data =
- static_cast<grpc_server_retry_throttle_data*>(value);
- grpc_server_retry_throttle_data_unref(throttle_data);
+void destroy_server_retry_throttle_data(void* value, void* unused) {
+ ServerRetryThrottleData* throttle_data =
+ static_cast<ServerRetryThrottleData*>(value);
+ throttle_data->Unref();
}
-static void* copy_server_retry_throttle_data(void* value, void* unused) {
- grpc_server_retry_throttle_data* throttle_data =
- static_cast<grpc_server_retry_throttle_data*>(value);
- return grpc_server_retry_throttle_data_ref(throttle_data);
+void* copy_server_retry_throttle_data(void* value, void* unused) {
+ ServerRetryThrottleData* throttle_data =
+ static_cast<ServerRetryThrottleData*>(value);
+ return throttle_data->Ref().release();
}
-static void destroy_server_name(void* key, void* unused) { gpr_free(key); }
+void destroy_server_name(void* key, void* unused) { gpr_free(key); }
-static const grpc_avl_vtable avl_vtable = {
+const grpc_avl_vtable avl_vtable = {
destroy_server_name, copy_server_name, compare_server_name,
destroy_server_retry_throttle_data, copy_server_retry_throttle_data};
+} // namespace
+
//
-// server_retry_throttle_map
+// ServerRetryThrottleMap
//
static gpr_mu g_mu;
static grpc_avl g_avl;
-void grpc_retry_throttle_map_init() {
+void ServerRetryThrottleMap::Init() {
gpr_mu_init(&g_mu);
g_avl = grpc_avl_create(&avl_vtable);
}
-void grpc_retry_throttle_map_shutdown() {
+void ServerRetryThrottleMap::Shutdown() {
gpr_mu_destroy(&g_mu);
grpc_avl_unref(g_avl, nullptr);
}
-grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server(
- const char* server_name, int max_milli_tokens, int milli_token_ratio) {
+RefCountedPtr<ServerRetryThrottleData> ServerRetryThrottleMap::GetDataForServer(
+ const char* server_name, intptr_t max_milli_tokens,
+ intptr_t milli_token_ratio) {
+ RefCountedPtr<ServerRetryThrottleData> result;
gpr_mu_lock(&g_mu);
- grpc_server_retry_throttle_data* throttle_data =
- static_cast<grpc_server_retry_throttle_data*>(
+ ServerRetryThrottleData* throttle_data =
+ static_cast<ServerRetryThrottleData*>(
grpc_avl_get(g_avl, const_cast<char*>(server_name), nullptr));
- if (throttle_data == nullptr) {
- // Entry not found. Create a new one.
- throttle_data = grpc_server_retry_throttle_data_create(
- max_milli_tokens, milli_token_ratio, nullptr);
- g_avl = grpc_avl_add(g_avl, const_cast<char*>(server_name), throttle_data,
- nullptr);
+ if (throttle_data == nullptr ||
+ throttle_data->max_milli_tokens() != max_milli_tokens ||
+ throttle_data->milli_token_ratio() != milli_token_ratio) {
+ // Entry not found, or found with old parameters. Create a new one.
+ result = MakeRefCounted<ServerRetryThrottleData>(
+ max_milli_tokens, milli_token_ratio, throttle_data);
+ g_avl = grpc_avl_add(g_avl, gpr_strdup(server_name),
+ result->Ref().release(), nullptr);
} else {
- if (throttle_data->max_milli_tokens != max_milli_tokens ||
- throttle_data->milli_token_ratio != milli_token_ratio) {
- // Entry found but with old parameters. Create a new one based on
- // the original one.
- throttle_data = grpc_server_retry_throttle_data_create(
- max_milli_tokens, milli_token_ratio, throttle_data);
- g_avl = grpc_avl_add(g_avl, const_cast<char*>(server_name), throttle_data,
- nullptr);
- } else {
- // Entry found. Increase refcount.
- grpc_server_retry_throttle_data_ref(throttle_data);
- }
+ // Entry found. Return a new ref to it.
+ result = throttle_data->Ref();
}
gpr_mu_unlock(&g_mu);
- return throttle_data;
+ return result;
}
+
+} // namespace internal
+} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/retry_throttle.h b/src/core/ext/filters/client_channel/retry_throttle.h
index 0505fc27f2..fddafcd903 100644
--- a/src/core/ext/filters/client_channel/retry_throttle.h
+++ b/src/core/ext/filters/client_channel/retry_throttle.h
@@ -21,32 +21,61 @@
#include <grpc/support/port_platform.h>
-#include <stdbool.h>
+#include "src/core/lib/gprpp/ref_counted.h"
+
+namespace grpc_core {
+namespace internal {
/// Tracks retry throttling data for an individual server name.
-typedef struct grpc_server_retry_throttle_data grpc_server_retry_throttle_data;
-
-/// Records a failure. Returns true if it's okay to send a retry.
-bool grpc_server_retry_throttle_data_record_failure(
- grpc_server_retry_throttle_data* throttle_data);
-/// Records a success.
-void grpc_server_retry_throttle_data_record_success(
- grpc_server_retry_throttle_data* throttle_data);
-
-grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref(
- grpc_server_retry_throttle_data* throttle_data);
-void grpc_server_retry_throttle_data_unref(
- grpc_server_retry_throttle_data* throttle_data);
-
-/// Initializes global map of failure data for each server name.
-void grpc_retry_throttle_map_init();
-/// Shuts down global map of failure data for each server name.
-void grpc_retry_throttle_map_shutdown();
-
-/// Returns a reference to the failure data for \a server_name, creating
-/// a new entry if needed.
-/// Caller must eventually unref via \a grpc_server_retry_throttle_data_unref().
-grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server(
- const char* server_name, int max_milli_tokens, int milli_token_ratio);
+class ServerRetryThrottleData : public RefCounted<ServerRetryThrottleData> {
+ public:
+ ServerRetryThrottleData(intptr_t max_milli_tokens, intptr_t milli_token_ratio,
+ ServerRetryThrottleData* old_throttle_data);
+
+ /// Records a failure. Returns true if it's okay to send a retry.
+ bool RecordFailure();
+
+ /// Records a success.
+ void RecordSuccess();
+
+ intptr_t max_milli_tokens() const { return max_milli_tokens_; }
+ intptr_t milli_token_ratio() const { return milli_token_ratio_; }
+
+ private:
+ // So Delete() can call our private dtor.
+ template <typename T>
+ friend void grpc_core::Delete(T*);
+
+ ~ServerRetryThrottleData();
+
+ void GetReplacementThrottleDataIfNeeded(
+ ServerRetryThrottleData** throttle_data);
+
+ const intptr_t max_milli_tokens_;
+ const intptr_t milli_token_ratio_;
+ gpr_atm milli_tokens_;
+ // A pointer to the replacement for this ServerRetryThrottleData entry.
+ // If non-nullptr, then this entry is stale and must not be used.
+ // We hold a reference to the replacement.
+ gpr_atm replacement_ = 0;
+};
+
+/// Global map of server name to retry throttle data.
+class ServerRetryThrottleMap {
+ public:
+ /// Initializes global map of failure data for each server name.
+ static void Init();
+ /// Shuts down global map of failure data for each server name.
+ static void Shutdown();
+
+ /// Returns the failure data for \a server_name, creating a new entry if
+ /// needed.
+ static RefCountedPtr<ServerRetryThrottleData> GetDataForServer(
+ const char* server_name, intptr_t max_milli_tokens,
+ intptr_t milli_token_ratio);
+};
+
+} // namespace internal
+} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RETRY_THROTTLE_H */
diff --git a/src/core/ext/filters/client_channel/status_util.cc b/src/core/ext/filters/client_channel/status_util.cc
deleted file mode 100644
index 11f732ab44..0000000000
--- a/src/core/ext/filters/client_channel/status_util.cc
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- *
- * Copyright 2017 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#include "src/core/ext/filters/client_channel/status_util.h"
-
-#include "src/core/lib/gpr/useful.h"
-
-typedef struct {
- const char* str;
- grpc_status_code status;
-} status_string_entry;
-
-static const status_string_entry g_status_string_entries[] = {
- {"OK", GRPC_STATUS_OK},
- {"CANCELLED", GRPC_STATUS_CANCELLED},
- {"UNKNOWN", GRPC_STATUS_UNKNOWN},
- {"INVALID_ARGUMENT", GRPC_STATUS_INVALID_ARGUMENT},
- {"DEADLINE_EXCEEDED", GRPC_STATUS_DEADLINE_EXCEEDED},
- {"NOT_FOUND", GRPC_STATUS_NOT_FOUND},
- {"ALREADY_EXISTS", GRPC_STATUS_ALREADY_EXISTS},
- {"PERMISSION_DENIED", GRPC_STATUS_PERMISSION_DENIED},
- {"UNAUTHENTICATED", GRPC_STATUS_UNAUTHENTICATED},
- {"RESOURCE_EXHAUSTED", GRPC_STATUS_RESOURCE_EXHAUSTED},
- {"FAILED_PRECONDITION", GRPC_STATUS_FAILED_PRECONDITION},
- {"ABORTED", GRPC_STATUS_ABORTED},
- {"OUT_OF_RANGE", GRPC_STATUS_OUT_OF_RANGE},
- {"UNIMPLEMENTED", GRPC_STATUS_UNIMPLEMENTED},
- {"INTERNAL", GRPC_STATUS_INTERNAL},
- {"UNAVAILABLE", GRPC_STATUS_UNAVAILABLE},
- {"DATA_LOSS", GRPC_STATUS_DATA_LOSS},
-};
-
-bool grpc_status_code_from_string(const char* status_str,
- grpc_status_code* status) {
- for (size_t i = 0; i < GPR_ARRAY_SIZE(g_status_string_entries); ++i) {
- if (strcmp(status_str, g_status_string_entries[i].str) == 0) {
- *status = g_status_string_entries[i].status;
- return true;
- }
- }
- return false;
-}
-
-const char* grpc_status_code_to_string(grpc_status_code status) {
- switch (status) {
- case GRPC_STATUS_OK:
- return "OK";
- case GRPC_STATUS_CANCELLED:
- return "CANCELLED";
- case GRPC_STATUS_UNKNOWN:
- return "UNKNOWN";
- case GRPC_STATUS_INVALID_ARGUMENT:
- return "INVALID_ARGUMENT";
- case GRPC_STATUS_DEADLINE_EXCEEDED:
- return "DEADLINE_EXCEEDED";
- case GRPC_STATUS_NOT_FOUND:
- return "NOT_FOUND";
- case GRPC_STATUS_ALREADY_EXISTS:
- return "ALREADY_EXISTS";
- case GRPC_STATUS_PERMISSION_DENIED:
- return "PERMISSION_DENIED";
- case GRPC_STATUS_UNAUTHENTICATED:
- return "UNAUTHENTICATED";
- case GRPC_STATUS_RESOURCE_EXHAUSTED:
- return "RESOURCE_EXHAUSTED";
- case GRPC_STATUS_FAILED_PRECONDITION:
- return "FAILED_PRECONDITION";
- case GRPC_STATUS_ABORTED:
- return "ABORTED";
- case GRPC_STATUS_OUT_OF_RANGE:
- return "OUT_OF_RANGE";
- case GRPC_STATUS_UNIMPLEMENTED:
- return "UNIMPLEMENTED";
- case GRPC_STATUS_INTERNAL:
- return "INTERNAL";
- case GRPC_STATUS_UNAVAILABLE:
- return "UNAVAILABLE";
- case GRPC_STATUS_DATA_LOSS:
- return "DATA_LOSS";
- default:
- return "UNKNOWN";
- }
-}
diff --git a/src/core/ext/filters/client_channel/status_util.h b/src/core/ext/filters/client_channel/status_util.h
deleted file mode 100644
index e018709730..0000000000
--- a/src/core/ext/filters/client_channel/status_util.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *
- * Copyright 2017 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_STATUS_UTIL_H
-#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_STATUS_UTIL_H
-
-#include <grpc/support/port_platform.h>
-
-#include <grpc/status.h>
-
-#include <stdbool.h>
-#include <string.h>
-
-/// If \a status_str is a valid status string, sets \a status to the
-/// corresponding status value and returns true.
-bool grpc_status_code_from_string(const char* status_str,
- grpc_status_code* status);
-
-/// Returns the string form of \a status, or "UNKNOWN" if invalid.
-const char* grpc_status_code_to_string(grpc_status_code status);
-
-namespace grpc_core {
-namespace internal {
-
-/// A set of grpc_status_code values.
-class StatusCodeSet {
- public:
- bool Empty() const { return status_code_mask_ == 0; }
-
- void Add(grpc_status_code status) { status_code_mask_ |= (1 << status); }
-
- bool Contains(grpc_status_code status) const {
- return status_code_mask_ & (1 << status);
- }
-
- private:
- int status_code_mask_ = 0; // A bitfield of status codes in the set.
-};
-
-} // namespace internal
-} // namespace grpc_core
-
-#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_STATUS_UTIL_H */
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index cae7cc35e3..d7815fb7e1 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -40,6 +40,7 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
diff --git a/src/core/ext/filters/client_channel/uri_parser.h b/src/core/ext/filters/client_channel/uri_parser.h
index 1966da932b..d749f23308 100644
--- a/src/core/ext/filters/client_channel/uri_parser.h
+++ b/src/core/ext/filters/client_channel/uri_parser.h
@@ -22,7 +22,6 @@
#include <grpc/support/port_platform.h>
#include <stddef.h>
-#include "src/core/lib/iomgr/exec_ctx.h"
typedef struct {
char* scheme;