diff options
Diffstat (limited to 'src/core/ext/filters/client_channel')
33 files changed, 1854 insertions, 1599 deletions
diff --git a/src/core/ext/filters/client_channel/OWNERS b/src/core/ext/filters/client_channel/OWNERS index 8f5e92808e..c8760d947b 100644 --- a/src/core/ext/filters/client_channel/OWNERS +++ b/src/core/ext/filters/client_channel/OWNERS @@ -1,4 +1,4 @@ set noparent @markdroth @dgquintas -@a11r +@AspirinSJL diff --git a/src/core/ext/filters/client_channel/backup_poller.h b/src/core/ext/filters/client_channel/backup_poller.h index 7285b9b93e..8f132f968c 100644 --- a/src/core/ext/filters/client_channel/backup_poller.h +++ b/src/core/ext/filters/client_channel/backup_poller.h @@ -23,7 +23,6 @@ #include <grpc/grpc.h> #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/iomgr/exec_ctx.h" /* Start polling \a interested_parties periodically in the timer thread */ void grpc_client_channel_start_backup_polling( diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 90b93fbe23..80a647fa94 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -38,12 +38,12 @@ #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/ext/filters/client_channel/retry_throttle.h" -#include "src/core/ext/filters/client_channel/status_util.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/deadline/deadline_filter.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/channel/status_util.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/gprpp/manual_constructor.h" @@ -63,6 +63,7 @@ #include "src/core/lib/transport/status_metadata.h" using grpc_core::internal::ClientChannelMethodParams; +using grpc_core::internal::ServerRetryThrottleData; /* Client channel implementation */ @@ -99,7 +100,7 @@ typedef struct client_channel_channel_data { /** currently active load balancer */ grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy; /** retry throttle data */ - grpc_server_retry_throttle_data* retry_throttle_data; + grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data; /** maps method names to method_parameters structs */ grpc_core::RefCountedPtr<MethodParamsTable> method_params_table; /** incoming resolver result - set by resolver.next() */ @@ -173,7 +174,7 @@ static void set_channel_connectivity_state_locked(channel_data* chand, } } if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand, + gpr_log(GPR_INFO, "chand=%p: setting connectivity state to %s", chand, grpc_connectivity_state_name(state)); } grpc_connectivity_state_set(&chand->state_tracker, state, error, reason); @@ -185,7 +186,7 @@ static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) { /* check if the notification is for the latest policy */ if (w->lb_policy == w->chand->lb_policy.get()) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand, + gpr_log(GPR_INFO, "chand=%p: lb_policy=%p state changed to %s", w->chand, w->lb_policy, grpc_connectivity_state_name(w->state)); } set_channel_connectivity_state_locked(w->chand, w->state, @@ -214,7 +215,7 @@ static void watch_lb_policy_locked(channel_data* chand, static void start_resolving_locked(channel_data* chand) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand); + gpr_log(GPR_INFO, "chand=%p: starting name resolution", chand); } GPR_ASSERT(!chand->started_resolving); chand->started_resolving = true; @@ -225,7 +226,7 @@ static void start_resolving_locked(channel_data* chand) { typedef struct { char* server_name; - grpc_server_retry_throttle_data* retry_throttle_data; + grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data; } service_config_parsing_state; static void parse_retry_throttle_params( @@ -278,7 +279,7 @@ static void parse_retry_throttle_params( } } parsing_state->retry_throttle_data = - grpc_retry_throttle_map_get_data_for_server( + grpc_core::internal::ServerRetryThrottleMap::GetDataForServer( parsing_state->server_name, max_milli_tokens, milli_token_ratio); } } @@ -296,18 +297,23 @@ static void request_reresolution_locked(void* arg, grpc_error* error) { return; } if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p: started name re-resolving", chand); + gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand); } chand->resolver->RequestReresolutionLocked(); // Give back the closure to the LB policy. chand->lb_policy->SetReresolutionClosureLocked(&args->closure); } +// TODO(roth): The logic in this function is very hard to follow. We +// should refactor this so that it's easier to understand, perhaps as +// part of changing the resolver API to more clearly differentiate +// between transient failures and shutdown. static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { channel_data* chand = static_cast<channel_data*>(arg); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand, - grpc_error_string(error)); + gpr_log(GPR_INFO, + "chand=%p: got resolver result: resolver_result=%p error=%s", chand, + chand->resolver_result, grpc_error_string(error)); } // Extract the following fields from the resolver result, if non-nullptr. bool lb_policy_updated = false; @@ -316,7 +322,7 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { bool lb_policy_name_changed = false; grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy; char* service_config_json = nullptr; - grpc_server_retry_throttle_data* retry_throttle_data = nullptr; + grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data; grpc_core::RefCountedPtr<MethodParamsTable> method_params_table; if (chand->resolver_result != nullptr) { if (chand->resolver != nullptr) { @@ -416,18 +422,16 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { service_config->ParseGlobalParams(parse_retry_throttle_params, &parsing_state); grpc_uri_destroy(uri); - retry_throttle_data = parsing_state.retry_throttle_data; + retry_throttle_data = std::move(parsing_state.retry_throttle_data); } method_params_table = service_config->CreateMethodConfigTable( ClientChannelMethodParams::CreateFromJson); } } } - grpc_channel_args_destroy(chand->resolver_result); - chand->resolver_result = nullptr; } if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p: resolver result: lb_policy_name=\"%s\"%s, " "service_config=\"%s\"", chand, lb_policy_name_dup, @@ -449,10 +453,7 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { } gpr_mu_unlock(&chand->info_mu); // Swap out the retry throttle data. - if (chand->retry_throttle_data != nullptr) { - grpc_server_retry_throttle_data_unref(chand->retry_throttle_data); - } - chand->retry_throttle_data = retry_throttle_data; + chand->retry_throttle_data = std::move(retry_throttle_data); // Swap out the method params table. chand->method_params_table = std::move(method_params_table); // If we have a new LB policy or are shutting down (in which case @@ -465,7 +466,7 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { chand->resolver == nullptr) { if (chand->lb_policy != nullptr) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand, + gpr_log(GPR_INFO, "chand=%p: unreffing lb_policy=%p", chand, chand->lb_policy.get()); } grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), @@ -479,11 +480,11 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { // error or shutdown. if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p: shutting down", chand); + gpr_log(GPR_INFO, "chand=%p: shutting down", chand); } if (chand->resolver != nullptr) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p: shutting down resolver", chand); + gpr_log(GPR_INFO, "chand=%p: shutting down resolver", chand); } chand->resolver.reset(); } @@ -497,13 +498,15 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { "Channel disconnected", &error, 1)); GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver"); + grpc_channel_args_destroy(chand->resolver_result); + chand->resolver_result = nullptr; } else { // Not shutting down. grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; grpc_error* state_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); if (lb_policy_created) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand); + gpr_log(GPR_INFO, "chand=%p: initializing new LB policy", chand); } GRPC_ERROR_UNREF(state_error); state = chand->lb_policy->CheckConnectivityLocked(&state_error); @@ -515,11 +518,16 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { chand->exit_idle_when_lb_policy_arrives = false; } watch_lb_policy_locked(chand, chand->lb_policy.get(), state); + } else if (chand->resolver_result == nullptr) { + // Transient failure. + GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); } if (!lb_policy_updated) { set_channel_connectivity_state_locked( chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver"); } + grpc_channel_args_destroy(chand->resolver_result); + chand->resolver_result = nullptr; chand->resolver->NextLocked(&chand->resolver_result, &chand->on_resolver_result_changed); GRPC_ERROR_UNREF(state_error); @@ -715,12 +723,8 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) { } gpr_free(chand->info_lb_policy_name); gpr_free(chand->info_service_config_json); - if (chand->retry_throttle_data != nullptr) { - grpc_server_retry_throttle_data_unref(chand->retry_throttle_data); - } - if (chand->method_params_table != nullptr) { - chand->method_params_table.reset(); - } + chand->retry_throttle_data.reset(); + chand->method_params_table.reset(); grpc_client_channel_stop_backup_polling(chand->interested_parties); grpc_connectivity_state_destroy(&chand->state_tracker); grpc_pollset_set_destroy(chand->interested_parties); @@ -798,7 +802,8 @@ typedef struct { grpc_linked_mdelem* send_initial_metadata_storage; grpc_metadata_batch send_initial_metadata; // For send_message. - grpc_caching_byte_stream send_message; + grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream> + send_message; // For send_trailing_metadata. grpc_linked_mdelem* send_trailing_metadata_storage; grpc_metadata_batch send_trailing_metadata; @@ -808,7 +813,7 @@ typedef struct { bool trailing_metadata_available; // For intercepting recv_message. grpc_closure recv_message_ready; - grpc_byte_stream* recv_message; + grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message; // For intercepting recv_trailing_metadata. grpc_metadata_batch recv_trailing_metadata; grpc_transport_stream_stats collect_stats; @@ -837,10 +842,11 @@ typedef struct { bool completed_recv_trailing_metadata : 1; // State for callback processing. bool retry_dispatched : 1; - bool recv_initial_metadata_ready_deferred : 1; - bool recv_message_ready_deferred : 1; + subchannel_batch_data* recv_initial_metadata_ready_deferred_batch; grpc_error* recv_initial_metadata_error; + subchannel_batch_data* recv_message_ready_deferred_batch; grpc_error* recv_message_error; + subchannel_batch_data* recv_trailing_metadata_internal_batch; } subchannel_call_retry_state; // Pending batches stored in call data. @@ -872,7 +878,7 @@ typedef struct client_channel_call_data { grpc_call_stack* owning_call; grpc_call_combiner* call_combiner; - grpc_server_retry_throttle_data* retry_throttle_data; + grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data; grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params; grpc_subchannel_call* subchannel_call; @@ -914,12 +920,14 @@ typedef struct client_channel_call_data { gpr_atm* peer_string; // send_message // When we get a send_message op, we replace the original byte stream - // with a grpc_caching_byte_stream that caches the slices to a - // local buffer for use in retries. + // with a CachingByteStream that caches the slices to a local buffer for + // use in retries. // Note: We inline the cache for the first 3 send_message ops and use // dynamic allocation after that. This number was essentially picked // at random; it could be changed in the future to tune performance. - grpc_core::InlinedVector<grpc_byte_stream_cache*, 3> send_messages; + grpc_core::ManualConstructor< + grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3>> + send_messages; // send_trailing_metadata bool seen_send_trailing_metadata; grpc_linked_mdelem* send_trailing_metadata_storage; @@ -964,11 +972,12 @@ static void maybe_cache_send_ops_for_batch(call_data* calld, } // Set up cache for send_message ops. if (batch->send_message) { - grpc_byte_stream_cache* cache = (grpc_byte_stream_cache*)gpr_arena_alloc( - calld->arena, sizeof(grpc_byte_stream_cache)); - grpc_byte_stream_cache_init(cache, - batch->payload->send_message.send_message); - calld->send_messages.push_back(cache); + grpc_core::ByteStreamCache* cache = + static_cast<grpc_core::ByteStreamCache*>( + gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache))); + new (cache) grpc_core::ByteStreamCache( + std::move(batch->payload->send_message.send_message)); + calld->send_messages->push_back(cache); } // Save metadata batch for send_trailing_metadata ops. if (batch->send_trailing_metadata) { @@ -986,6 +995,39 @@ static void maybe_cache_send_ops_for_batch(call_data* calld, } } +// Frees cached send_initial_metadata. +static void free_cached_send_initial_metadata(channel_data* chand, + call_data* calld) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: destroying calld->send_initial_metadata", chand, + calld); + } + grpc_metadata_batch_destroy(&calld->send_initial_metadata); +} + +// Frees cached send_message at index idx. +static void free_cached_send_message(channel_data* chand, call_data* calld, + size_t idx) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]", + chand, calld, idx); + } + (*calld->send_messages)[idx]->Destroy(); +} + +// Frees cached send_trailing_metadata. +static void free_cached_send_trailing_metadata(channel_data* chand, + call_data* calld) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: destroying calld->send_trailing_metadata", + chand, calld); + } + grpc_metadata_batch_destroy(&calld->send_trailing_metadata); +} + // Frees cached send ops that have already been completed after // committing the call. static void free_cached_send_op_data_after_commit( @@ -993,19 +1035,13 @@ static void free_cached_send_op_data_after_commit( channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); if (retry_state->completed_send_initial_metadata) { - grpc_metadata_batch_destroy(&calld->send_initial_metadata); + free_cached_send_initial_metadata(chand, calld); } for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, - "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR - "]", - chand, calld, i); - } - grpc_byte_stream_cache_destroy(calld->send_messages[i]); + free_cached_send_message(chand, calld, i); } if (retry_state->completed_send_trailing_metadata) { - grpc_metadata_batch_destroy(&calld->send_trailing_metadata); + free_cached_send_trailing_metadata(chand, calld); } } @@ -1017,20 +1053,14 @@ static void free_cached_send_op_data_for_completed_batch( channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); if (batch_data->batch.send_initial_metadata) { - grpc_metadata_batch_destroy(&calld->send_initial_metadata); + free_cached_send_initial_metadata(chand, calld); } if (batch_data->batch.send_message) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, - "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR - "]", - chand, calld, retry_state->completed_send_message_count - 1); - } - grpc_byte_stream_cache_destroy( - calld->send_messages[retry_state->completed_send_message_count - 1]); + free_cached_send_message(chand, calld, + retry_state->completed_send_message_count - 1); } if (batch_data->batch.send_trailing_metadata) { - grpc_metadata_batch_destroy(&calld->send_trailing_metadata); + free_cached_send_trailing_metadata(chand, calld); } } @@ -1058,7 +1088,7 @@ static void pending_batches_add(grpc_call_element* elem, call_data* calld = static_cast<call_data*>(elem->call_data); const size_t idx = get_batch_index(batch); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand, calld, idx); } @@ -1079,14 +1109,14 @@ static void pending_batches_add(grpc_call_element* elem, if (batch->send_message) { calld->pending_send_message = true; calld->bytes_buffered_for_retry += - batch->payload->send_message.send_message->length; + batch->payload->send_message.send_message->length(); } if (batch->send_trailing_metadata) { calld->pending_send_trailing_metadata = true; } if (calld->bytes_buffered_for_retry > chand->per_rpc_retry_buffer_size) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded retry buffer size, committing", chand, calld); } @@ -1101,7 +1131,7 @@ static void pending_batches_add(grpc_call_element* elem, // retries are disabled so that we don't bother with retry overhead. if (calld->num_attempts_completed == 0) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: disabling retries before first attempt", chand, calld); } @@ -1148,7 +1178,7 @@ static void pending_batches_fail(grpc_call_element* elem, grpc_error* error, for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { if (calld->pending_batches[i].batch != nullptr) ++num_batches; } - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s", elem->channel_data, calld, num_batches, grpc_error_string(error)); } @@ -1210,7 +1240,7 @@ static void pending_batches_resume(grpc_call_element* elem) { for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { if (calld->pending_batches[i].batch != nullptr) ++num_batches; } - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: starting %" PRIuPTR " pending batches on subchannel_call=%p", chand, calld, num_batches, calld->subchannel_call); @@ -1255,7 +1285,7 @@ static void maybe_clear_pending_batch(grpc_call_element* elem, (!batch->recv_message || batch->payload->recv_message.recv_message_ready == nullptr)) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: clearing pending batch", chand, + gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand, calld); } pending_batch_clear(calld, pending); @@ -1274,7 +1304,8 @@ static bool pending_batch_is_completed( return false; } if (pending->batch->send_message && - retry_state->completed_send_message_count < calld->send_messages.size()) { + retry_state->completed_send_message_count < + calld->send_messages->size()) { return false; } if (pending->batch->send_trailing_metadata && @@ -1309,7 +1340,7 @@ static bool pending_batch_is_unstarted( return true; } if (pending->batch->send_message && - retry_state->started_send_message_count < calld->send_messages.size()) { + retry_state->started_send_message_count < calld->send_messages->size()) { return true; } if (pending->batch->send_trailing_metadata && @@ -1344,7 +1375,7 @@ static void retry_commit(grpc_call_element* elem, if (calld->retry_committed) return; calld->retry_committed = true; if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: committing retries", chand, calld); + gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, calld); } if (retry_state != nullptr) { free_cached_send_op_data_after_commit(elem, retry_state); @@ -1389,7 +1420,7 @@ static void do_retry(grpc_call_element* elem, next_attempt_time = calld->retry_backoff->NextAttemptTime(); } if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: retrying failed call in %" PRIuPTR " ms", chand, calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now()); } @@ -1423,7 +1454,7 @@ static bool maybe_retry(grpc_call_element* elem, batch_data->subchannel_call)); if (retry_state->retry_dispatched) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: retry already dispatched", chand, + gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand, calld); } return true; @@ -1431,16 +1462,18 @@ static bool maybe_retry(grpc_call_element* elem, } // Check status. if (status == GRPC_STATUS_OK) { - grpc_server_retry_throttle_data_record_success(calld->retry_throttle_data); + if (calld->retry_throttle_data != nullptr) { + calld->retry_throttle_data->RecordSuccess(); + } if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: call succeeded", chand, calld); + gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, calld); } return false; } // Status is not OK. Check whether the status is retryable. if (!retry_policy->retryable_status_codes.Contains(status)) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: status %s not configured as retryable", chand, calld, grpc_status_code_to_string(status)); } @@ -1453,17 +1486,17 @@ static bool maybe_retry(grpc_call_element* elem, // things like failures due to malformed requests (INVALID_ARGUMENT). // Conversely, it's important for this to come before the remaining // checks, so that we don't fail to record failures due to other factors. - if (!grpc_server_retry_throttle_data_record_failure( - calld->retry_throttle_data)) { + if (calld->retry_throttle_data != nullptr && + !calld->retry_throttle_data->RecordFailure()) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: retries throttled", chand, calld); + gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, calld); } return false; } // Check whether the call is committed. if (calld->retry_committed) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: retries already committed", chand, + gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand, calld); } return false; @@ -1472,7 +1505,7 @@ static bool maybe_retry(grpc_call_element* elem, ++calld->num_attempts_completed; if (calld->num_attempts_completed >= retry_policy->max_attempts) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: exceeded %d retry attempts", chand, + gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand, calld, retry_policy->max_attempts); } return false; @@ -1480,7 +1513,7 @@ static bool maybe_retry(grpc_call_element* elem, // If the call was cancelled from the surface, don't retry. if (calld->cancel_error != GRPC_ERROR_NONE) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: call cancelled from surface, not retrying", chand, calld); } @@ -1493,16 +1526,15 @@ static bool maybe_retry(grpc_call_element* elem, uint32_t ms; if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: not retrying due to server push-back", chand, calld); } return false; } else { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, - "chand=%p calld=%p: server push-back: retry in %u ms", chand, - calld, ms); + gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms", + chand, calld, ms); } server_pushback_ms = (grpc_millis)ms; } @@ -1575,7 +1607,7 @@ static void invoke_recv_initial_metadata_callback(void* arg, batch->payload->recv_initial_metadata.recv_initial_metadata_ready != nullptr) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: invoking recv_initial_metadata_ready for " "pending batch at index %" PRIuPTR, chand, calld, i); @@ -1611,7 +1643,7 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s", chand, calld, grpc_error_string(error)); } @@ -1626,12 +1658,12 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { if ((batch_data->trailing_metadata_available || error != GRPC_ERROR_NONE) && !retry_state->completed_recv_trailing_metadata) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: deferring recv_initial_metadata_ready " "(Trailers-Only)", chand, calld); } - retry_state->recv_initial_metadata_ready_deferred = true; + retry_state->recv_initial_metadata_ready_deferred_batch = batch_data; retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error); if (!retry_state->started_recv_trailing_metadata) { // recv_trailing_metadata not yet started by application; start it @@ -1668,7 +1700,7 @@ static void invoke_recv_message_callback(void* arg, grpc_error* error) { if (batch != nullptr && batch->recv_message && batch->payload->recv_message.recv_message_ready != nullptr) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: invoking recv_message_ready for " "pending batch at index %" PRIuPTR, chand, calld, i); @@ -1680,7 +1712,7 @@ static void invoke_recv_message_callback(void* arg, grpc_error* error) { GPR_ASSERT(pending != nullptr); // Return payload. *pending->batch->payload->recv_message.recv_message = - batch_data->recv_message; + std::move(batch_data->recv_message); // Update bookkeeping. // Note: Need to do this before invoking the callback, since invoking // the callback will result in yielding the call combiner. @@ -1701,7 +1733,7 @@ static void recv_message_ready(void* arg, grpc_error* error) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: got recv_message_ready, error=%s", + gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s", chand, calld, grpc_error_string(error)); } subchannel_call_retry_state* retry_state = @@ -1715,12 +1747,12 @@ static void recv_message_ready(void* arg, grpc_error* error) { if ((batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) && !retry_state->completed_recv_trailing_metadata) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: deferring recv_message_ready (nullptr " "message and recv_trailing_metadata pending)", chand, calld); } - retry_state->recv_message_ready_deferred = true; + retry_state->recv_message_ready_deferred_batch = batch_data; retry_state->recv_message_error = GRPC_ERROR_REF(error); if (!retry_state->started_recv_trailing_metadata) { // recv_trailing_metadata not yet started by application; start it @@ -1739,6 +1771,59 @@ static void recv_message_ready(void* arg, grpc_error* error) { } // +// list of closures to execute in call combiner +// + +// Represents a closure that needs to run in the call combiner as part of +// starting or completing a batch. +typedef struct { + grpc_closure* closure; + grpc_error* error; + const char* reason; + bool free_reason = false; +} closure_to_execute; + +static void execute_closures_in_call_combiner(grpc_call_element* elem, + const char* caller, + closure_to_execute* closures, + size_t num_closures) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + // Note that the call combiner will be yielded for each closure that + // we schedule. We're already running in the call combiner, so one of + // the closures can be scheduled directly, but the others will + // have to re-enter the call combiner. + if (num_closures > 0) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p calld=%p: %s starting closure: %s", chand, + calld, caller, closures[0].reason); + } + GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error); + if (closures[0].free_reason) { + gpr_free(const_cast<char*>(closures[0].reason)); + } + for (size_t i = 1; i < num_closures; ++i) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: %s starting closure in call combiner: %s", + chand, calld, caller, closures[i].reason); + } + GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure, + closures[i].error, closures[i].reason); + if (closures[i].free_reason) { + gpr_free(const_cast<char*>(closures[i].reason)); + } + } + } else { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p calld=%p: no closures to run for %s", chand, + calld, caller); + } + GRPC_CALL_COMBINER_STOP(calld->call_combiner, "no closures to run"); + } +} + +// // on_complete callback handling // @@ -1766,36 +1851,35 @@ static void update_retry_state_for_completed_batch( } } -// Represents a closure that needs to run as a result of a completed batch. -typedef struct { - grpc_closure* closure; - grpc_error* error; - const char* reason; -} closure_to_execute; - // Adds any necessary closures for deferred recv_initial_metadata and // recv_message callbacks to closures, updating *num_closures as needed. static void add_closures_for_deferred_recv_callbacks( subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state, closure_to_execute* closures, size_t* num_closures) { - if (batch_data->batch.recv_trailing_metadata && - retry_state->recv_initial_metadata_ready_deferred) { - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = - GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready, - invoke_recv_initial_metadata_callback, batch_data, - grpc_schedule_on_exec_ctx); - closure->error = retry_state->recv_initial_metadata_error; - closure->reason = "resuming recv_initial_metadata_ready"; - } - if (batch_data->batch.recv_trailing_metadata && - retry_state->recv_message_ready_deferred) { - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = GRPC_CLOSURE_INIT(&batch_data->recv_message_ready, - invoke_recv_message_callback, - batch_data, grpc_schedule_on_exec_ctx); - closure->error = retry_state->recv_message_error; - closure->reason = "resuming recv_message_ready"; + if (batch_data->batch.recv_trailing_metadata) { + // Add closure for deferred recv_initial_metadata_ready. + if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) { + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = GRPC_CLOSURE_INIT( + &batch_data->recv_initial_metadata_ready, + invoke_recv_initial_metadata_callback, + retry_state->recv_initial_metadata_ready_deferred_batch, + grpc_schedule_on_exec_ctx); + closure->error = retry_state->recv_initial_metadata_error; + closure->reason = "resuming recv_initial_metadata_ready"; + retry_state->recv_initial_metadata_ready_deferred_batch = nullptr; + } + // Add closure for deferred recv_message_ready. + if (retry_state->recv_message_ready_deferred_batch != nullptr) { + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = GRPC_CLOSURE_INIT( + &batch_data->recv_message_ready, invoke_recv_message_callback, + retry_state->recv_message_ready_deferred_batch, + grpc_schedule_on_exec_ctx); + closure->error = retry_state->recv_message_error; + closure->reason = "resuming recv_message_ready"; + retry_state->recv_message_ready_deferred_batch = nullptr; + } } } @@ -1809,7 +1893,7 @@ static void add_closures_for_replay_or_pending_send_ops( channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); bool have_pending_send_message_ops = - retry_state->started_send_message_count < calld->send_messages.size(); + retry_state->started_send_message_count < calld->send_messages->size(); bool have_pending_send_trailing_metadata_op = calld->seen_send_trailing_metadata && !retry_state->started_send_trailing_metadata; @@ -1827,7 +1911,7 @@ static void add_closures_for_replay_or_pending_send_ops( } if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: starting next batch for pending send op(s)", chand, calld); } @@ -1852,7 +1936,7 @@ static void add_closures_for_completed_pending_batches( pending_batch* pending = &calld->pending_batches[i]; if (pending_batch_is_completed(pending, calld, retry_state)) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: pending batch completed at index %" PRIuPTR, chand, calld, i); } @@ -1885,7 +1969,7 @@ static void add_closures_to_fail_unstarted_pending_batches( pending_batch* pending = &calld->pending_batches[i]; if (pending_batch_is_unstarted(pending, calld, retry_state)) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: failing unstarted pending batch at index " "%" PRIuPTR, chand, calld, i); @@ -1929,7 +2013,7 @@ static void on_complete(void* arg, grpc_error* error) { call_data* calld = static_cast<call_data*>(elem->call_data); if (grpc_client_channel_trace.enabled()) { char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch); - gpr_log(GPR_DEBUG, "chand=%p calld=%p: got on_complete, error=%s, batch=%s", + gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s", chand, calld, grpc_error_string(error), batch_str); gpr_free(batch_str); } @@ -1940,11 +2024,13 @@ static void on_complete(void* arg, grpc_error* error) { // If we have previously completed recv_trailing_metadata, then the // call is finished. bool call_finished = retry_state->completed_recv_trailing_metadata; + // Record whether we were already committed before receiving this callback. + const bool previously_committed = calld->retry_committed; // Update bookkeeping in retry_state. update_retry_state_for_completed_batch(batch_data, retry_state); if (call_finished) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: call already finished", chand, + gpr_log(GPR_INFO, "chand=%p calld=%p: call already finished", chand, calld); } } else { @@ -1968,35 +2054,39 @@ static void on_complete(void* arg, grpc_error* error) { if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; } - } else if (retry_state->completed_recv_trailing_metadata) { - call_finished = true; - } - if (call_finished && grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: call finished, status=%s", chand, - calld, grpc_status_code_to_string(status)); } - // If the call is finished, check if we should retry. - if (call_finished && - maybe_retry(elem, batch_data, status, server_pushback_md)) { - // Unref batch_data for deferred recv_initial_metadata_ready or - // recv_message_ready callbacks, if any. - if (batch_data->batch.recv_trailing_metadata && - retry_state->recv_initial_metadata_ready_deferred) { - batch_data_unref(batch_data); - GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error); + // If the call just finished, check if we should retry. + if (call_finished) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand, + calld, grpc_status_code_to_string(status)); } - if (batch_data->batch.recv_trailing_metadata && - retry_state->recv_message_ready_deferred) { + if (maybe_retry(elem, batch_data, status, server_pushback_md)) { + // Unref batch_data for deferred recv_initial_metadata_ready or + // recv_message_ready callbacks, if any. + if (batch_data->batch.recv_trailing_metadata && + retry_state->recv_initial_metadata_ready_deferred_batch != + nullptr) { + batch_data_unref(batch_data); + GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error); + } + if (batch_data->batch.recv_trailing_metadata && + retry_state->recv_message_ready_deferred_batch != nullptr) { + batch_data_unref(batch_data); + GRPC_ERROR_UNREF(retry_state->recv_message_error); + } batch_data_unref(batch_data); - GRPC_ERROR_UNREF(retry_state->recv_message_error); + return; } - batch_data_unref(batch_data); - return; + // Not retrying, so commit the call. + retry_commit(elem, retry_state); } } - // If the call is finished or retries are committed, free cached data for - // send ops that we've just completed. - if (call_finished || calld->retry_committed) { + // If we were already committed before receiving this callback, free + // cached data for send ops that we've just completed. (If the call has + // just now finished, the call to retry_commit() above will have freed all + // cached send ops, so we don't need to do it here.) + if (previously_committed) { free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state); } // Call not being retried. @@ -2031,20 +2121,8 @@ static void on_complete(void* arg, grpc_error* error) { // Don't need batch_data anymore. batch_data_unref(batch_data); // Schedule all of the closures identified above. - // Note that the call combiner will be yielded for each closure that - // we schedule. We're already running in the call combiner, so one of - // the closures can be scheduled directly, but the others will - // have to re-enter the call combiner. - if (num_closures > 0) { - GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error); - for (size_t i = 1; i < num_closures; ++i) { - GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure, - closures[i].error, closures[i].reason); - } - } else { - GRPC_CALL_COMBINER_STOP(calld->call_combiner, - "no closures to run for on_complete"); - } + execute_closures_in_call_combiner(elem, "on_complete", closures, + num_closures); } // @@ -2061,6 +2139,31 @@ static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) { grpc_subchannel_call_process_op(subchannel_call, batch); } +// Adds a closure to closures that will execute batch in the call combiner. +static void add_closure_for_subchannel_batch( + call_data* calld, grpc_transport_stream_op_batch* batch, + closure_to_execute* closures, size_t* num_closures) { + batch->handler_private.extra_arg = calld->subchannel_call; + GRPC_CLOSURE_INIT(&batch->handler_private.closure, + start_batch_in_call_combiner, batch, + grpc_schedule_on_exec_ctx); + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = &batch->handler_private.closure; + closure->error = GRPC_ERROR_NONE; + // If the tracer is enabled, we log a more detailed message, which + // requires dynamic allocation. This will be freed in + // start_retriable_subchannel_batches(). + if (grpc_client_channel_trace.enabled()) { + char* batch_str = grpc_transport_stream_op_batch_string(batch); + gpr_asprintf(const_cast<char**>(&closure->reason), + "starting batch in call combiner: %s", batch_str); + gpr_free(batch_str); + closure->free_reason = true; + } else { + closure->reason = "start_subchannel_batch"; + } +} + // Adds retriable send_initial_metadata op to batch_data. static void add_retriable_send_initial_metadata_op( call_data* calld, subchannel_call_retry_state* retry_state, @@ -2120,17 +2223,17 @@ static void add_retriable_send_message_op( channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]", chand, calld, retry_state->started_send_message_count); } - grpc_byte_stream_cache* cache = - calld->send_messages[retry_state->started_send_message_count]; + grpc_core::ByteStreamCache* cache = + (*calld->send_messages)[retry_state->started_send_message_count]; ++retry_state->started_send_message_count; - grpc_caching_byte_stream_init(&batch_data->send_message, cache); + batch_data->send_message.Init(cache); batch_data->batch.send_message = true; - batch_data->batch.payload->send_message.send_message = - &batch_data->send_message.base; + batch_data->batch.payload->send_message.send_message.reset( + batch_data->send_message.get()); } // Adds retriable send_trailing_metadata op to batch_data. @@ -2207,7 +2310,7 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: call failed but recv_trailing_metadata not " "started; starting it internally", chand, calld); @@ -2216,8 +2319,12 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) { static_cast<subchannel_call_retry_state*>( grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); - subchannel_batch_data* batch_data = batch_data_create(elem, 1); + // Create batch_data with 2 refs, since this batch will be unreffed twice: + // once when the subchannel batch returns, and again when we actually get + // a recv_trailing_metadata op from the surface. + subchannel_batch_data* batch_data = batch_data_create(elem, 2); add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); + retry_state->recv_trailing_metadata_internal_batch = batch_data; // Note: This will release the call combiner. grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch); } @@ -2235,7 +2342,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( !retry_state->started_send_initial_metadata && !calld->pending_send_initial_metadata) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: replaying previously completed " "send_initial_metadata op", chand, calld); @@ -2246,12 +2353,12 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( } // send_message. // Note that we can only have one send_message op in flight at a time. - if (retry_state->started_send_message_count < calld->send_messages.size() && + if (retry_state->started_send_message_count < calld->send_messages->size() && retry_state->started_send_message_count == retry_state->completed_send_message_count && !calld->pending_send_message) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: replaying previously completed " "send_message op", chand, calld); @@ -2266,11 +2373,11 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( // to start, since we can't send down any more send_message ops after // send_trailing_metadata. if (calld->seen_send_trailing_metadata && - retry_state->started_send_message_count == calld->send_messages.size() && + retry_state->started_send_message_count == calld->send_messages->size() && !retry_state->started_send_trailing_metadata && !calld->pending_send_trailing_metadata) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: replaying previously completed " "send_trailing_metadata op", chand, calld); @@ -2288,7 +2395,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( // *num_batches as needed. static void add_subchannel_batches_for_pending_batches( grpc_call_element* elem, subchannel_call_retry_state* retry_state, - grpc_transport_stream_op_batch** batches, size_t* num_batches) { + closure_to_execute* closures, size_t* num_closures) { call_data* calld = static_cast<call_data*>(elem->call_data); for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; @@ -2317,7 +2424,7 @@ static void add_subchannel_batches_for_pending_batches( // send_message ops after send_trailing_metadata. if (batch->send_trailing_metadata && (retry_state->started_send_message_count + batch->send_message < - calld->send_messages.size() || + calld->send_messages->size() || retry_state->started_send_trailing_metadata)) { continue; } @@ -2331,13 +2438,37 @@ static void add_subchannel_batches_for_pending_batches( } if (batch->recv_trailing_metadata && retry_state->started_recv_trailing_metadata) { + // If we previously completed a recv_trailing_metadata op + // initiated by start_internal_recv_trailing_metadata(), use the + // result of that instead of trying to re-start this op. + if (retry_state->recv_trailing_metadata_internal_batch != nullptr) { + // If the batch completed, then trigger the completion callback + // directly, so that we return the previously returned results to + // the application. Otherwise, just unref the internally + // started subchannel batch, since we'll propagate the + // completion when it completes. + if (retry_state->completed_recv_trailing_metadata) { + subchannel_batch_data* batch_data = + retry_state->recv_trailing_metadata_internal_batch; + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = &batch_data->on_complete; + // Batches containing recv_trailing_metadata always succeed. + closure->error = GRPC_ERROR_NONE; + closure->reason = + "re-executing on_complete for recv_trailing_metadata " + "to propagate internally triggered result"; + } else { + batch_data_unref(retry_state->recv_trailing_metadata_internal_batch); + } + retry_state->recv_trailing_metadata_internal_batch = nullptr; + } continue; } // If we're not retrying, just send the batch as-is. if (calld->method_params == nullptr || calld->method_params->retry_policy() == nullptr || calld->retry_committed) { - batches[(*num_batches)++] = batch; + add_closure_for_subchannel_batch(calld, batch, closures, num_closures); pending_batch_clear(calld, pending); continue; } @@ -2374,7 +2505,8 @@ static void add_subchannel_batches_for_pending_batches( GPR_ASSERT(batch->collect_stats); add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); } - batches[(*num_batches)++] = &batch_data->batch; + add_closure_for_subchannel_batch(calld, &batch_data->batch, closures, + num_closures); } } @@ -2385,69 +2517,36 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: constructing retriable batches", + gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches", chand, calld); } subchannel_call_retry_state* retry_state = static_cast<subchannel_call_retry_state*>( grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); + // Construct list of closures to execute, one for each pending batch. // We can start up to 6 batches. - grpc_transport_stream_op_batch* - batches[GPR_ARRAY_SIZE(calld->pending_batches)]; - size_t num_batches = 0; + closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches)]; + size_t num_closures = 0; // Replay previously-returned send_* ops if needed. subchannel_batch_data* replay_batch_data = maybe_create_subchannel_batch_for_replay(elem, retry_state); if (replay_batch_data != nullptr) { - batches[num_batches++] = &replay_batch_data->batch; + add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures, + &num_closures); } // Now add pending batches. - add_subchannel_batches_for_pending_batches(elem, retry_state, batches, - &num_batches); + add_subchannel_batches_for_pending_batches(elem, retry_state, closures, + &num_closures); // Start batches on subchannel call. - // Note that the call combiner will be yielded for each batch that we - // send down. We're already running in the call combiner, so one of - // the batches can be started directly, but the others will have to - // re-enter the call combiner. if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: starting %" PRIuPTR " retriable batches on subchannel_call=%p", - chand, calld, num_batches, calld->subchannel_call); - } - if (num_batches == 0) { - // This should be fairly rare, but it can happen when (e.g.) an - // attempt completes before it has finished replaying all - // previously sent messages. - GRPC_CALL_COMBINER_STOP(calld->call_combiner, - "no retriable subchannel batches to start"); - } else { - for (size_t i = 1; i < num_batches; ++i) { - if (grpc_client_channel_trace.enabled()) { - char* batch_str = grpc_transport_stream_op_batch_string(batches[i]); - gpr_log(GPR_DEBUG, - "chand=%p calld=%p: starting batch in call combiner: %s", chand, - calld, batch_str); - gpr_free(batch_str); - } - batches[i]->handler_private.extra_arg = calld->subchannel_call; - GRPC_CLOSURE_INIT(&batches[i]->handler_private.closure, - start_batch_in_call_combiner, batches[i], - grpc_schedule_on_exec_ctx); - GRPC_CALL_COMBINER_START(calld->call_combiner, - &batches[i]->handler_private.closure, - GRPC_ERROR_NONE, "start_subchannel_batch"); - } - if (grpc_client_channel_trace.enabled()) { - char* batch_str = grpc_transport_stream_op_batch_string(batches[0]); - gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting batch: %s", chand, calld, - batch_str); - gpr_free(batch_str); - } - // Note: This will release the call combiner. - grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]); + chand, calld, num_closures, calld->subchannel_call); } + execute_closures_in_call_combiner(elem, "start_retriable_subchannel_batches", + closures, num_closures); } // @@ -2472,7 +2571,7 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { grpc_error* new_error = calld->pick.connected_subchannel->CreateCall( call_args, &calld->subchannel_call); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s", + gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s", chand, calld, calld->subchannel_call, grpc_error_string(new_error)); } if (new_error != GRPC_ERROR_NONE) { @@ -2513,7 +2612,7 @@ static void pick_done(void* arg, grpc_error* error) { : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to create subchannel", &error, 1); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: failed to create subchannel: error=%s", chand, calld, grpc_error_string(new_error)); } @@ -2557,7 +2656,7 @@ static void pick_callback_cancel_locked(void* arg, grpc_error* error) { // the one we started it on. However, this will just be a no-op. if (error != GRPC_ERROR_NONE && chand->lb_policy != nullptr) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p", + gpr_log(GPR_INFO, "chand=%p calld=%p: cancelling pick from LB policy %p", chand, calld, chand->lb_policy.get()); } chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error)); @@ -2572,8 +2671,8 @@ static void pick_callback_done_locked(void* arg, grpc_error* error) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously", - chand, calld); + gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously", chand, + calld); } async_pick_done_locked(elem, GRPC_ERROR_REF(error)); GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); @@ -2585,12 +2684,11 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call", + gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call", chand, calld); } if (chand->retry_throttle_data != nullptr) { - calld->retry_throttle_data = - grpc_server_retry_throttle_data_ref(chand->retry_throttle_data); + calld->retry_throttle_data = chand->retry_throttle_data->Ref(); } if (chand->method_params_table != nullptr) { calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup( @@ -2624,8 +2722,8 @@ static bool pick_callback_start_locked(grpc_call_element* elem) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p", - chand, calld, chand->lb_policy.get()); + gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p", chand, + calld, chand->lb_policy.get()); } // Only get service config data on the first attempt. if (calld->num_attempts_completed == 0) { @@ -2672,7 +2770,7 @@ static bool pick_callback_start_locked(grpc_call_element* elem) { if (pick_done) { // Pick completed synchronously. if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously", + gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously", chand, calld); } GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); @@ -2716,7 +2814,7 @@ static void pick_after_resolver_result_cancel_locked(void* arg, channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: cancelling pick waiting for resolver result", chand, calld); } @@ -2736,7 +2834,7 @@ static void pick_after_resolver_result_done_locked(void* arg, if (args->finished) { /* cancelled, do nothing */ if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "call cancelled before resolver result"); + gpr_log(GPR_INFO, "call cancelled before resolver result"); } gpr_free(args); return; @@ -2747,13 +2845,51 @@ static void pick_after_resolver_result_done_locked(void* arg, call_data* calld = static_cast<call_data*>(elem->call_data); if (error != GRPC_ERROR_NONE) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data", + gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data", chand, calld); } async_pick_done_locked(elem, GRPC_ERROR_REF(error)); - } else if (chand->lb_policy != nullptr) { + } else if (chand->resolver == nullptr) { + // Shutting down. + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand, + calld); + } + async_pick_done_locked( + elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); + } else if (chand->lb_policy == nullptr) { + // Transient resolver failure. + // If call has wait_for_ready=true, try again; otherwise, fail. + uint32_t send_initial_metadata_flags = + calld->seen_send_initial_metadata + ? calld->send_initial_metadata_flags + : calld->pending_batches[0] + .batch->payload->send_initial_metadata + .send_initial_metadata_flags; + if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: resolver returned but no LB policy; " + "wait_for_ready=true; trying again", + chand, calld); + } + pick_after_resolver_result_start_locked(elem); + } else { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: resolver returned but no LB policy; " + "wait_for_ready=false; failing", + chand, calld); + } + async_pick_done_locked( + elem, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + } + } else { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick", + gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing pick", chand, calld); } if (pick_callback_start_locked(elem)) { @@ -2765,37 +2901,13 @@ static void pick_after_resolver_result_done_locked(void* arg, async_pick_done_locked(elem, GRPC_ERROR_NONE); } } - // TODO(roth): It should be impossible for chand->lb_policy to be nullptr - // here, so the rest of this code should never actually be executed. - // However, we have reports of a crash on iOS that triggers this case, - // so we are temporarily adding this to restore branches that were - // removed in https://github.com/grpc/grpc/pull/12297. Need to figure - // out what is actually causing this to occur and then figure out the - // right way to deal with it. - else if (chand->resolver != nullptr) { - // No LB policy, so try again. - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, - "chand=%p calld=%p: resolver returned but no LB policy, " - "trying again", - chand, calld); - } - pick_after_resolver_result_start_locked(elem); - } else { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver disconnected", chand, - calld); - } - async_pick_done_locked( - elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); - } } static void pick_after_resolver_result_start_locked(grpc_call_element* elem) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: deferring pick pending resolver result", chand, calld); } @@ -2862,7 +2974,7 @@ static void cc_start_transport_stream_op_batch( // If we've previously been cancelled, immediately fail any new batches. if (calld->cancel_error != GRPC_ERROR_NONE) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s", + gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s", chand, calld, grpc_error_string(calld->cancel_error)); } // Note: This will release the call combiner. @@ -2881,7 +2993,7 @@ static void cc_start_transport_stream_op_batch( calld->cancel_error = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand, + gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand, calld, grpc_error_string(calld->cancel_error)); } // If we do not have a subchannel call (i.e., a pick has not yet @@ -2907,7 +3019,7 @@ static void cc_start_transport_stream_op_batch( // streaming calls). if (calld->subchannel_call != nullptr) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on subchannel_call=%p", chand, calld, calld->subchannel_call); } @@ -2919,7 +3031,7 @@ static void cc_start_transport_stream_op_batch( // combiner to start a pick. if (batch->send_initial_metadata) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering client_channel combiner", + gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner", chand, calld); } GRPC_CLOSURE_SCHED( @@ -2929,7 +3041,7 @@ static void cc_start_transport_stream_op_batch( } else { // For all other batches, release the call combiner. if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "chand=%p calld=%p: saved batch, yeilding call combiner", chand, calld); } @@ -2955,6 +3067,7 @@ static grpc_error* cc_init_call_elem(grpc_call_element* elem, calld->deadline); } calld->enable_retries = chand->enable_retries; + calld->send_messages.Init(); return GRPC_ERROR_NONE; } @@ -2968,6 +3081,7 @@ static void cc_destroy_call_elem(grpc_call_element* elem, grpc_deadline_state_destroy(elem); } grpc_slice_unref_internal(calld->path); + calld->retry_throttle_data.reset(); calld->method_params.reset(); GRPC_ERROR_UNREF(calld->cancel_error); if (calld->subchannel_call != nullptr) { @@ -2989,6 +3103,7 @@ static void cc_destroy_call_elem(grpc_call_element* elem, calld->pick.subchannel_call_context[i].value); } } + calld->send_messages.Destroy(); GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); } @@ -3137,6 +3252,8 @@ static void watch_connectivity_state_locked(void* arg, external_connectivity_watcher* found = nullptr; if (w->state != nullptr) { external_connectivity_watcher_list_append(w->chand, w); + // An assumption is being made that the closure is scheduled on the exec ctx + // scheduler and that GRPC_CLOSURE_RUN would run the closure immediately. GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE); GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w, grpc_combiner_scheduler(w->chand->combiner)); diff --git a/src/core/ext/filters/client_channel/client_channel_plugin.cc b/src/core/ext/filters/client_channel/client_channel_plugin.cc index 3c3a97532f..8385852d1b 100644 --- a/src/core/ext/filters/client_channel/client_channel_plugin.cc +++ b/src/core/ext/filters/client_channel/client_channel_plugin.cc @@ -39,38 +39,13 @@ static bool append_filter(grpc_channel_stack_builder* builder, void* arg) { builder, static_cast<const grpc_channel_filter*>(arg), nullptr, nullptr); } -static bool set_default_host_if_unset(grpc_channel_stack_builder* builder, - void* unused) { - const grpc_channel_args* args = - grpc_channel_stack_builder_get_channel_arguments(builder); - for (size_t i = 0; i < args->num_args; i++) { - if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY) || - 0 == strcmp(args->args[i].key, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)) { - return true; - } - } - grpc_core::UniquePtr<char> default_authority = - grpc_core::ResolverRegistry::GetDefaultAuthority( - grpc_channel_stack_builder_get_target(builder)); - if (default_authority.get() != nullptr) { - grpc_arg arg = grpc_channel_arg_string_create( - (char*)GRPC_ARG_DEFAULT_AUTHORITY, default_authority.get()); - grpc_channel_args* new_args = grpc_channel_args_copy_and_add(args, &arg, 1); - grpc_channel_stack_builder_set_channel_arguments(builder, new_args); - grpc_channel_args_destroy(new_args); - } - return true; -} - void grpc_client_channel_init(void) { grpc_core::LoadBalancingPolicyRegistry::Builder::InitRegistry(); grpc_core::ResolverRegistry::Builder::InitRegistry(); - grpc_retry_throttle_map_init(); + grpc_core::internal::ServerRetryThrottleMap::Init(); grpc_proxy_mapper_registry_init(); grpc_register_http_proxy_mapper(); grpc_subchannel_index_init(); - grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MIN, - set_default_host_if_unset, nullptr); grpc_channel_init_register_stage( GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, append_filter, (void*)&grpc_client_channel_filter); @@ -81,7 +56,7 @@ void grpc_client_channel_shutdown(void) { grpc_subchannel_index_shutdown(); grpc_channel_init_shutdown(); grpc_proxy_mapper_registry_shutdown(); - grpc_retry_throttle_map_shutdown(); + grpc_core::internal::ServerRetryThrottleMap::Shutdown(); grpc_core::ResolverRegistry::Builder::ShutdownRegistry(); grpc_core::LoadBalancingPolicyRegistry::Builder::ShutdownRegistry(); } diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc index fb29fa788d..4e8b8b71db 100644 --- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc +++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -326,7 +326,7 @@ static void http_connect_handshaker_do_handshake( static const grpc_handshaker_vtable http_connect_handshaker_vtable = { http_connect_handshaker_destroy, http_connect_handshaker_shutdown, - http_connect_handshaker_do_handshake}; + http_connect_handshaker_do_handshake, "http_connect"}; static grpc_handshaker* grpc_http_connect_handshaker_create() { http_connect_handshaker* handshaker = diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index fa63dd75b5..e065f45639 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -44,13 +44,13 @@ void LoadBalancingPolicy::TryReresolutionLocked( GRPC_CLOSURE_SCHED(request_reresolution_, error); request_reresolution_ = nullptr; if (grpc_lb_trace->enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "%s %p: scheduling re-resolution closure with error=%s.", grpc_lb_trace->name(), this, grpc_error_string(error)); } } else { if (grpc_lb_trace->enabled()) { - gpr_log(GPR_DEBUG, "%s %p: no available re-resolution closure.", + gpr_log(GPR_INFO, "%s %p: no available re-resolution closure.", grpc_lb_trace->name(), this); } } diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index c3e43e5ef6..454e00a690 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -162,6 +162,10 @@ class LoadBalancingPolicy GRPC_ABSTRACT_BASE_CLASS protected: + // So Delete() can access our protected dtor. + template <typename T> + friend void Delete(T*); + explicit LoadBalancingPolicy(const Args& args); virtual ~LoadBalancingPolicy(); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index cb39e4224e..70a91b2567 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -61,6 +61,7 @@ #include <grpc/support/port_platform.h> #include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/socket_utils.h" #include <inttypes.h> #include <limits.h> @@ -75,6 +76,7 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/client_channel_factory.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h" @@ -188,6 +190,10 @@ class GrpcLb : public LoadBalancingPolicy { bool seen_initial_response() const { return seen_initial_response_; } private: + // So Delete() can access our private dtor. + template <typename T> + friend void grpc_core::Delete(T*); + ~BalancerCallState(); GrpcLb* grpclb_policy() const { @@ -417,20 +423,20 @@ void ParseServer(const grpc_grpclb_server* server, grpc_resolved_address* addr) { memset(addr, 0, sizeof(*addr)); if (server->drop) return; - const uint16_t netorder_port = htons((uint16_t)server->port); + const uint16_t netorder_port = grpc_htons((uint16_t)server->port); /* the addresses are given in binary format (a in(6)_addr struct) in * server->ip_address.bytes. */ const grpc_grpclb_ip_address* ip = &server->ip_address; if (ip->size == 4) { - addr->len = sizeof(struct sockaddr_in); - struct sockaddr_in* addr4 = (struct sockaddr_in*)&addr->addr; - addr4->sin_family = AF_INET; + addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in)); + grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr); + addr4->sin_family = GRPC_AF_INET; memcpy(&addr4->sin_addr, ip->bytes, ip->size); addr4->sin_port = netorder_port; } else if (ip->size == 16) { - addr->len = sizeof(struct sockaddr_in6); - struct sockaddr_in6* addr6 = (struct sockaddr_in6*)&addr->addr; - addr6->sin6_family = AF_INET6; + addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6)); + grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr; + addr6->sin6_family = GRPC_AF_INET6; memcpy(&addr6->sin6_addr, ip->bytes, ip->size); addr6->sin6_port = netorder_port; } @@ -504,9 +510,7 @@ GrpcLb::BalancerCallState::BalancerCallState( // the polling entities from client_channel. GPR_ASSERT(grpclb_policy()->server_name_ != nullptr); GPR_ASSERT(grpclb_policy()->server_name_[0] != '\0'); - grpc_slice host = - grpc_slice_from_copied_string(grpclb_policy()->server_name_); - grpc_millis deadline = + const grpc_millis deadline = grpclb_policy()->lb_call_timeout_ms_ == 0 ? GRPC_MILLIS_INF_FUTURE : ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_ms_; @@ -514,8 +518,7 @@ GrpcLb::BalancerCallState::BalancerCallState( grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, grpclb_policy_->interested_parties(), GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD, - &host, deadline, nullptr); - grpc_slice_unref_internal(host); + nullptr, deadline, nullptr); // Init the LB call request payload. grpc_grpclb_request* request = grpc_grpclb_request_create(grpclb_policy()->server_name_); @@ -982,6 +985,14 @@ grpc_channel_args* BuildBalancerChannelArgs( // with the one from the grpclb policy, used to propagate updates to // the LB channel. GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, + // The LB channel should use the authority indicated by the target + // authority table (see \a grpc_lb_policy_grpclb_modify_lb_channel_args), + // as opposed to the authority from the parent channel. + GRPC_ARG_DEFAULT_AUTHORITY, + // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be + // treated as a stand-alone channel and not inherit this argument from the + // args of the parent channel. + GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, }; // Channel args to add. const grpc_arg args_to_add[] = { @@ -993,6 +1004,9 @@ grpc_channel_args* BuildBalancerChannelArgs( // address updates into the LB channel. grpc_core::FakeResolverResponseGenerator::MakeChannelArg( response_generator), + // A channel arg indicating the target is a grpclb load balancer. + grpc_channel_arg_integer_create( + const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1), }; // Construct channel args. grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( @@ -1237,7 +1251,7 @@ bool GrpcLb::PickLocked(PickState* pick) { } } else { // rr_policy_ == NULL if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "[grpclb %p] No RR policy. Adding to grpclb's pending picks", this); } @@ -1403,14 +1417,13 @@ void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { void GrpcLb::StartBalancerCallRetryTimerLocked() { grpc_millis next_try = lb_call_backoff_.NextAttemptTime(); if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", this); + gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this); grpc_millis timeout = next_try - ExecCtx::Get()->Now(); if (timeout > 0) { - gpr_log(GPR_DEBUG, - "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.", this, - timeout); + gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.", + this, timeout); } else { - gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.", + gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active immediately.", this); } } @@ -1689,9 +1702,11 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) { grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { grpc_lb_addresses* addresses; + bool is_backend_from_grpclb_load_balancer = false; if (serverlist_ != nullptr) { GPR_ASSERT(serverlist_->num_servers > 0); addresses = ProcessServerlist(serverlist_); + is_backend_from_grpclb_load_balancer = true; } else { // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't // received any serverlist from the balancer, we use the fallback backends @@ -1705,9 +1720,18 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { // Replace the LB addresses in the channel args that we pass down to // the subchannel. static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; - const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses); + const grpc_arg args_to_add[] = { + grpc_lb_addresses_create_channel_arg(addresses), + // A channel arg indicating if the target is a backend inferred from a + // grpclb load balancer. + grpc_channel_arg_integer_create( + const_cast<char*>( + GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER), + is_backend_from_grpclb_load_balancer), + }; grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove( - args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg, 1); + args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add, + GPR_ARRAY_SIZE(args_to_add)); grpc_lb_addresses_destroy(addresses); return args; } @@ -1718,7 +1742,7 @@ void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() { GPR_ASSERT(args != nullptr); if (rr_policy_ != nullptr) { if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_DEBUG, "[grpclb %p] Updating RR policy %p", this, + gpr_log(GPR_INFO, "[grpclb %p] Updating RR policy %p", this, rr_policy_.get()); } rr_policy_->UpdateLocked(*args); @@ -1729,7 +1753,7 @@ void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() { lb_policy_args.args = args; CreateRoundRobinPolicyLocked(lb_policy_args); if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_DEBUG, "[grpclb %p] Created new RR policy %p", this, + gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this, rr_policy_.get()); } } @@ -1745,7 +1769,7 @@ void GrpcLb::OnRoundRobinRequestReresolutionLocked(void* arg, } if (grpc_lb_glb_trace.enabled()) { gpr_log( - GPR_DEBUG, + GPR_INFO, "[grpclb %p] Re-resolution requested from the internal RR policy (%p).", grpclb_policy, grpclb_policy->rr_policy_.get()); } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h new file mode 100644 index 0000000000..4d39c4d504 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h @@ -0,0 +1,36 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_H + +#include <grpc/support/port_platform.h> + +/** Channel arg indicating if a target corresponding to the address is grpclb + * loadbalancer. The type of this arg is an integer and the value is treated as + * a bool. */ +#define GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER \ + "grpc.address_is_grpclb_load_balancer" +/** Channel arg indicating if a target corresponding to the address is a backend + * received from a balancer. The type of this arg is an integer and the value is + * treated as a bool. */ +#define GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER \ + "grpc.address_is_backend_from_grpclb_load_balancer" + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_H \ + */ diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 9090c34412..76df976698 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -62,31 +62,65 @@ class PickFirst : public LoadBalancingPolicy { private: ~PickFirst(); + class PickFirstSubchannelList; + + class PickFirstSubchannelData + : public SubchannelData<PickFirstSubchannelList, + PickFirstSubchannelData> { + public: + PickFirstSubchannelData(PickFirstSubchannelList* subchannel_list, + const grpc_lb_user_data_vtable* user_data_vtable, + const grpc_lb_address& address, + grpc_subchannel* subchannel, + grpc_combiner* combiner) + : SubchannelData(subchannel_list, user_data_vtable, address, subchannel, + combiner) {} + + void ProcessConnectivityChangeLocked( + grpc_connectivity_state connectivity_state, grpc_error* error) override; + }; + + class PickFirstSubchannelList + : public SubchannelList<PickFirstSubchannelList, + PickFirstSubchannelData> { + public: + PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer, + const grpc_lb_addresses* addresses, + grpc_combiner* combiner, + grpc_client_channel_factory* client_channel_factory, + const grpc_channel_args& args) + : SubchannelList(policy, tracer, addresses, combiner, + client_channel_factory, args) { + // Need to maintain a ref to the LB policy as long as we maintain + // any references to subchannels, since the subchannels' + // pollset_sets will include the LB policy's pollset_set. + policy->Ref(DEBUG_LOCATION, "subchannel_list").release(); + } + + ~PickFirstSubchannelList() { + PickFirst* p = static_cast<PickFirst*>(policy()); + p->Unref(DEBUG_LOCATION, "subchannel_list"); + } + }; + void ShutdownLocked() override; void StartPickingLocked(); void DestroyUnselectedSubchannelsLocked(); - static void OnConnectivityChangedLocked(void* arg, grpc_error* error); - - void SubchannelListRefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason); - void SubchannelListUnrefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason); - - /** all our subchannels */ - grpc_lb_subchannel_list* subchannel_list_ = nullptr; - /** latest pending subchannel list */ - grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr; - /** selected subchannel in \a subchannel_list */ - grpc_lb_subchannel_data* selected_ = nullptr; - /** have we started picking? */ + // All our subchannels. + OrphanablePtr<PickFirstSubchannelList> subchannel_list_; + // Latest pending subchannel list. + OrphanablePtr<PickFirstSubchannelList> latest_pending_subchannel_list_; + // Selected subchannel in \a subchannel_list_. + PickFirstSubchannelData* selected_ = nullptr; + // Have we started picking? bool started_picking_ = false; - /** are we shut down? */ + // Are we shut down? bool shutdown_ = false; - /** list of picks that are waiting on connectivity */ + // List of picks that are waiting on connectivity. PickState* pending_picks_ = nullptr; - /** our connectivity state tracker */ + // Our connectivity state tracker. grpc_connectivity_state_tracker state_tracker_; }; @@ -95,7 +129,7 @@ PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) { grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "pick_first"); if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_DEBUG, "Pick First %p created.", this); + gpr_log(GPR_INFO, "Pick First %p created.", this); } UpdateLocked(*args.args); grpc_subchannel_index_ref(); @@ -103,7 +137,7 @@ PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) { PickFirst::~PickFirst() { if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_DEBUG, "Destroying Pick First %p", this); + gpr_log(GPR_INFO, "Destroying Pick First %p", this); } GPR_ASSERT(subchannel_list_ == nullptr); GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); @@ -126,7 +160,7 @@ void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { void PickFirst::ShutdownLocked() { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_DEBUG, "Pick First %p Shutting down", this); + gpr_log(GPR_INFO, "Pick First %p Shutting down", this); } shutdown_ = true; PickState* pick; @@ -137,15 +171,8 @@ void PickFirst::ShutdownLocked() { } grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "shutdown"); - if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, "pf_shutdown"); - subchannel_list_ = nullptr; - } - if (latest_pending_subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(latest_pending_subchannel_list_, - "pf_shutdown"); - latest_pending_subchannel_list_ = nullptr; - } + subchannel_list_.reset(); + latest_pending_subchannel_list_.reset(); TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); } @@ -192,14 +219,10 @@ void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, void PickFirst::StartPickingLocked() { started_picking_ = true; - if (subchannel_list_ != nullptr && subchannel_list_->num_subchannels > 0) { - subchannel_list_->checking_subchannel = 0; - for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) { - if (subchannel_list_->subchannels[i].subchannel != nullptr) { - SubchannelListRefForConnectivityWatch( - subchannel_list_, "connectivity_watch+start_picking"); - grpc_lb_subchannel_data_start_connectivity_watch( - &subchannel_list_->subchannels[i]); + if (subchannel_list_ != nullptr) { + for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) { + if (subchannel_list_->subchannel(i)->subchannel() != nullptr) { + subchannel_list_->subchannel(i)->StartConnectivityWatchLocked(); break; } } @@ -215,7 +238,7 @@ void PickFirst::ExitIdleLocked() { bool PickFirst::PickLocked(PickState* pick) { // If we have a selected subchannel already, return synchronously. if (selected_ != nullptr) { - pick->connected_subchannel = selected_->connected_subchannel; + pick->connected_subchannel = selected_->connected_subchannel()->Ref(); return true; } // No subchannel selected yet, so handle asynchronously. @@ -228,11 +251,10 @@ bool PickFirst::PickLocked(PickState* pick) { } void PickFirst::DestroyUnselectedSubchannelsLocked() { - for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) { - grpc_lb_subchannel_data* sd = &subchannel_list_->subchannels[i]; + for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) { + PickFirstSubchannelData* sd = subchannel_list_->subchannel(i); if (selected_ != sd) { - grpc_lb_subchannel_data_unref_subchannel(sd, - "selected_different_subchannel"); + sd->UnrefSubchannelLocked("selected_different_subchannel"); } } } @@ -249,7 +271,7 @@ void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current, void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { if (selected_ != nullptr) { - selected_->connected_subchannel->Ping(on_initiate, on_ack); + selected_->connected_subchannel()->Ping(on_initiate, on_ack); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); @@ -258,24 +280,6 @@ void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { } } -void PickFirst::SubchannelListRefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason) { - // TODO(roth): We currently track this ref manually. Once the new - // ClosureRef API is ready and the subchannel_list code has been - // converted to a C++ API, find a way to hold the RefCountedPtr<> - // somewhere (maybe in the subchannel_data object) instead of doing - // this manually. - auto self = Ref(DEBUG_LOCATION, reason); - self.release(); - grpc_lb_subchannel_list_ref(subchannel_list, reason); -} - -void PickFirst::SubchannelListUnrefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason) { - Unref(DEBUG_LOCATION, reason); - grpc_lb_subchannel_list_unref(subchannel_list, reason); -} - void PickFirst::UpdateLocked(const grpc_channel_args& args) { const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { @@ -295,75 +299,67 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { return; } const grpc_lb_addresses* addresses = - (const grpc_lb_addresses*)arg->value.pointer.p; + static_cast<const grpc_lb_addresses*>(arg->value.pointer.p); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p received update with %" PRIuPTR " addresses", this, addresses->num_addresses); } - grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create( + auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>( this, &grpc_lb_pick_first_trace, addresses, combiner(), - client_channel_factory(), args, &PickFirst::OnConnectivityChangedLocked); - if (subchannel_list->num_subchannels == 0) { + client_channel_factory(), args); + if (subchannel_list->num_subchannels() == 0) { // Empty update or no valid subchannels. Unsubscribe from all current // subchannels and put the channel in TRANSIENT_FAILURE. grpc_connectivity_state_set( &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "pf_update_empty"); - if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, - "sl_shutdown_empty_update"); - } - subchannel_list_ = subchannel_list; // Empty list. + subchannel_list_ = std::move(subchannel_list); // Empty list. selected_ = nullptr; return; } if (selected_ == nullptr) { // We don't yet have a selected subchannel, so replace the current // subchannel list immediately. - if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, - "pf_update_before_selected"); + subchannel_list_ = std::move(subchannel_list); + // If we've started picking, start trying to connect to the first + // subchannel in the new list. + if (started_picking_) { + subchannel_list_->subchannel(0)->StartConnectivityWatchLocked(); } - subchannel_list_ = subchannel_list; } else { // We do have a selected subchannel. // Check if it's present in the new list. If so, we're done. - for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { - grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i]; - if (sd->subchannel == selected_->subchannel) { + for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { + PickFirstSubchannelData* sd = subchannel_list->subchannel(i); + if (sd->subchannel() == selected_->subchannel()) { // The currently selected subchannel is in the update: we are done. if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p found already selected subchannel %p " "at update index %" PRIuPTR " of %" PRIuPTR "; update done", - this, selected_->subchannel, i, - subchannel_list->num_subchannels); - } - if (selected_->connected_subchannel != nullptr) { - sd->connected_subchannel = selected_->connected_subchannel; + this, selected_->subchannel(), i, + subchannel_list->num_subchannels()); } - selected_ = sd; - if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref( - subchannel_list_, "pf_update_includes_selected"); + // Make sure it's in state READY. It might not be if we grabbed + // the combiner while a connectivity state notification + // informing us otherwise is pending. + // Note that CheckConnectivityStateLocked() also takes a ref to + // the connected subchannel. + grpc_error* error = GRPC_ERROR_NONE; + if (sd->CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) { + selected_ = sd; + subchannel_list_ = std::move(subchannel_list); + DestroyUnselectedSubchannelsLocked(); + sd->StartConnectivityWatchLocked(); + // If there was a previously pending update (which may or may + // not have contained the currently selected subchannel), drop + // it, so that it doesn't override what we've done here. + latest_pending_subchannel_list_.reset(); + return; } - subchannel_list_ = subchannel_list; - DestroyUnselectedSubchannelsLocked(); - SubchannelListRefForConnectivityWatch( - subchannel_list, "connectivity_watch+replace_selected"); - grpc_lb_subchannel_data_start_connectivity_watch(sd); - // If there was a previously pending update (which may or may - // not have contained the currently selected subchannel), drop - // it, so that it doesn't override what we've done here. - if (latest_pending_subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref( - latest_pending_subchannel_list_, - "pf_update_includes_selected+outdated"); - latest_pending_subchannel_list_ = nullptr; - } - return; + GRPC_ERROR_UNREF(error); } } // Not keeping the previous selected subchannel, so set the latest @@ -372,88 +368,66 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) { // subchannel list. if (latest_pending_subchannel_list_ != nullptr) { if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "Pick First %p Shutting down latest pending subchannel list " "%p, about to be replaced by newer latest %p", - this, latest_pending_subchannel_list_, subchannel_list); + this, latest_pending_subchannel_list_.get(), + subchannel_list.get()); } - grpc_lb_subchannel_list_shutdown_and_unref( - latest_pending_subchannel_list_, "sl_outdated_dont_smash"); } - latest_pending_subchannel_list_ = subchannel_list; - } - // If we've started picking, start trying to connect to the first - // subchannel in the new list. - if (started_picking_) { - SubchannelListRefForConnectivityWatch(subchannel_list, - "connectivity_watch+update"); - grpc_lb_subchannel_data_start_connectivity_watch( - &subchannel_list->subchannels[0]); + latest_pending_subchannel_list_ = std::move(subchannel_list); + // If we've started picking, start trying to connect to the first + // subchannel in the new list. + if (started_picking_) { + latest_pending_subchannel_list_->subchannel(0) + ->StartConnectivityWatchLocked(); + } } } -void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) { - grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg); - PickFirst* p = static_cast<PickFirst*>(sd->subchannel_list->policy); - if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_DEBUG, - "Pick First %p connectivity changed for subchannel %p (%" PRIuPTR - " of %" PRIuPTR - "), subchannel_list %p: state=%s p->shutdown_=%d " - "sd->subchannel_list->shutting_down=%d error=%s", - p, sd->subchannel, sd->subchannel_list->checking_subchannel, - sd->subchannel_list->num_subchannels, sd->subchannel_list, - grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), - p->shutdown_, sd->subchannel_list->shutting_down, - grpc_error_string(error)); - } - // If the policy is shutting down, unref and return. - if (p->shutdown_) { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "pf_shutdown"); - p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, - "pf_shutdown"); - return; - } - // If the subchannel list is shutting down, stop watching. - if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "pf_sl_shutdown"); - p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, - "pf_sl_shutdown"); - return; - } - // If we're still here, the notification must be for a subchannel in - // either the current or latest pending subchannel lists. - GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ || - sd->subchannel_list == p->latest_pending_subchannel_list_); - // Update state. - sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; +void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( + grpc_connectivity_state connectivity_state, grpc_error* error) { + PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy()); + // The notification must be for a subchannel in either the current or + // latest pending subchannel lists. + GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() || + subchannel_list() == p->latest_pending_subchannel_list_.get()); // Handle updates for the currently selected subchannel. - if (p->selected_ == sd) { + if (p->selected_ == this) { + if (grpc_lb_pick_first_trace.enabled()) { + gpr_log(GPR_INFO, + "Pick First %p connectivity changed for selected subchannel", p); + } // If the new state is anything other than READY and there is a // pending update, switch to the pending update. - if (sd->curr_connectivity_state != GRPC_CHANNEL_READY && + if (connectivity_state != GRPC_CHANNEL_READY && p->latest_pending_subchannel_list_ != nullptr) { + if (grpc_lb_pick_first_trace.enabled()) { + gpr_log(GPR_INFO, + "Pick First %p promoting pending subchannel list %p to " + "replace %p", + p, p->latest_pending_subchannel_list_.get(), + p->subchannel_list_.get()); + } p->selected_ = nullptr; - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - p->SubchannelListUnrefForConnectivityWatch( - sd->subchannel_list, "selected_not_ready+switch_to_update"); - grpc_lb_subchannel_list_shutdown_and_unref( - p->subchannel_list_, "selected_not_ready+switch_to_update"); - p->subchannel_list_ = p->latest_pending_subchannel_list_; - p->latest_pending_subchannel_list_ = nullptr; + StopConnectivityWatchLocked(); + p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); grpc_connectivity_state_set( &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update"); + error != GRPC_ERROR_NONE + ? GRPC_ERROR_REF(error) + : GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "selected subchannel not ready; switching to pending " + "update"), + "selected_not_ready+switch_to_update"); } else { // TODO(juanlishen): we re-resolve when the selected subchannel goes to // TRANSIENT_FAILURE because we used to shut down in this case before // re-resolution is introduced. But we need to investigate whether we // really want to take any action instead of waiting for the selected // subchannel reconnecting. - GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); - if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN); + if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { // If the selected channel goes bad, request a re-resolution. grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, @@ -462,19 +436,16 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) { p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE); // In transient failure. Rely on re-resolution to recover. p->selected_ = nullptr; - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, - "pf_selected_shutdown"); - grpc_lb_subchannel_data_unref_subchannel( - sd, "pf_selected_shutdown"); // Unrefs connected subchannel + UnrefSubchannelLocked("pf_selected_shutdown"); + StopConnectivityWatchLocked(); } else { - grpc_connectivity_state_set(&p->state_tracker_, - sd->curr_connectivity_state, + grpc_connectivity_state_set(&p->state_tracker_, connectivity_state, GRPC_ERROR_REF(error), "selected_changed"); // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); + RenewConnectivityWatchLocked(); } } + GRPC_ERROR_UNREF(error); return; } // If we get here, there are two possible cases: @@ -486,26 +457,27 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) { // for a subchannel in p->latest_pending_subchannel_list_. The // goal here is to find a subchannel from the update that we can // select in place of the current one. - switch (sd->curr_connectivity_state) { + switch (connectivity_state) { case GRPC_CHANNEL_READY: { // Case 2. Promote p->latest_pending_subchannel_list_ to // p->subchannel_list_. - sd->connected_subchannel = - grpc_subchannel_get_connected_subchannel(sd->subchannel); - if (sd->subchannel_list == p->latest_pending_subchannel_list_) { - GPR_ASSERT(p->subchannel_list_ != nullptr); - grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_, - "finish_update"); - p->subchannel_list_ = p->latest_pending_subchannel_list_; - p->latest_pending_subchannel_list_ = nullptr; + if (subchannel_list() == p->latest_pending_subchannel_list_.get()) { + if (grpc_lb_pick_first_trace.enabled()) { + gpr_log(GPR_INFO, + "Pick First %p promoting pending subchannel list %p to " + "replace %p", + p, p->latest_pending_subchannel_list_.get(), + p->subchannel_list_.get()); + } + p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); } // Cases 1 and 2. grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "connecting_ready"); - p->selected_ = sd; + p->selected_ = this; if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, - sd->subchannel); + subchannel()); } // Drop all other subchannels, since we are now connected. p->DestroyUnselectedSubchannelsLocked(); @@ -513,7 +485,8 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) { PickState* pick; while ((pick = p->pending_picks_)) { p->pending_picks_ = pick->next; - pick->connected_subchannel = p->selected_->connected_subchannel; + pick->connected_subchannel = + p->selected_->connected_subchannel()->Ref(); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p", @@ -522,45 +495,43 @@ void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) { GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); + RenewConnectivityWatchLocked(); break; } case GRPC_CHANNEL_TRANSIENT_FAILURE: { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); + StopConnectivityWatchLocked(); + PickFirstSubchannelData* sd = this; do { - sd->subchannel_list->checking_subchannel = - (sd->subchannel_list->checking_subchannel + 1) % - sd->subchannel_list->num_subchannels; - sd = &sd->subchannel_list - ->subchannels[sd->subchannel_list->checking_subchannel]; - } while (sd->subchannel == nullptr); + size_t next_index = + (sd->Index() + 1) % subchannel_list()->num_subchannels(); + sd = subchannel_list()->subchannel(next_index); + } while (sd->subchannel() == nullptr); // Case 1: Only set state to TRANSIENT_FAILURE if we've tried // all subchannels. - if (sd->subchannel_list->checking_subchannel == 0 && - sd->subchannel_list == p->subchannel_list_) { + if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) { grpc_connectivity_state_set( &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "connecting_transient_failure"); } - // Reuses the connectivity refs from the previous watch. - grpc_lb_subchannel_data_start_connectivity_watch(sd); + sd->StartConnectivityWatchLocked(); break; } case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: { // Only update connectivity state in case 1. - if (sd->subchannel_list == p->subchannel_list_) { + if (subchannel_list() == p->subchannel_list_.get()) { grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_REF(error), "connecting_changed"); } // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); + RenewConnectivityWatchLocked(); break; } case GRPC_CHANNEL_SHUTDOWN: GPR_UNREACHABLE_CODE(break); } + GRPC_ERROR_UNREF(error); } // diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index e534131c02..79e8ad5663 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -73,23 +73,127 @@ class RoundRobin : public LoadBalancingPolicy { private: ~RoundRobin(); - void ShutdownLocked() override; + // Forward declaration. + class RoundRobinSubchannelList; + + // Data for a particular subchannel in a subchannel list. + // This subclass adds the following functionality: + // - Tracks user_data associated with each address, which will be + // returned along with picks that select the subchannel. + // - Tracks the previous connectivity state of the subchannel, so that + // we know how many subchannels are in each state. + class RoundRobinSubchannelData + : public SubchannelData<RoundRobinSubchannelList, + RoundRobinSubchannelData> { + public: + RoundRobinSubchannelData(RoundRobinSubchannelList* subchannel_list, + const grpc_lb_user_data_vtable* user_data_vtable, + const grpc_lb_address& address, + grpc_subchannel* subchannel, + grpc_combiner* combiner) + : SubchannelData(subchannel_list, user_data_vtable, address, subchannel, + combiner), + user_data_vtable_(user_data_vtable), + user_data_(user_data_vtable_ != nullptr + ? user_data_vtable_->copy(address.user_data) + : nullptr) {} + + void UnrefSubchannelLocked(const char* reason) override { + SubchannelData::UnrefSubchannelLocked(reason); + if (user_data_ != nullptr) { + GPR_ASSERT(user_data_vtable_ != nullptr); + user_data_vtable_->destroy(user_data_); + user_data_ = nullptr; + } + } - void StartPickingLocked(); - size_t GetNextReadySubchannelIndexLocked(); - void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index); - void UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd, - grpc_error* error); + void* user_data() const { return user_data_; } + + grpc_connectivity_state connectivity_state() const { + return last_connectivity_state_; + } - static void OnConnectivityChangedLocked(void* arg, grpc_error* error); + void UpdateConnectivityStateLocked( + grpc_connectivity_state connectivity_state, grpc_error* error); + + private: + void ProcessConnectivityChangeLocked( + grpc_connectivity_state connectivity_state, grpc_error* error) override; + + const grpc_lb_user_data_vtable* user_data_vtable_; + void* user_data_ = nullptr; + grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE; + }; + + // A list of subchannels. + class RoundRobinSubchannelList + : public SubchannelList<RoundRobinSubchannelList, + RoundRobinSubchannelData> { + public: + RoundRobinSubchannelList( + RoundRobin* policy, TraceFlag* tracer, + const grpc_lb_addresses* addresses, grpc_combiner* combiner, + grpc_client_channel_factory* client_channel_factory, + const grpc_channel_args& args) + : SubchannelList(policy, tracer, addresses, combiner, + client_channel_factory, args) { + // Need to maintain a ref to the LB policy as long as we maintain + // any references to subchannels, since the subchannels' + // pollset_sets will include the LB policy's pollset_set. + policy->Ref(DEBUG_LOCATION, "subchannel_list").release(); + } - void SubchannelListRefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason); - void SubchannelListUnrefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason); + ~RoundRobinSubchannelList() { + GRPC_ERROR_UNREF(last_transient_failure_error_); + RoundRobin* p = static_cast<RoundRobin*>(policy()); + p->Unref(DEBUG_LOCATION, "subchannel_list"); + } + + // Starts watching the subchannels in this list. + void StartWatchingLocked(); + + // Updates the counters of subchannels in each state when a + // subchannel transitions from old_state to new_state. + // transient_failure_error is the error that is reported when + // new_state is TRANSIENT_FAILURE. + void UpdateStateCountersLocked(grpc_connectivity_state old_state, + grpc_connectivity_state new_state, + grpc_error* transient_failure_error); + + // If this subchannel list is the RR policy's current subchannel + // list, updates the RR policy's connectivity state based on the + // subchannel list's state counters. + void MaybeUpdateRoundRobinConnectivityStateLocked(); + + // Updates the RR policy's overall state based on the counters of + // subchannels in each state. + void UpdateRoundRobinStateFromSubchannelStateCountsLocked(); + + size_t GetNextReadySubchannelIndexLocked(); + void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index); + + private: + size_t num_ready_ = 0; + size_t num_connecting_ = 0; + size_t num_transient_failure_ = 0; + grpc_error* last_transient_failure_error_ = GRPC_ERROR_NONE; + size_t last_ready_index_ = -1; // Index into list of last pick. + }; + + void ShutdownLocked() override; + + void StartPickingLocked(); + bool DoPickLocked(PickState* pick); + void DrainPendingPicksLocked(); /** list of subchannels */ - grpc_lb_subchannel_list* subchannel_list_ = nullptr; + OrphanablePtr<RoundRobinSubchannelList> subchannel_list_; + /** Latest version of the subchannel list. + * Subchannel connectivity callbacks will only promote updated subchannel + * lists if they equal \a latest_pending_subchannel_list. In other words, + * racing callbacks that reference outdated subchannel lists won't perform any + * update. */ + OrphanablePtr<RoundRobinSubchannelList> latest_pending_subchannel_list_; /** have we started picking? */ bool started_picking_ = false; /** are we shutting down? */ @@ -98,14 +202,6 @@ class RoundRobin : public LoadBalancingPolicy { PickState* pending_picks_ = nullptr; /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker_; - /** Index into subchannels for last pick. */ - size_t last_ready_subchannel_index_ = 0; - /** Latest version of the subchannel list. - * Subchannel connectivity callbacks will only promote updated subchannel - * lists if they equal \a latest_pending_subchannel_list. In other words, - * racing callbacks that reference outdated subchannel lists won't perform any - * update. */ - grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr; }; RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) { @@ -114,15 +210,15 @@ RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) { "round_robin"); UpdateLocked(*args.args); if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] Created with %" PRIuPTR " subchannels", this, - subchannel_list_->num_subchannels); + gpr_log(GPR_INFO, "[RR %p] Created with %" PRIuPTR " subchannels", this, + subchannel_list_->num_subchannels()); } grpc_subchannel_index_ref(); } RoundRobin::~RoundRobin() { if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy", this); + gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this); } GPR_ASSERT(subchannel_list_ == nullptr); GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); @@ -131,68 +227,6 @@ RoundRobin::~RoundRobin() { grpc_subchannel_index_unref(); } -/** Returns the index into p->subchannel_list->subchannels of the next - * subchannel in READY state, or p->subchannel_list->num_subchannels if no - * subchannel is READY. - * - * Note that this function does *not* update p->last_ready_subchannel_index. - * The caller must do that if it returns a pick. */ -size_t RoundRobin::GetNextReadySubchannelIndexLocked() { - GPR_ASSERT(subchannel_list_ != nullptr); - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_INFO, - "[RR %p] getting next ready subchannel (out of %" PRIuPTR - "), " - "last_ready_subchannel_index=%" PRIuPTR, - this, subchannel_list_->num_subchannels, - last_ready_subchannel_index_); - } - for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) { - const size_t index = (i + last_ready_subchannel_index_ + 1) % - subchannel_list_->num_subchannels; - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log( - GPR_DEBUG, - "[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR - ": state=%s", - this, subchannel_list_->subchannels[index].subchannel, - subchannel_list_, index, - grpc_connectivity_state_name( - subchannel_list_->subchannels[index].curr_connectivity_state)); - } - if (subchannel_list_->subchannels[index].curr_connectivity_state == - GRPC_CHANNEL_READY) { - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, - "[RR %p] found next ready subchannel (%p) at index %" PRIuPTR - " of subchannel_list %p", - this, subchannel_list_->subchannels[index].subchannel, index, - subchannel_list_); - } - return index; - } - } - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", this); - } - return subchannel_list_->num_subchannels; -} - -// Sets last_ready_subchannel_index_ to last_ready_index. -void RoundRobin::UpdateLastReadySubchannelIndexLocked(size_t last_ready_index) { - GPR_ASSERT(last_ready_index < subchannel_list_->num_subchannels); - last_ready_subchannel_index_ = last_ready_index; - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, - "[RR %p] setting last_ready_subchannel_index=%" PRIuPTR - " (SC %p, CSC %p)", - this, last_ready_index, - subchannel_list_->subchannels[last_ready_index].subchannel, - subchannel_list_->subchannels[last_ready_index] - .connected_subchannel.get()); - } -} - void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { PickState* pick; while ((pick = pending_picks_) != nullptr) { @@ -207,7 +241,7 @@ void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { void RoundRobin::ShutdownLocked() { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] Shutting down", this); + gpr_log(GPR_INFO, "[RR %p] Shutting down", this); } shutdown_ = true; PickState* pick; @@ -218,16 +252,8 @@ void RoundRobin::ShutdownLocked() { } grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "rr_shutdown"); - if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, - "sl_shutdown_rr_shutdown"); - subchannel_list_ = nullptr; - } - if (latest_pending_subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref( - latest_pending_subchannel_list_, "sl_shutdown_pending_rr_shutdown"); - latest_pending_subchannel_list_ = nullptr; - } + subchannel_list_.reset(); + latest_pending_subchannel_list_.reset(); TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); } @@ -273,70 +299,59 @@ void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, GRPC_ERROR_UNREF(error); } -void RoundRobin::SubchannelListRefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason) { - // TODO(roth): We currently track this ref manually. Once the new - // ClosureRef API is ready and the subchannel_list code has been - // converted to a C++ API, find a way to hold the RefCountedPtr<> - // somewhere (maybe in the subchannel_data object) instead of doing - // this manually. - auto self = Ref(DEBUG_LOCATION, reason); - self.release(); - grpc_lb_subchannel_list_ref(subchannel_list, reason); +void RoundRobin::StartPickingLocked() { + started_picking_ = true; + subchannel_list_->StartWatchingLocked(); } -void RoundRobin::SubchannelListUnrefForConnectivityWatch( - grpc_lb_subchannel_list* subchannel_list, const char* reason) { - Unref(DEBUG_LOCATION, reason); - grpc_lb_subchannel_list_unref(subchannel_list, reason); +void RoundRobin::ExitIdleLocked() { + if (!started_picking_) { + StartPickingLocked(); + } } -void RoundRobin::StartPickingLocked() { - started_picking_ = true; - for (size_t i = 0; i < subchannel_list_->num_subchannels; i++) { - if (subchannel_list_->subchannels[i].subchannel != nullptr) { - SubchannelListRefForConnectivityWatch(subchannel_list_, - "connectivity_watch"); - grpc_lb_subchannel_data_start_connectivity_watch( - &subchannel_list_->subchannels[i]); +bool RoundRobin::DoPickLocked(PickState* pick) { + const size_t next_ready_index = + subchannel_list_->GetNextReadySubchannelIndexLocked(); + if (next_ready_index < subchannel_list_->num_subchannels()) { + /* readily available, report right away */ + RoundRobinSubchannelData* sd = + subchannel_list_->subchannel(next_ready_index); + GPR_ASSERT(sd->connected_subchannel() != nullptr); + pick->connected_subchannel = sd->connected_subchannel()->Ref(); + if (pick->user_data != nullptr) { + *pick->user_data = sd->user_data(); } + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_INFO, + "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " + "index %" PRIuPTR ")", + this, sd->subchannel(), pick->connected_subchannel.get(), + sd->subchannel_list(), next_ready_index); + } + /* only advance the last picked pointer if the selection was used */ + subchannel_list_->UpdateLastReadySubchannelIndexLocked(next_ready_index); + return true; } + return false; } -void RoundRobin::ExitIdleLocked() { - if (!started_picking_) { - StartPickingLocked(); +void RoundRobin::DrainPendingPicksLocked() { + PickState* pick; + while ((pick = pending_picks_)) { + pending_picks_ = pick->next; + GPR_ASSERT(DoPickLocked(pick)); + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } } bool RoundRobin::PickLocked(PickState* pick) { if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] Trying to pick (shutdown: %d)", this, - shutdown_); + gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", this, shutdown_); } GPR_ASSERT(!shutdown_); if (subchannel_list_ != nullptr) { - const size_t next_ready_index = GetNextReadySubchannelIndexLocked(); - if (next_ready_index < subchannel_list_->num_subchannels) { - /* readily available, report right away */ - grpc_lb_subchannel_data* sd = - &subchannel_list_->subchannels[next_ready_index]; - pick->connected_subchannel = sd->connected_subchannel; - if (pick->user_data != nullptr) { - *pick->user_data = sd->user_data; - } - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log( - GPR_DEBUG, - "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " - "index %" PRIuPTR ")", - this, sd->subchannel, pick->connected_subchannel.get(), - sd->subchannel_list, next_ready_index); - } - /* only advance the last picked pointer if the selection was used */ - UpdateLastReadySubchannelIndexLocked(next_ready_index); - return true; - } + if (DoPickLocked(pick)) return true; } /* no pick currently available. Save for later in list of pending picks */ if (!started_picking_) { @@ -347,36 +362,62 @@ bool RoundRobin::PickLocked(PickState* pick) { return false; } -void UpdateStateCountersLocked(grpc_lb_subchannel_data* sd) { - grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; - GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN); - GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); - if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { - GPR_ASSERT(subchannel_list->num_ready > 0); - --subchannel_list->num_ready; - } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - GPR_ASSERT(subchannel_list->num_transient_failures > 0); - --subchannel_list->num_transient_failures; - } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { - GPR_ASSERT(subchannel_list->num_idle > 0); - --subchannel_list->num_idle; +void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() { + if (num_subchannels() == 0) return; + // Check current state of each subchannel synchronously, since any + // subchannel already used by some other channel may have a non-IDLE + // state. + for (size_t i = 0; i < num_subchannels(); ++i) { + grpc_error* error = GRPC_ERROR_NONE; + grpc_connectivity_state state = + subchannel(i)->CheckConnectivityStateLocked(&error); + if (state != GRPC_CHANNEL_IDLE) { + subchannel(i)->UpdateConnectivityStateLocked(state, error); + } } - sd->prev_connectivity_state = sd->curr_connectivity_state; - if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { - ++subchannel_list->num_ready; - } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - ++subchannel_list->num_transient_failures; - } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) { - ++subchannel_list->num_idle; + // Now set the LB policy's state based on the subchannels' states. + UpdateRoundRobinStateFromSubchannelStateCountsLocked(); + // Start connectivity watch for each subchannel. + for (size_t i = 0; i < num_subchannels(); i++) { + if (subchannel(i)->subchannel() != nullptr) { + subchannel(i)->StartConnectivityWatchLocked(); + } } } -/** Sets the policy's connectivity status based on that of the passed-in \a sd - * (the grpc_lb_subchannel_data associated with the updated subchannel) and the - * subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used - * only if the policy transitions to state TRANSIENT_FAILURE. */ -void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd, - grpc_error* error) { +void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked( + grpc_connectivity_state old_state, grpc_connectivity_state new_state, + grpc_error* transient_failure_error) { + GPR_ASSERT(old_state != GRPC_CHANNEL_SHUTDOWN); + GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN); + if (old_state == GRPC_CHANNEL_READY) { + GPR_ASSERT(num_ready_ > 0); + --num_ready_; + } else if (old_state == GRPC_CHANNEL_CONNECTING) { + GPR_ASSERT(num_connecting_ > 0); + --num_connecting_; + } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GPR_ASSERT(num_transient_failure_ > 0); + --num_transient_failure_; + } + if (new_state == GRPC_CHANNEL_READY) { + ++num_ready_; + } else if (new_state == GRPC_CHANNEL_CONNECTING) { + ++num_connecting_; + } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + ++num_transient_failure_; + } + GRPC_ERROR_UNREF(last_transient_failure_error_); + last_transient_failure_error_ = transient_failure_error; +} + +// Sets the RR policy's connectivity state based on the current +// subchannel list. +void RoundRobin::RoundRobinSubchannelList:: + MaybeUpdateRoundRobinConnectivityStateLocked() { + RoundRobin* p = static_cast<RoundRobin*>(policy()); + // Only set connectivity state if this is the current subchannel list. + if (p->subchannel_list_.get() != this) return; /* In priority order. The first rule to match terminates the search (ie, if we * are on rule n, all previous rules were unfulfilled). * @@ -391,155 +432,151 @@ void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd, * CHECK: subchannel_list->num_transient_failures == * subchannel_list->num_subchannels. */ - grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; - GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE); - if (subchannel_list->num_ready > 0) { + if (num_ready_ > 0) { /* 1) READY */ - grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_READY, + grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); - } else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) { + } else if (num_connecting_ > 0) { /* 2) CONNECTING */ - grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_CONNECTING, + grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "rr_connecting"); - } else if (subchannel_list->num_transient_failures == - subchannel_list->num_subchannels) { + } else if (num_transient_failure_ == num_subchannels()) { /* 3) TRANSIENT_FAILURE */ - grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), + grpc_connectivity_state_set(&p->state_tracker_, + GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(last_transient_failure_error_), "rr_exhausted_subchannels"); } - GRPC_ERROR_UNREF(error); } -void RoundRobin::OnConnectivityChangedLocked(void* arg, grpc_error* error) { - grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg); - RoundRobin* p = static_cast<RoundRobin*>(sd->subchannel_list->policy); +void RoundRobin::RoundRobinSubchannelList:: + UpdateRoundRobinStateFromSubchannelStateCountsLocked() { + RoundRobin* p = static_cast<RoundRobin*>(policy()); + if (num_ready_ > 0) { + if (p->subchannel_list_.get() != this) { + // Promote this list to p->subchannel_list_. + // This list must be p->latest_pending_subchannel_list_, because + // any previous update would have been shut down already and + // therefore we would not be receiving a notification for them. + GPR_ASSERT(p->latest_pending_subchannel_list_.get() == this); + GPR_ASSERT(!shutting_down()); + if (grpc_lb_round_robin_trace.enabled()) { + const size_t old_num_subchannels = + p->subchannel_list_ != nullptr + ? p->subchannel_list_->num_subchannels() + : 0; + gpr_log(GPR_INFO, + "[RR %p] phasing out subchannel list %p (size %" PRIuPTR + ") in favor of %p (size %" PRIuPTR ")", + p, p->subchannel_list_.get(), old_num_subchannels, this, + num_subchannels()); + } + p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); + } + // Drain pending picks. + p->DrainPendingPicksLocked(); + } + // Update the RR policy's connectivity state if needed. + MaybeUpdateRoundRobinConnectivityStateLocked(); +} + +void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked( + grpc_connectivity_state connectivity_state, grpc_error* error) { + RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy()); if (grpc_lb_round_robin_trace.enabled()) { gpr_log( - GPR_DEBUG, - "[RR %p] connectivity changed for subchannel %p, subchannel_list %p: " - "prev_state=%s new_state=%s p->shutdown=%d " - "sd->subchannel_list->shutting_down=%d error=%s", - p, sd->subchannel, sd->subchannel_list, - grpc_connectivity_state_name(sd->prev_connectivity_state), - grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), - p->shutdown_, sd->subchannel_list->shutting_down, - grpc_error_string(error)); - } - GPR_ASSERT(sd->subchannel != nullptr); - // If the policy is shutting down, unref and return. - if (p->shutdown_) { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "rr_shutdown"); - p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, - "rr_shutdown"); - return; + GPR_INFO, + "[RR %p] connectivity changed for subchannel %p, subchannel_list %p " + "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s", + p, subchannel(), subchannel_list(), Index(), + subchannel_list()->num_subchannels(), + grpc_connectivity_state_name(last_connectivity_state_), + grpc_connectivity_state_name(connectivity_state)); + } + subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_, + connectivity_state, error); + last_connectivity_state_ = connectivity_state; +} + +void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( + grpc_connectivity_state connectivity_state, grpc_error* error) { + RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy()); + GPR_ASSERT(subchannel() != nullptr); + // If the new state is TRANSIENT_FAILURE, re-resolve. + // Only do this if we've started watching, not at startup time. + // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE + // when the subchannel list was created, we'd wind up in a constant + // loop of re-resolution. + if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_INFO, + "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " + "Requesting re-resolution", + p, subchannel()); + } + p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE); } - // If the subchannel list is shutting down, stop watching. - if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "rr_sl_shutdown"); - p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, - "rr_sl_shutdown"); - return; + // Update state counters. + UpdateConnectivityStateLocked(connectivity_state, error); + // Update overall state and renew notification. + subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked(); + RenewConnectivityWatchLocked(); +} + +/** Returns the index into p->subchannel_list->subchannels of the next + * subchannel in READY state, or p->subchannel_list->num_subchannels if no + * subchannel is READY. + * + * Note that this function does *not* update p->last_ready_subchannel_index. + * The caller must do that if it returns a pick. */ +size_t +RoundRobin::RoundRobinSubchannelList::GetNextReadySubchannelIndexLocked() { + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_INFO, + "[RR %p] getting next ready subchannel (out of %" PRIuPTR + "), last_ready_index=%" PRIuPTR, + policy(), num_subchannels(), last_ready_index_); } - // If we're still here, the notification must be for a subchannel in - // either the current or latest pending subchannel lists. - GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ || - sd->subchannel_list == p->latest_pending_subchannel_list_); - GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN); - // Now that we're inside the combiner, copy the pending connectivity - // state (which was set by the connectivity state watcher) to - // curr_connectivity_state, which is what we use inside of the combiner. - sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; - // If the sd's new state is TRANSIENT_FAILURE, unref the *connected* - // subchannel, if any. - switch (sd->curr_connectivity_state) { - case GRPC_CHANNEL_TRANSIENT_FAILURE: { - sd->connected_subchannel.reset(); - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, - "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " - "Requesting re-resolution", - p, sd->subchannel); - } - p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE); - break; + for (size_t i = 0; i < num_subchannels(); ++i) { + const size_t index = (i + last_ready_index_ + 1) % num_subchannels(); + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log( + GPR_INFO, + "[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR + ": state=%s", + policy(), subchannel(index)->subchannel(), this, index, + grpc_connectivity_state_name( + subchannel(index)->connectivity_state())); } - case GRPC_CHANNEL_READY: { - if (sd->connected_subchannel == nullptr) { - sd->connected_subchannel = - grpc_subchannel_get_connected_subchannel(sd->subchannel); - } - if (sd->subchannel_list != p->subchannel_list_) { - // promote sd->subchannel_list to p->subchannel_list_. - // sd->subchannel_list must be equal to - // p->latest_pending_subchannel_list_ because we have already filtered - // for sds belonging to outdated subchannel lists. - GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list_); - GPR_ASSERT(!sd->subchannel_list->shutting_down); - if (grpc_lb_round_robin_trace.enabled()) { - const size_t num_subchannels = - p->subchannel_list_ != nullptr - ? p->subchannel_list_->num_subchannels - : 0; - gpr_log(GPR_DEBUG, - "[RR %p] phasing out subchannel list %p (size %" PRIuPTR - ") in favor of %p (size %" PRIuPTR ")", - p, p->subchannel_list_, num_subchannels, sd->subchannel_list, - num_subchannels); - } - if (p->subchannel_list_ != nullptr) { - // dispose of the current subchannel_list - grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_, - "sl_phase_out_shutdown"); - } - p->subchannel_list_ = p->latest_pending_subchannel_list_; - p->latest_pending_subchannel_list_ = nullptr; - } - /* at this point we know there's at least one suitable subchannel. Go - * ahead and pick one and notify the pending suitors in - * p->pending_picks. This preemptively replicates rr_pick()'s actions. */ - const size_t next_ready_index = p->GetNextReadySubchannelIndexLocked(); - GPR_ASSERT(next_ready_index < p->subchannel_list_->num_subchannels); - grpc_lb_subchannel_data* selected = - &p->subchannel_list_->subchannels[next_ready_index]; - if (p->pending_picks_ != nullptr) { - // if the selected subchannel is going to be used for the pending - // picks, update the last picked pointer - p->UpdateLastReadySubchannelIndexLocked(next_ready_index); - } - PickState* pick; - while ((pick = p->pending_picks_)) { - p->pending_picks_ = pick->next; - pick->connected_subchannel = selected->connected_subchannel; - if (pick->user_data != nullptr) { - *pick->user_data = selected->user_data; - } - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, - "[RR %p] Fulfilling pending pick. Target <-- subchannel %p " - "(subchannel_list %p, index %" PRIuPTR ")", - p, selected->subchannel, p->subchannel_list_, - next_ready_index); - } - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + if (subchannel(index)->connectivity_state() == GRPC_CHANNEL_READY) { + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_INFO, + "[RR %p] found next ready subchannel (%p) at index %" PRIuPTR + " of subchannel_list %p", + policy(), subchannel(index)->subchannel(), index, this); } - break; + return index; } - case GRPC_CHANNEL_SHUTDOWN: - GPR_UNREACHABLE_CODE(return ); - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE:; // fallthrough } - // Update state counters. - UpdateStateCountersLocked(sd); - // Only update connectivity based on the selected subchannel list. - if (sd->subchannel_list == p->subchannel_list_) { - p->UpdateConnectivityStatusLocked(sd, GRPC_ERROR_REF(error)); + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_INFO, "[RR %p] no subchannels in ready state", this); + } + return num_subchannels(); +} + +// Sets last_ready_index_ to last_ready_index. +void RoundRobin::RoundRobinSubchannelList::UpdateLastReadySubchannelIndexLocked( + size_t last_ready_index) { + GPR_ASSERT(last_ready_index < num_subchannels()); + last_ready_index_ = last_ready_index; + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_INFO, + "[RR %p] setting last_ready_subchannel_index=%" PRIuPTR + " (SC %p, CSC %p)", + policy(), last_ready_index, + subchannel(last_ready_index)->subchannel(), + subchannel(last_ready_index)->connected_subchannel()); } - // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); } grpc_connectivity_state RoundRobin::CheckConnectivityLocked( @@ -555,11 +592,12 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current, void RoundRobin::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { - const size_t next_ready_index = GetNextReadySubchannelIndexLocked(); - if (next_ready_index < subchannel_list_->num_subchannels) { - grpc_lb_subchannel_data* selected = - &subchannel_list_->subchannels[next_ready_index]; - selected->connected_subchannel->Ping(on_initiate, on_ack); + const size_t next_ready_index = + subchannel_list_->GetNextReadySubchannelIndexLocked(); + if (next_ready_index < subchannel_list_->num_subchannels()) { + RoundRobinSubchannelData* selected = + subchannel_list_->subchannel(next_ready_index); + selected->connected_subchannel()->Ping(on_initiate, on_ack); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Round Robin not connected")); @@ -582,80 +620,37 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args) { } return; } - grpc_lb_addresses* addresses = (grpc_lb_addresses*)arg->value.pointer.p; + grpc_lb_addresses* addresses = + static_cast<grpc_lb_addresses*>(arg->value.pointer.p); if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", + gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses", this, addresses->num_addresses); } - grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create( - this, &grpc_lb_round_robin_trace, addresses, combiner(), - client_channel_factory(), args, &RoundRobin::OnConnectivityChangedLocked); - if (subchannel_list->num_subchannels == 0) { - grpc_connectivity_state_set( - &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), - "rr_update_empty"); - if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, - "sl_shutdown_empty_update"); + // Replace latest_pending_subchannel_list_. + if (latest_pending_subchannel_list_ != nullptr) { + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_INFO, + "[RR %p] Shutting down previous pending subchannel list %p", this, + latest_pending_subchannel_list_.get()); } - subchannel_list_ = subchannel_list; // empty list - return; } - if (started_picking_) { - for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { - const grpc_connectivity_state subchannel_state = - grpc_subchannel_check_connectivity( - subchannel_list->subchannels[i].subchannel, nullptr); - // Override the default setting of IDLE for connectivity notification - // purposes if the subchannel is already in transient failure. Otherwise - // we'd be immediately notified of the IDLE-TRANSIENT_FAILURE - // discrepancy, attempt to re-resolve and end up here again. - // TODO(roth): As part of C++-ifying the subchannel_list API, design a - // better API for notifying the LB policy of subchannel states, which can - // be used both for the subchannel's initial state and for subsequent - // state changes. This will allow us to handle this more generally instead - // of special-casing TRANSIENT_FAILURE (e.g., we can also distribute any - // pending picks across all READY subchannels rather than sending them all - // to the first one). - if (subchannel_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - subchannel_list->subchannels[i].pending_connectivity_state_unsafe = - subchannel_list->subchannels[i].curr_connectivity_state = - subchannel_list->subchannels[i].prev_connectivity_state = - subchannel_state; - --subchannel_list->num_idle; - ++subchannel_list->num_transient_failures; - } - } - if (latest_pending_subchannel_list_ != nullptr) { - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, - "[RR %p] Shutting down latest pending subchannel list %p, " - "about to be replaced by newer latest %p", - this, latest_pending_subchannel_list_, subchannel_list); - } - grpc_lb_subchannel_list_shutdown_and_unref( - latest_pending_subchannel_list_, "sl_outdated"); - } - latest_pending_subchannel_list_ = subchannel_list; - for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { - /* Watch every new subchannel. A subchannel list becomes active the - * moment one of its subchannels is READY. At that moment, we swap - * p->subchannel_list for sd->subchannel_list, provided the subchannel - * list is still valid (ie, isn't shutting down) */ - SubchannelListRefForConnectivityWatch(subchannel_list, - "connectivity_watch"); - grpc_lb_subchannel_data_start_connectivity_watch( - &subchannel_list->subchannels[i]); + latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>( + this, &grpc_lb_round_robin_trace, addresses, combiner(), + client_channel_factory(), args); + // If we haven't started picking yet or the new list is empty, + // immediately promote the new list to the current list. + if (!started_picking_ || + latest_pending_subchannel_list_->num_subchannels() == 0) { + if (latest_pending_subchannel_list_->num_subchannels() == 0) { + grpc_connectivity_state_set( + &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), + "rr_update_empty"); } + subchannel_list_ = std::move(latest_pending_subchannel_list_); } else { - // The policy isn't picking yet. Save the update for later, disposing of - // previous version if any. - if (subchannel_list_ != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref( - subchannel_list_, "rr_update_before_started_picking"); - } - subchannel_list_ = subchannel_list; + // If we've started picking, start watching the new list. + latest_pending_subchannel_list_->StartWatchingLocked(); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc deleted file mode 100644 index 79cb64c6c6..0000000000 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ /dev/null @@ -1,253 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/support/port_platform.h> - -#include <string.h> - -#include <grpc/support/alloc.h> - -#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/debug/trace.h" -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/combiner.h" -#include "src/core/lib/iomgr/sockaddr_utils.h" -#include "src/core/lib/transport/connectivity_state.h" - -void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd, - const char* reason) { - if (sd->subchannel != nullptr) { - if (sd->subchannel_list->tracer->enabled()) { - gpr_log(GPR_DEBUG, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): unreffing subchannel", - sd->subchannel_list->tracer->name(), sd->subchannel_list->policy, - sd->subchannel_list, - static_cast<size_t>(sd - sd->subchannel_list->subchannels), - sd->subchannel_list->num_subchannels, sd->subchannel); - } - GRPC_SUBCHANNEL_UNREF(sd->subchannel, reason); - sd->subchannel = nullptr; - sd->connected_subchannel.reset(); - if (sd->user_data != nullptr) { - GPR_ASSERT(sd->user_data_vtable != nullptr); - sd->user_data_vtable->destroy(sd->user_data); - sd->user_data = nullptr; - } - } -} - -void grpc_lb_subchannel_data_start_connectivity_watch( - grpc_lb_subchannel_data* sd) { - if (sd->subchannel_list->tracer->enabled()) { - gpr_log( - GPR_DEBUG, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): requesting connectivity change " - "notification (from %s)", - sd->subchannel_list->tracer->name(), sd->subchannel_list->policy, - sd->subchannel_list, - static_cast<size_t>(sd - sd->subchannel_list->subchannels), - sd->subchannel_list->num_subchannels, sd->subchannel, - grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe)); - } - sd->connectivity_notification_pending = true; - grpc_subchannel_notify_on_state_change( - sd->subchannel, sd->subchannel_list->policy->interested_parties(), - &sd->pending_connectivity_state_unsafe, - &sd->connectivity_changed_closure); -} - -void grpc_lb_subchannel_data_stop_connectivity_watch( - grpc_lb_subchannel_data* sd) { - if (sd->subchannel_list->tracer->enabled()) { - gpr_log(GPR_DEBUG, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): stopping connectivity watch", - sd->subchannel_list->tracer->name(), sd->subchannel_list->policy, - sd->subchannel_list, - static_cast<size_t>(sd - sd->subchannel_list->subchannels), - sd->subchannel_list->num_subchannels, sd->subchannel); - } - GPR_ASSERT(sd->connectivity_notification_pending); - sd->connectivity_notification_pending = false; -} - -grpc_lb_subchannel_list* grpc_lb_subchannel_list_create( - grpc_core::LoadBalancingPolicy* p, grpc_core::TraceFlag* tracer, - const grpc_lb_addresses* addresses, grpc_combiner* combiner, - grpc_client_channel_factory* client_channel_factory, - const grpc_channel_args& args, grpc_iomgr_cb_func connectivity_changed_cb) { - grpc_lb_subchannel_list* subchannel_list = - static_cast<grpc_lb_subchannel_list*>( - gpr_zalloc(sizeof(*subchannel_list))); - if (tracer->enabled()) { - gpr_log(GPR_DEBUG, - "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels", - tracer->name(), p, subchannel_list, addresses->num_addresses); - } - subchannel_list->policy = p; - subchannel_list->tracer = tracer; - gpr_ref_init(&subchannel_list->refcount, 1); - subchannel_list->subchannels = static_cast<grpc_lb_subchannel_data*>( - gpr_zalloc(sizeof(grpc_lb_subchannel_data) * addresses->num_addresses)); - // We need to remove the LB addresses in order to be able to compare the - // subchannel keys of subchannels from a different batch of addresses. - static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, - GRPC_ARG_LB_ADDRESSES}; - // Create a subchannel for each address. - grpc_subchannel_args sc_args; - size_t subchannel_index = 0; - for (size_t i = 0; i < addresses->num_addresses; i++) { - // If there were any balancer, we would have chosen grpclb policy instead. - GPR_ASSERT(!addresses->addresses[i].is_balancer); - memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - grpc_arg addr_arg = - grpc_create_subchannel_address_arg(&addresses->addresses[i].address); - grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( - &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1); - gpr_free(addr_arg.value.string); - sc_args.args = new_args; - grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel( - client_channel_factory, &sc_args); - grpc_channel_args_destroy(new_args); - if (subchannel == nullptr) { - // Subchannel could not be created. - if (tracer->enabled()) { - char* address_uri = - grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_DEBUG, - "[%s %p] could not create subchannel for address uri %s, " - "ignoring", - tracer->name(), subchannel_list->policy, address_uri); - gpr_free(address_uri); - } - continue; - } - if (tracer->enabled()) { - char* address_uri = - grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_DEBUG, - "[%s %p] subchannel list %p index %" PRIuPTR - ": Created subchannel %p for address uri %s", - tracer->name(), p, subchannel_list, subchannel_index, subchannel, - address_uri); - gpr_free(address_uri); - } - grpc_lb_subchannel_data* sd = - &subchannel_list->subchannels[subchannel_index++]; - sd->subchannel_list = subchannel_list; - sd->subchannel = subchannel; - GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure, - connectivity_changed_cb, sd, - grpc_combiner_scheduler(combiner)); - // We assume that the current state is IDLE. If not, we'll get a - // callback telling us that. - sd->prev_connectivity_state = GRPC_CHANNEL_IDLE; - sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; - sd->pending_connectivity_state_unsafe = GRPC_CHANNEL_IDLE; - sd->user_data_vtable = addresses->user_data_vtable; - if (sd->user_data_vtable != nullptr) { - sd->user_data = - sd->user_data_vtable->copy(addresses->addresses[i].user_data); - } - } - subchannel_list->num_subchannels = subchannel_index; - subchannel_list->num_idle = subchannel_index; - return subchannel_list; -} - -static void subchannel_list_destroy(grpc_lb_subchannel_list* subchannel_list) { - if (subchannel_list->tracer->enabled()) { - gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p", - subchannel_list->tracer->name(), subchannel_list->policy, - subchannel_list); - } - for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { - grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i]; - grpc_lb_subchannel_data_unref_subchannel(sd, "subchannel_list_destroy"); - } - gpr_free(subchannel_list->subchannels); - gpr_free(subchannel_list); -} - -void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list, - const char* reason) { - gpr_ref_non_zero(&subchannel_list->refcount); - if (subchannel_list->tracer->enabled()) { - const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); - gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p REF %lu->%lu (%s)", - subchannel_list->tracer->name(), subchannel_list->policy, - subchannel_list, static_cast<unsigned long>(count - 1), - static_cast<unsigned long>(count), reason); - } -} - -void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list, - const char* reason) { - const bool done = gpr_unref(&subchannel_list->refcount); - if (subchannel_list->tracer->enabled()) { - const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); - gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p UNREF %lu->%lu (%s)", - subchannel_list->tracer->name(), subchannel_list->policy, - subchannel_list, static_cast<unsigned long>(count + 1), - static_cast<unsigned long>(count), reason); - } - if (done) { - subchannel_list_destroy(subchannel_list); - } -} - -static void subchannel_data_cancel_connectivity_watch( - grpc_lb_subchannel_data* sd, const char* reason) { - if (sd->subchannel_list->tracer->enabled()) { - gpr_log(GPR_DEBUG, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): canceling connectivity watch (%s)", - sd->subchannel_list->tracer->name(), sd->subchannel_list->policy, - sd->subchannel_list, - static_cast<size_t>(sd - sd->subchannel_list->subchannels), - sd->subchannel_list->num_subchannels, sd->subchannel, reason); - } - grpc_subchannel_notify_on_state_change(sd->subchannel, nullptr, nullptr, - &sd->connectivity_changed_closure); -} - -void grpc_lb_subchannel_list_shutdown_and_unref( - grpc_lb_subchannel_list* subchannel_list, const char* reason) { - if (subchannel_list->tracer->enabled()) { - gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)", - subchannel_list->tracer->name(), subchannel_list->policy, - subchannel_list, reason); - } - GPR_ASSERT(!subchannel_list->shutting_down); - subchannel_list->shutting_down = true; - for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { - grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i]; - // If there's a pending notification for this subchannel, cancel it; - // the callback is responsible for unreffing the subchannel. - // Otherwise, unref the subchannel directly. - if (sd->connectivity_notification_pending) { - subchannel_data_cancel_connectivity_watch(sd, reason); - } else if (sd->subchannel != nullptr) { - grpc_lb_subchannel_data_unref_subchannel(sd, reason); - } - } - grpc_lb_subchannel_list_unref(subchannel_list, reason); -} diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 6889d596ac..7e2046bcdc 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -21,116 +21,516 @@ #include <grpc/support/port_platform.h> +#include <string.h> + +#include <grpc/support/alloc.h> + #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/gprpp/abstract.h" +#include "src/core/lib/gprpp/inlined_vector.h" +#include "src/core/lib/gprpp/orphanable.h" +#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/combiner.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" -// TODO(roth): This code is intended to be shared between pick_first and -// round_robin. However, the interface needs more work to provide clean -// encapsulation. For example, the structs here have some fields that are -// only used in one of the two (e.g., the state counters in -// grpc_lb_subchannel_list and the prev_connectivity_state field in -// grpc_lb_subchannel_data are only used in round_robin, and the -// checking_subchannel field in grpc_lb_subchannel_list is only used by -// pick_first). Also, there is probably some code duplication between the -// connectivity state notification callback code in both pick_first and -// round_robin that could be refactored and moved here. In a future PR, -// need to clean this up. - -typedef struct grpc_lb_subchannel_list grpc_lb_subchannel_list; - -typedef struct { - /** backpointer to owning subchannel list */ - grpc_lb_subchannel_list* subchannel_list; - /** subchannel itself */ - grpc_subchannel* subchannel; - grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel; - /** Is a connectivity notification pending? */ - bool connectivity_notification_pending; - /** notification that connectivity has changed on subchannel */ - grpc_closure connectivity_changed_closure; - /** previous and current connectivity states. Updated by \a - * \a connectivity_changed_closure based on - * \a pending_connectivity_state_unsafe. */ - grpc_connectivity_state prev_connectivity_state; - grpc_connectivity_state curr_connectivity_state; - /** connectivity state to be updated by - * grpc_subchannel_notify_on_state_change(), not guarded by - * the combiner. To be copied to \a curr_connectivity_state by - * \a connectivity_changed_closure. */ - grpc_connectivity_state pending_connectivity_state_unsafe; - /** the subchannel's target user data */ - void* user_data; - /** vtable to operate over \a user_data */ - const grpc_lb_user_data_vtable* user_data_vtable; -} grpc_lb_subchannel_data; - -/// Unrefs the subchannel contained in sd. -void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd, - const char* reason); - -/// Starts watching the connectivity state of the subchannel. -/// The connectivity_changed_cb callback must invoke either -/// grpc_lb_subchannel_data_stop_connectivity_watch() or again call -/// grpc_lb_subchannel_data_start_connectivity_watch(). -void grpc_lb_subchannel_data_start_connectivity_watch( - grpc_lb_subchannel_data* sd); - -/// Stops watching the connectivity state of the subchannel. -void grpc_lb_subchannel_data_stop_connectivity_watch( - grpc_lb_subchannel_data* sd); - -struct grpc_lb_subchannel_list { - /** backpointer to owning policy */ - grpc_core::LoadBalancingPolicy* policy; - - grpc_core::TraceFlag* tracer; - - /** all our subchannels */ - size_t num_subchannels; - grpc_lb_subchannel_data* subchannels; - - /** Index into subchannels of the one we're currently checking. - * Used when connecting to subchannels serially instead of in parallel. */ - // TODO(roth): When we have time, we can probably make this go away - // and compute the index dynamically by subtracting - // subchannel_list->subchannels from the subchannel_data pointer. - size_t checking_subchannel; - - /** how many subchannels are in state READY */ - size_t num_ready; - /** how many subchannels are in state TRANSIENT_FAILURE */ - size_t num_transient_failures; - /** how many subchannels are in state IDLE */ - size_t num_idle; - - /** There will be one ref for each entry in subchannels for which there is a - * pending connectivity state watcher callback. */ - gpr_refcount refcount; - - /** Is this list shutting down? This may be true due to the shutdown of the - * policy itself or because a newer update has arrived while this one hadn't - * finished processing. */ - bool shutting_down; +// Code for maintaining a list of subchannels within an LB policy. +// +// To use this, callers must create their own subclasses, like so: +/* + +class MySubchannelList; // Forward declaration. + +class MySubchannelData + : public SubchannelData<MySubchannelList, MySubchannelData> { + public: + void ProcessConnectivityChangeLocked( + grpc_connectivity_state connectivity_state, grpc_error* error) override { + // ...code to handle connectivity changes... + } +}; + +class MySubchannelList + : public SubchannelList<MySubchannelList, MySubchannelData> { }; -grpc_lb_subchannel_list* grpc_lb_subchannel_list_create( - grpc_core::LoadBalancingPolicy* p, grpc_core::TraceFlag* tracer, +*/ +// All methods with a Locked() suffix must be called from within the +// client_channel combiner. + +namespace grpc_core { + +// Stores data for a particular subchannel in a subchannel list. +// Callers must create a subclass that implements the +// ProcessConnectivityChangeLocked() method. +template <typename SubchannelListType, typename SubchannelDataType> +class SubchannelData { + public: + // Returns a pointer to the subchannel list containing this object. + SubchannelListType* subchannel_list() const { return subchannel_list_; } + + // Returns the index into the subchannel list of this object. + size_t Index() const { + return static_cast<size_t>(static_cast<const SubchannelDataType*>(this) - + subchannel_list_->subchannel(0)); + } + + // Returns a pointer to the subchannel. + grpc_subchannel* subchannel() const { return subchannel_; } + + // Returns the connected subchannel. Will be null if the subchannel + // is not connected. + ConnectedSubchannel* connected_subchannel() const { + return connected_subchannel_.get(); + } + + // Synchronously checks the subchannel's connectivity state. + // Must not be called while there is a connectivity notification + // pending (i.e., between calling StartConnectivityWatchLocked() or + // RenewConnectivityWatchLocked() and the resulting invocation of + // ProcessConnectivityChangeLocked()). + grpc_connectivity_state CheckConnectivityStateLocked(grpc_error** error) { + GPR_ASSERT(!connectivity_notification_pending_); + pending_connectivity_state_unsafe_ = + grpc_subchannel_check_connectivity(subchannel(), error); + UpdateConnectedSubchannelLocked(); + return pending_connectivity_state_unsafe_; + } + + // Unrefs the subchannel. May be used if an individual subchannel is + // no longer needed even though the subchannel list as a whole is not + // being unreffed. + virtual void UnrefSubchannelLocked(const char* reason); + + // Starts watching the connectivity state of the subchannel. + // ProcessConnectivityChangeLocked() will be called when the + // connectivity state changes. + void StartConnectivityWatchLocked(); + + // Renews watching the connectivity state of the subchannel. + void RenewConnectivityWatchLocked(); + + // Stops watching the connectivity state of the subchannel. + void StopConnectivityWatchLocked(); + + // Cancels watching the connectivity state of the subchannel. + // Must be called only while there is a connectivity notification + // pending (i.e., between calling StartConnectivityWatchLocked() or + // RenewConnectivityWatchLocked() and the resulting invocation of + // ProcessConnectivityChangeLocked()). + // From within ProcessConnectivityChangeLocked(), use + // StopConnectivityWatchLocked() instead. + void CancelConnectivityWatchLocked(const char* reason); + + // Cancels any pending connectivity watch and unrefs the subchannel. + void ShutdownLocked(); + + GRPC_ABSTRACT_BASE_CLASS + + protected: + SubchannelData(SubchannelListType* subchannel_list, + const grpc_lb_user_data_vtable* user_data_vtable, + const grpc_lb_address& address, grpc_subchannel* subchannel, + grpc_combiner* combiner); + + virtual ~SubchannelData(); + + // After StartConnectivityWatchLocked() or RenewConnectivityWatchLocked() + // is called, this method will be invoked when the subchannel's connectivity + // state changes. + // Implementations must invoke either RenewConnectivityWatchLocked() or + // StopConnectivityWatchLocked() before returning. + virtual void ProcessConnectivityChangeLocked( + grpc_connectivity_state connectivity_state, + grpc_error* error) GRPC_ABSTRACT; + + private: + // Updates connected_subchannel_ based on pending_connectivity_state_unsafe_. + // Returns true if the connectivity state should be reported. + bool UpdateConnectedSubchannelLocked(); + + static void OnConnectivityChangedLocked(void* arg, grpc_error* error); + + // Backpointer to owning subchannel list. Not owned. + SubchannelListType* subchannel_list_; + + // The subchannel and connected subchannel. + grpc_subchannel* subchannel_; + RefCountedPtr<ConnectedSubchannel> connected_subchannel_; + + // Notification that connectivity has changed on subchannel. + grpc_closure connectivity_changed_closure_; + // Is a connectivity notification pending? + bool connectivity_notification_pending_ = false; + // Connectivity state to be updated by + // grpc_subchannel_notify_on_state_change(), not guarded by + // the combiner. + grpc_connectivity_state pending_connectivity_state_unsafe_; +}; + +// A list of subchannels. +template <typename SubchannelListType, typename SubchannelDataType> +class SubchannelList + : public InternallyRefCountedWithTracing<SubchannelListType> { + public: + typedef InlinedVector<SubchannelDataType, 10> SubchannelVector; + + // The number of subchannels in the list. + size_t num_subchannels() const { return subchannels_.size(); } + + // The data for the subchannel at a particular index. + SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; } + + // Returns true if the subchannel list is shutting down. + bool shutting_down() const { return shutting_down_; } + + // Accessors. + LoadBalancingPolicy* policy() const { return policy_; } + TraceFlag* tracer() const { return tracer_; } + + // Note: Caller must ensure that this is invoked inside of the combiner. + void Orphan() override { + ShutdownLocked(); + InternallyRefCountedWithTracing<SubchannelListType>::Unref(DEBUG_LOCATION, + "shutdown"); + } + + GRPC_ABSTRACT_BASE_CLASS + + protected: + SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer, + const grpc_lb_addresses* addresses, grpc_combiner* combiner, + grpc_client_channel_factory* client_channel_factory, + const grpc_channel_args& args); + + virtual ~SubchannelList(); + + private: + // So New() can call our private ctor. + template <typename T, typename... Args> + friend T* New(Args&&... args); + + // For accessing Ref() and Unref(). + friend class SubchannelData<SubchannelListType, SubchannelDataType>; + + void ShutdownLocked(); + + // Backpointer to owning policy. + LoadBalancingPolicy* policy_; + + TraceFlag* tracer_; + + grpc_combiner* combiner_; + + // The list of subchannels. + SubchannelVector subchannels_; + + // Is this list shutting down? This may be true due to the shutdown of the + // policy itself or because a newer update has arrived while this one hadn't + // finished processing. + bool shutting_down_ = false; +}; + +// +// implementation -- no user-servicable parts below +// + +// +// SubchannelData +// + +template <typename SubchannelListType, typename SubchannelDataType> +SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData( + SubchannelListType* subchannel_list, + const grpc_lb_user_data_vtable* user_data_vtable, + const grpc_lb_address& address, grpc_subchannel* subchannel, + grpc_combiner* combiner) + : subchannel_list_(subchannel_list), + subchannel_(subchannel), + // We assume that the current state is IDLE. If not, we'll get a + // callback telling us that. + pending_connectivity_state_unsafe_(GRPC_CHANNEL_IDLE) { + GRPC_CLOSURE_INIT( + &connectivity_changed_closure_, + (&SubchannelData<SubchannelListType, + SubchannelDataType>::OnConnectivityChangedLocked), + this, grpc_combiner_scheduler(combiner)); +} + +template <typename SubchannelListType, typename SubchannelDataType> +SubchannelData<SubchannelListType, SubchannelDataType>::~SubchannelData() { + UnrefSubchannelLocked("subchannel_data_destroy"); +} + +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelData<SubchannelListType, SubchannelDataType>:: + UnrefSubchannelLocked(const char* reason) { + if (subchannel_ != nullptr) { + if (subchannel_list_->tracer()->enabled()) { + gpr_log(GPR_INFO, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): unreffing subchannel", + subchannel_list_->tracer()->name(), subchannel_list_->policy(), + subchannel_list_, Index(), subchannel_list_->num_subchannels(), + subchannel_); + } + GRPC_SUBCHANNEL_UNREF(subchannel_, reason); + subchannel_ = nullptr; + connected_subchannel_.reset(); + } +} + +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelData<SubchannelListType, + SubchannelDataType>::StartConnectivityWatchLocked() { + if (subchannel_list_->tracer()->enabled()) { + gpr_log(GPR_INFO, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): starting watch: requesting connectivity change " + "notification (from %s)", + subchannel_list_->tracer()->name(), subchannel_list_->policy(), + subchannel_list_, Index(), subchannel_list_->num_subchannels(), + subchannel_, + grpc_connectivity_state_name(pending_connectivity_state_unsafe_)); + } + GPR_ASSERT(!connectivity_notification_pending_); + connectivity_notification_pending_ = true; + subchannel_list()->Ref(DEBUG_LOCATION, "connectivity_watch").release(); + grpc_subchannel_notify_on_state_change( + subchannel_, subchannel_list_->policy()->interested_parties(), + &pending_connectivity_state_unsafe_, &connectivity_changed_closure_); +} + +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelData<SubchannelListType, + SubchannelDataType>::RenewConnectivityWatchLocked() { + if (subchannel_list_->tracer()->enabled()) { + gpr_log(GPR_INFO, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): renewing watch: requesting connectivity change " + "notification (from %s)", + subchannel_list_->tracer()->name(), subchannel_list_->policy(), + subchannel_list_, Index(), subchannel_list_->num_subchannels(), + subchannel_, + grpc_connectivity_state_name(pending_connectivity_state_unsafe_)); + } + GPR_ASSERT(connectivity_notification_pending_); + grpc_subchannel_notify_on_state_change( + subchannel_, subchannel_list_->policy()->interested_parties(), + &pending_connectivity_state_unsafe_, &connectivity_changed_closure_); +} + +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelData<SubchannelListType, + SubchannelDataType>::StopConnectivityWatchLocked() { + if (subchannel_list_->tracer()->enabled()) { + gpr_log(GPR_INFO, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): stopping connectivity watch", + subchannel_list_->tracer()->name(), subchannel_list_->policy(), + subchannel_list_, Index(), subchannel_list_->num_subchannels(), + subchannel_); + } + GPR_ASSERT(connectivity_notification_pending_); + connectivity_notification_pending_ = false; + subchannel_list()->Unref(DEBUG_LOCATION, "connectivity_watch"); +} + +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelData<SubchannelListType, SubchannelDataType>:: + CancelConnectivityWatchLocked(const char* reason) { + if (subchannel_list_->tracer()->enabled()) { + gpr_log(GPR_INFO, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): canceling connectivity watch (%s)", + subchannel_list_->tracer()->name(), subchannel_list_->policy(), + subchannel_list_, Index(), subchannel_list_->num_subchannels(), + subchannel_, reason); + } + GPR_ASSERT(connectivity_notification_pending_); + grpc_subchannel_notify_on_state_change(subchannel_, nullptr, nullptr, + &connectivity_changed_closure_); +} + +template <typename SubchannelListType, typename SubchannelDataType> +bool SubchannelData<SubchannelListType, + SubchannelDataType>::UpdateConnectedSubchannelLocked() { + // If the subchannel is READY, take a ref to the connected subchannel. + if (pending_connectivity_state_unsafe_ == GRPC_CHANNEL_READY) { + connected_subchannel_ = + grpc_subchannel_get_connected_subchannel(subchannel_); + // If the subchannel became disconnected between the time that READY + // was reported and the time we got here (e.g., between when a + // notification callback is scheduled and when it was actually run in + // the combiner), then the connected subchannel may have disappeared out + // from under us. In that case, we don't actually want to consider the + // subchannel to be in state READY. Instead, we use IDLE as the + // basis for any future connectivity watch; this is the one state that + // the subchannel will never transition back into, so this ensures + // that we will get a notification for the next state, even if that state + // is READY again (e.g., if the subchannel has transitioned back to + // READY before the next watch gets requested). + if (connected_subchannel_ == nullptr) { + if (subchannel_list_->tracer()->enabled()) { + gpr_log(GPR_INFO, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): state is READY but connected subchannel is " + "null; moving to state IDLE", + subchannel_list_->tracer()->name(), subchannel_list_->policy(), + subchannel_list_, Index(), subchannel_list_->num_subchannels(), + subchannel_); + } + pending_connectivity_state_unsafe_ = GRPC_CHANNEL_IDLE; + return false; + } + } else { + // For any state other than READY, unref the connected subchannel. + connected_subchannel_.reset(); + } + return true; +} + +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelData<SubchannelListType, SubchannelDataType>:: + OnConnectivityChangedLocked(void* arg, grpc_error* error) { + SubchannelData* sd = static_cast<SubchannelData*>(arg); + if (sd->subchannel_list_->tracer()->enabled()) { + gpr_log( + GPR_INFO, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): connectivity changed: state=%s, error=%s, " + "shutting_down=%d", + sd->subchannel_list_->tracer()->name(), sd->subchannel_list_->policy(), + sd->subchannel_list_, sd->Index(), + sd->subchannel_list_->num_subchannels(), sd->subchannel_, + grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe_), + grpc_error_string(error), sd->subchannel_list_->shutting_down()); + } + // If shutting down, unref subchannel and stop watching. + if (sd->subchannel_list_->shutting_down() || error == GRPC_ERROR_CANCELLED) { + sd->UnrefSubchannelLocked("connectivity_shutdown"); + sd->StopConnectivityWatchLocked(); + return; + } + // Get or release ref to connected subchannel. + if (!sd->UpdateConnectedSubchannelLocked()) { + // We don't want to report this connectivity state, so renew the watch. + sd->RenewConnectivityWatchLocked(); + return; + } + // Call the subclass's ProcessConnectivityChangeLocked() method. + sd->ProcessConnectivityChangeLocked(sd->pending_connectivity_state_unsafe_, + GRPC_ERROR_REF(error)); +} + +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() { + // If there's a pending notification for this subchannel, cancel it; + // the callback is responsible for unreffing the subchannel. + // Otherwise, unref the subchannel directly. + if (connectivity_notification_pending_) { + CancelConnectivityWatchLocked("shutdown"); + } else if (subchannel_ != nullptr) { + UnrefSubchannelLocked("shutdown"); + } +} + +// +// SubchannelList +// + +template <typename SubchannelListType, typename SubchannelDataType> +SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( + LoadBalancingPolicy* policy, TraceFlag* tracer, const grpc_lb_addresses* addresses, grpc_combiner* combiner, grpc_client_channel_factory* client_channel_factory, - const grpc_channel_args& args, grpc_iomgr_cb_func connectivity_changed_cb); + const grpc_channel_args& args) + : InternallyRefCountedWithTracing<SubchannelListType>(tracer), + policy_(policy), + tracer_(tracer), + combiner_(GRPC_COMBINER_REF(combiner, "subchannel_list")) { + if (tracer_->enabled()) { + gpr_log(GPR_INFO, + "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels", + tracer_->name(), policy, this, addresses->num_addresses); + } + subchannels_.reserve(addresses->num_addresses); + // We need to remove the LB addresses in order to be able to compare the + // subchannel keys of subchannels from a different batch of addresses. + static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, + GRPC_ARG_LB_ADDRESSES}; + // Create a subchannel for each address. + grpc_subchannel_args sc_args; + for (size_t i = 0; i < addresses->num_addresses; i++) { + // If there were any balancer, we would have chosen grpclb policy instead. + GPR_ASSERT(!addresses->addresses[i].is_balancer); + memset(&sc_args, 0, sizeof(grpc_subchannel_args)); + grpc_arg addr_arg = + grpc_create_subchannel_address_arg(&addresses->addresses[i].address); + grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( + &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1); + gpr_free(addr_arg.value.string); + sc_args.args = new_args; + grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel( + client_channel_factory, &sc_args); + grpc_channel_args_destroy(new_args); + if (subchannel == nullptr) { + // Subchannel could not be created. + if (tracer_->enabled()) { + char* address_uri = + grpc_sockaddr_to_uri(&addresses->addresses[i].address); + gpr_log(GPR_INFO, + "[%s %p] could not create subchannel for address uri %s, " + "ignoring", + tracer_->name(), policy_, address_uri); + gpr_free(address_uri); + } + continue; + } + if (tracer_->enabled()) { + char* address_uri = + grpc_sockaddr_to_uri(&addresses->addresses[i].address); + gpr_log(GPR_INFO, + "[%s %p] subchannel list %p index %" PRIuPTR + ": Created subchannel %p for address uri %s", + tracer_->name(), policy_, this, subchannels_.size(), subchannel, + address_uri); + gpr_free(address_uri); + } + subchannels_.emplace_back(static_cast<SubchannelListType*>(this), + addresses->user_data_vtable, + addresses->addresses[i], subchannel, combiner); + } +} -void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list, - const char* reason); +template <typename SubchannelListType, typename SubchannelDataType> +SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() { + if (tracer_->enabled()) { + gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_->name(), + policy_, this); + } + GRPC_COMBINER_UNREF(combiner_, "subchannel_list"); +} -void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list, - const char* reason); +template <typename SubchannelListType, typename SubchannelDataType> +void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked() { + if (tracer_->enabled()) { + gpr_log(GPR_INFO, "[%s %p] Shutting down subchannel_list %p", + tracer_->name(), policy_, this); + } + GPR_ASSERT(!shutting_down_); + shutting_down_ = true; + for (size_t i = 0; i < subchannels_.size(); i++) { + SubchannelDataType* sd = &subchannels_[i]; + sd->ShutdownLocked(); + } +} -/// Mark subchannel_list as discarded. Unsubscribes all its subchannels. The -/// connectivity state notification callback will ultimately unref it. -void grpc_lb_subchannel_list_shutdown_and_unref( - grpc_lb_subchannel_list* subchannel_list, const char* reason); +} // namespace grpc_core #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */ diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.cc b/src/core/ext/filters/client_channel/lb_policy_factory.cc index 80646a10cc..7c8cba55b7 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.cc +++ b/src/core/ext/filters/client_channel/lb_policy_factory.cc @@ -66,7 +66,7 @@ void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index, if (user_data != nullptr) GPR_ASSERT(addresses->user_data_vtable != nullptr); grpc_lb_address* target = &addresses->addresses[index]; memcpy(target->address.addr, address, address_len); - target->address.len = address_len; + target->address.len = static_cast<socklen_t>(address_len); target->is_balancer = is_balancer; target->balancer_name = gpr_strdup(balancer_name); target->user_data = user_data; diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h index b8bbd32072..6440258158 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.h +++ b/src/core/ext/filters/client_channel/lb_policy_factory.h @@ -21,7 +21,6 @@ #include <grpc/support/port_platform.h> -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/ext/filters/client_channel/client_channel_factory.h" diff --git a/src/core/ext/filters/client_channel/lb_policy_registry.h b/src/core/ext/filters/client_channel/lb_policy_registry.h index 2283d848bd..2e9bb061ed 100644 --- a/src/core/ext/filters/client_channel/lb_policy_registry.h +++ b/src/core/ext/filters/client_channel/lb_policy_registry.h @@ -24,7 +24,6 @@ #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/iomgr/exec_ctx.h" namespace grpc_core { diff --git a/src/core/ext/filters/client_channel/method_params.cc b/src/core/ext/filters/client_channel/method_params.cc index 374b87e170..1f116bb67d 100644 --- a/src/core/ext/filters/client_channel/method_params.cc +++ b/src/core/ext/filters/client_channel/method_params.cc @@ -26,7 +26,7 @@ #include <grpc/support/string_util.h> #include "src/core/ext/filters/client_channel/method_params.h" -#include "src/core/ext/filters/client_channel/status_util.h" +#include "src/core/lib/channel/status_util.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/memory.h" diff --git a/src/core/ext/filters/client_channel/method_params.h b/src/core/ext/filters/client_channel/method_params.h index 48ece29867..a31d360f17 100644 --- a/src/core/ext/filters/client_channel/method_params.h +++ b/src/core/ext/filters/client_channel/method_params.h @@ -21,7 +21,7 @@ #include <grpc/support/port_platform.h> -#include "src/core/ext/filters/client_channel/status_util.h" +#include "src/core/lib/channel/status_util.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/exec_ctx.h" // for grpc_millis @@ -60,6 +60,10 @@ class ClientChannelMethodParams : public RefCounted<ClientChannelMethodParams> { template <typename T, typename... Args> friend T* grpc_core::New(Args&&... args); + // So Delete() can call our private dtor. + template <typename T> + friend void grpc_core::Delete(T*); + ClientChannelMethodParams() {} virtual ~ClientChannelMethodParams() {} diff --git a/src/core/ext/filters/client_channel/parse_address.cc b/src/core/ext/filters/client_channel/parse_address.cc index e78dc99e0b..b3900114ad 100644 --- a/src/core/ext/filters/client_channel/parse_address.cc +++ b/src/core/ext/filters/client_channel/parse_address.cc @@ -20,6 +20,7 @@ #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/socket_utils.h" #include <stdio.h> #include <string.h> @@ -49,7 +50,7 @@ bool grpc_parse_unix(const grpc_uri* uri, if (path_len == maxlen) return false; un->sun_family = AF_UNIX; strcpy(un->sun_path, uri->path); - resolved_addr->len = sizeof(*un); + resolved_addr->len = static_cast<socklen_t>(sizeof(*un)); return true; } @@ -71,10 +72,10 @@ bool grpc_parse_ipv4_hostport(const char* hostport, grpc_resolved_address* addr, if (!gpr_split_host_port(hostport, &host, &port)) return false; // Parse IP address. memset(addr, 0, sizeof(*addr)); - addr->len = sizeof(struct sockaddr_in); - struct sockaddr_in* in = reinterpret_cast<struct sockaddr_in*>(addr->addr); - in->sin_family = AF_INET; - if (inet_pton(AF_INET, host, &in->sin_addr) == 0) { + addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in)); + grpc_sockaddr_in* in = reinterpret_cast<grpc_sockaddr_in*>(addr->addr); + in->sin_family = GRPC_AF_INET; + if (grpc_inet_pton(GRPC_AF_INET, host, &in->sin_addr) == 0) { if (log_errors) gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host); goto done; } @@ -88,7 +89,7 @@ bool grpc_parse_ipv4_hostport(const char* hostport, grpc_resolved_address* addr, if (log_errors) gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port); goto done; } - in->sin_port = htons(static_cast<uint16_t>(port_num)); + in->sin_port = grpc_htons(static_cast<uint16_t>(port_num)); success = true; done: gpr_free(host); @@ -117,19 +118,20 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr, if (!gpr_split_host_port(hostport, &host, &port)) return false; // Parse IP address. memset(addr, 0, sizeof(*addr)); - addr->len = sizeof(struct sockaddr_in6); - struct sockaddr_in6* in6 = reinterpret_cast<struct sockaddr_in6*>(addr->addr); - in6->sin6_family = AF_INET6; + addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6)); + grpc_sockaddr_in6* in6 = reinterpret_cast<grpc_sockaddr_in6*>(addr->addr); + in6->sin6_family = GRPC_AF_INET6; // Handle the RFC6874 syntax for IPv6 zone identifiers. char* host_end = static_cast<char*>(gpr_memrchr(host, '%', strlen(host))); if (host_end != nullptr) { GPR_ASSERT(host_end >= host); - char host_without_scope[INET6_ADDRSTRLEN]; + char host_without_scope[GRPC_INET6_ADDRSTRLEN]; size_t host_without_scope_len = static_cast<size_t>(host_end - host); uint32_t sin6_scope_id = 0; strncpy(host_without_scope, host, host_without_scope_len); host_without_scope[host_without_scope_len] = '\0'; - if (inet_pton(AF_INET6, host_without_scope, &in6->sin6_addr) == 0) { + if (grpc_inet_pton(GRPC_AF_INET6, host_without_scope, &in6->sin6_addr) == + 0) { gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host_without_scope); goto done; } @@ -142,7 +144,7 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr, // Handle "sin6_scope_id" being type "u_long". See grpc issue #10027. in6->sin6_scope_id = sin6_scope_id; } else { - if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) { + if (grpc_inet_pton(GRPC_AF_INET6, host, &in6->sin6_addr) == 0) { gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host); goto done; } @@ -157,7 +159,7 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr, if (log_errors) gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port); goto done; } - in6->sin6_port = htons(static_cast<uint16_t>(port_num)); + in6->sin6_port = grpc_htons(static_cast<uint16_t>(port_num)); success = true; done: gpr_free(host); diff --git a/src/core/ext/filters/client_channel/resolver.h b/src/core/ext/filters/client_channel/resolver.h index 1685a6c803..02380314dd 100644 --- a/src/core/ext/filters/client_channel/resolver.h +++ b/src/core/ext/filters/client_channel/resolver.h @@ -53,8 +53,12 @@ class Resolver : public InternallyRefCountedWithTracing<Resolver> { /// Requests a callback when a new result becomes available. /// When the new result is available, sets \a *result to the new result /// and schedules \a on_complete for execution. + /// Upon transient failure, sets \a *result to nullptr and schedules + /// \a on_complete with no error. /// If resolution is fatally broken, sets \a *result to nullptr and /// schedules \a on_complete with an error. + /// TODO(roth): When we have time, improve the way this API represents + /// transient failure vs. shutdown. /// /// Note that the client channel will almost always have a request /// to \a NextLocked() pending. When it gets the callback, it will @@ -101,6 +105,10 @@ class Resolver : public InternallyRefCountedWithTracing<Resolver> { GRPC_ABSTRACT_BASE_CLASS protected: + // So Delete() can access our protected dtor. + template <typename T> + friend void Delete(T*); + /// Does NOT take ownership of the reference to \a combiner. // TODO(roth): Once we have a C++-like interface for combiners, this // API should change to take a RefCountedPtr<>, so that we always take diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index aa93e5d8de..c3c62b60bf 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -28,6 +28,8 @@ #include <grpc/support/alloc.h> #include <grpc/support/string_util.h> +#include <address_sorting/address_sorting.h> + #include "src/core/ext/filters/client_channel/http_connect_handshaker.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" @@ -133,6 +135,7 @@ AresDnsResolver::AresDnsResolver(const ResolverArgs& args) if (path[0] == '/') ++path; name_to_resolve_ = gpr_strdup(path); // Get DNS server from URI authority. + dns_server_ = nullptr; if (0 != strcmp(args.uri->authority, "")) { dns_server_ = gpr_strdup(args.uri->authority); } @@ -360,6 +363,15 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { } void AresDnsResolver::MaybeStartResolvingLocked() { + // If there is an existing timer, the time it fires is the earliest time we + // can start the next resolution. + if (have_next_resolution_timer_) { + // TODO(dgq): remove the following two lines once Pick First stops + // discarding subchannels after selecting. + ++resolved_version_; + MaybeFinishNextLocked(); + return; + } if (last_resolution_timestamp_ >= 0) { const grpc_millis earliest_next_resolution = last_resolution_timestamp_ + min_time_between_resolutions_; @@ -372,17 +384,15 @@ void AresDnsResolver::MaybeStartResolvingLocked() { "In cooldown from last resolution (from %" PRIdPTR " ms ago). Will resolve again in %" PRIdPTR " ms", last_resolution_ago, ms_until_next_resolution); - if (!have_next_resolution_timer_) { - have_next_resolution_timer_ = true; - // TODO(roth): We currently deal with this ref manually. Once the - // new closure API is done, find a way to track this ref with the timer - // callback as part of the type system. - RefCountedPtr<Resolver> self = - Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown"); - self.release(); - grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution, - &on_next_resolution_); - } + have_next_resolution_timer_ = true; + // TODO(roth): We currently deal with this ref manually. Once the + // new closure API is done, find a way to track this ref with the timer + // callback as part of the type system. + RefCountedPtr<Resolver> self = + Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown"); + self.release(); + grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution, + &on_next_resolution_); // TODO(dgq): remove the following two lines once Pick First stops // discarding subchannels after selecting. ++resolved_version_; @@ -394,6 +404,7 @@ void AresDnsResolver::MaybeStartResolvingLocked() { } void AresDnsResolver::StartResolvingLocked() { + gpr_log(GPR_DEBUG, "Start resolving."); // TODO(roth): We currently deal with this ref manually. Once the // new closure API is done, find a way to track this ref with the timer // callback as part of the type system. @@ -440,17 +451,32 @@ class AresDnsResolverFactory : public ResolverFactory { } // namespace grpc_core +extern grpc_address_resolver_vtable* grpc_resolve_address_impl; +static grpc_address_resolver_vtable* default_resolver; + +static grpc_error* blocking_resolve_address_ares( + const char* name, const char* default_port, + grpc_resolved_addresses** addresses) { + return default_resolver->blocking_resolve_address(name, default_port, + addresses); +} + +static grpc_address_resolver_vtable ares_resolver = { + grpc_resolve_address_ares, blocking_resolve_address_ares}; + void grpc_resolver_dns_ares_init() { char* resolver_env = gpr_getenv("GRPC_DNS_RESOLVER"); /* TODO(zyc): Turn on c-ares based resolver by default after the address sorter and the CNAME support are added. */ if (resolver_env != nullptr && gpr_stricmp(resolver_env, "ares") == 0) { + address_sorting_init(); grpc_error* error = grpc_ares_init(); if (error != GRPC_ERROR_NONE) { GRPC_LOG_IF_ERROR("ares_library_init() failed", error); return; } - grpc_resolve_address = grpc_resolve_address_ares; + default_resolver = grpc_resolve_address_impl; + grpc_set_resolver_impl(&ares_resolver); grpc_core::ResolverRegistry::Builder::RegisterResolverFactory( grpc_core::UniquePtr<grpc_core::ResolverFactory>( grpc_core::New<grpc_core::AresDnsResolverFactory>())); @@ -461,6 +487,7 @@ void grpc_resolver_dns_ares_init() { void grpc_resolver_dns_ares_shutdown() { char* resolver_env = gpr_getenv("GRPC_DNS_RESOLVER"); if (resolver_env != nullptr && gpr_stricmp(resolver_env, "ares") == 0) { + address_sorting_shutdown(); grpc_ares_cleanup(); } gpr_free(resolver_env); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h index 0bc13e35f4..6239549534 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h @@ -22,7 +22,6 @@ #include <grpc/support/port_platform.h> #include <ares.h> -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/pollset_set.h" typedef struct grpc_ares_ev_driver grpc_ares_ev_driver; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index 71b06eb87e..e86ab5a37e 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -33,6 +33,7 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> +#include <address_sorting/address_sorting.h> #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" #include "src/core/lib/gpr/host_port.h" @@ -46,6 +47,9 @@ static gpr_once g_basic_init = GPR_ONCE_INIT; static gpr_mu g_init_mu; +grpc_core::TraceFlag grpc_trace_cares_address_sorting(false, + "cares_address_sorting"); + struct grpc_ares_request { /** indicates the DNS server to use, if specified */ struct ares_addr_port_node dns_server_addr; @@ -96,11 +100,63 @@ static void grpc_ares_request_ref(grpc_ares_request* r) { gpr_ref(&r->pending_queries); } +static void log_address_sorting_list(grpc_lb_addresses* lb_addrs, + const char* input_output_str) { + for (size_t i = 0; i < lb_addrs->num_addresses; i++) { + char* addr_str; + if (grpc_sockaddr_to_string(&addr_str, &lb_addrs->addresses[i].address, + true)) { + gpr_log(GPR_DEBUG, "c-ares address sorting: %s[%" PRIuPTR "]=%s", + input_output_str, i, addr_str); + gpr_free(addr_str); + } else { + gpr_log(GPR_DEBUG, + "c-ares address sorting: %s[%" PRIuPTR "]=<unprintable>", + input_output_str, i); + } + } +} + +void grpc_cares_wrapper_address_sorting_sort(grpc_lb_addresses* lb_addrs) { + if (grpc_trace_cares_address_sorting.enabled()) { + log_address_sorting_list(lb_addrs, "input"); + } + address_sorting_sortable* sortables = (address_sorting_sortable*)gpr_zalloc( + sizeof(address_sorting_sortable) * lb_addrs->num_addresses); + for (size_t i = 0; i < lb_addrs->num_addresses; i++) { + sortables[i].user_data = &lb_addrs->addresses[i]; + memcpy(&sortables[i].dest_addr.addr, &lb_addrs->addresses[i].address.addr, + lb_addrs->addresses[i].address.len); + sortables[i].dest_addr.len = lb_addrs->addresses[i].address.len; + } + address_sorting_rfc_6724_sort(sortables, lb_addrs->num_addresses); + grpc_lb_address* sorted_lb_addrs = (grpc_lb_address*)gpr_zalloc( + sizeof(grpc_lb_address) * lb_addrs->num_addresses); + for (size_t i = 0; i < lb_addrs->num_addresses; i++) { + sorted_lb_addrs[i] = *(grpc_lb_address*)sortables[i].user_data; + } + gpr_free(sortables); + gpr_free(lb_addrs->addresses); + lb_addrs->addresses = sorted_lb_addrs; + if (grpc_trace_cares_address_sorting.enabled()) { + log_address_sorting_list(lb_addrs, "output"); + } +} + +/* Allow tests to access grpc_ares_wrapper_address_sorting_sort */ +void grpc_cares_wrapper_test_only_address_sorting_sort( + grpc_lb_addresses* lb_addrs) { + grpc_cares_wrapper_address_sorting_sort(lb_addrs); +} + static void grpc_ares_request_unref(grpc_ares_request* r) { /* If there are no pending queries, invoke on_done callback and destroy the request */ if (gpr_unref(&r->pending_queries)) { - /* TODO(zyc): Sort results with RFC6724 before invoking on_done. */ + grpc_lb_addresses* lb_addrs = *(r->lb_addrs_out); + if (lb_addrs != nullptr) { + grpc_cares_wrapper_address_sorting_sort(lb_addrs); + } GRPC_CLOSURE_SCHED(r->on_done, r->error); gpr_mu_destroy(&r->mu); grpc_ares_ev_driver_destroy(r->ev_driver); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index bda9cd1729..2d84a038d6 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -22,11 +22,12 @@ #include <grpc/support/port_platform.h> #include "src/core/ext/filters/client_channel/lb_policy_factory.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/resolve_address.h" +extern grpc_core::TraceFlag grpc_trace_cares_address_sorting; + typedef struct grpc_ares_request grpc_ares_request; /* Asynchronously resolve \a name. Use \a default_port if a port isn't @@ -65,5 +66,9 @@ grpc_error* grpc_ares_init(void); it has been called the same number of times as grpc_ares_init(). */ void grpc_ares_cleanup(void); +/* Exposed only for testing */ +void grpc_cares_wrapper_test_only_address_sorting_sort( + grpc_lb_addresses* lb_addrs); + #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H \ */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index fbab136421..e7842a7951 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -236,6 +236,15 @@ void NativeDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { } void NativeDnsResolver::MaybeStartResolvingLocked() { + // If there is an existing timer, the time it fires is the earliest time we + // can start the next resolution. + if (have_next_resolution_timer_) { + // TODO(dgq): remove the following two lines once Pick First stops + // discarding subchannels after selecting. + ++resolved_version_; + MaybeFinishNextLocked(); + return; + } if (last_resolution_timestamp_ >= 0) { const grpc_millis earliest_next_resolution = last_resolution_timestamp_ + min_time_between_resolutions_; @@ -248,17 +257,15 @@ void NativeDnsResolver::MaybeStartResolvingLocked() { "In cooldown from last resolution (from %" PRIdPTR " ms ago). Will resolve again in %" PRIdPTR " ms", last_resolution_ago, ms_until_next_resolution); - if (!have_next_resolution_timer_) { - have_next_resolution_timer_ = true; - // TODO(roth): We currently deal with this ref manually. Once the - // new closure API is done, find a way to track this ref with the timer - // callback as part of the type system. - RefCountedPtr<Resolver> self = - Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown"); - self.release(); - grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution, - &on_next_resolution_); - } + have_next_resolution_timer_ = true; + // TODO(roth): We currently deal with this ref manually. Once the + // new closure API is done, find a way to track this ref with the timer + // callback as part of the type system. + RefCountedPtr<Resolver> self = + Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown"); + self.release(); + grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution, + &on_next_resolution_); // TODO(dgq): remove the following two lines once Pick First stops // discarding subchannels after selecting. ++resolved_version_; @@ -270,6 +277,7 @@ void NativeDnsResolver::MaybeStartResolvingLocked() { } void NativeDnsResolver::StartResolvingLocked() { + gpr_log(GPR_DEBUG, "Start resolving."); // TODO(roth): We currently deal with this ref manually. Once the // new closure API is done, find a way to track this ref with the timer // callback as part of the type system. diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc index 4d8958f519..99a33f2277 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc @@ -82,6 +82,8 @@ class FakeResolver : public Resolver { grpc_closure* next_completion_ = nullptr; // target result address for next completion grpc_channel_args** target_result_ = nullptr; + // if true, return failure + bool return_failure_ = false; }; FakeResolver::FakeResolver(const ResolverArgs& args) : Resolver(args.combiner) { @@ -121,12 +123,16 @@ void FakeResolver::RequestReresolutionLocked() { } void FakeResolver::MaybeFinishNextLocked() { - if (next_completion_ != nullptr && next_results_ != nullptr) { - *target_result_ = grpc_channel_args_union(next_results_, channel_args_); + if (next_completion_ != nullptr && + (next_results_ != nullptr || return_failure_)) { + *target_result_ = + return_failure_ ? nullptr + : grpc_channel_args_union(next_results_, channel_args_); grpc_channel_args_destroy(next_results_); next_results_ = nullptr; GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_NONE); next_completion_ = nullptr; + return_failure_ = false; } } @@ -197,6 +203,26 @@ void FakeResolverResponseGenerator::SetReresolutionResponse( GRPC_ERROR_NONE); } +void FakeResolverResponseGenerator::SetFailureLocked(void* arg, + grpc_error* error) { + SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg); + FakeResolver* resolver = closure_arg->generator->resolver_; + resolver->return_failure_ = true; + resolver->MaybeFinishNextLocked(); + Delete(closure_arg); +} + +void FakeResolverResponseGenerator::SetFailure() { + GPR_ASSERT(resolver_ != nullptr); + SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>(); + closure_arg->generator = this; + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetFailureLocked, + closure_arg, + grpc_combiner_scheduler(resolver_->combiner())), + GRPC_ERROR_NONE); +} + namespace { static void* response_generator_arg_copy(void* p) { diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h index 858f35851d..e5175f9b7b 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h @@ -56,6 +56,10 @@ class FakeResolverResponseGenerator // resolver will return the last value set via \a SetResponse(). void SetReresolutionResponse(grpc_channel_args* response); + // Tells the resolver to return a transient failure (signalled by + // returning a null result with no error). + void SetFailure(); + // Returns a channel arg containing \a generator. static grpc_arg MakeChannelArg(FakeResolverResponseGenerator* generator); @@ -68,6 +72,7 @@ class FakeResolverResponseGenerator static void SetResponseLocked(void* arg, grpc_error* error); static void SetReresolutionResponseLocked(void* arg, grpc_error* error); + static void SetFailureLocked(void* arg, grpc_error* error); FakeResolver* resolver_ = nullptr; // Do not own. }; diff --git a/src/core/ext/filters/client_channel/retry_throttle.cc b/src/core/ext/filters/client_channel/retry_throttle.cc index 45de6667c8..bdeb7e4cac 100644 --- a/src/core/ext/filters/client_channel/retry_throttle.cc +++ b/src/core/ext/filters/client_channel/retry_throttle.cc @@ -30,184 +30,162 @@ #include "src/core/lib/avl/avl.h" +namespace grpc_core { +namespace internal { + // -// server_retry_throttle_data +// ServerRetryThrottleData // -struct grpc_server_retry_throttle_data { - gpr_refcount refs; - int max_milli_tokens; - int milli_token_ratio; - gpr_atm milli_tokens; - // A pointer to the replacement for this grpc_server_retry_throttle_data - // entry. If non-nullptr, then this entry is stale and must not be used. - // We hold a reference to the replacement. - gpr_atm replacement; -}; - -static void get_replacement_throttle_data_if_needed( - grpc_server_retry_throttle_data** throttle_data) { +ServerRetryThrottleData::ServerRetryThrottleData( + intptr_t max_milli_tokens, intptr_t milli_token_ratio, + ServerRetryThrottleData* old_throttle_data) + : max_milli_tokens_(max_milli_tokens), + milli_token_ratio_(milli_token_ratio) { + intptr_t initial_milli_tokens = max_milli_tokens; + // If there was a pre-existing entry for this server name, initialize + // the token count by scaling proportionately to the old data. This + // ensures that if we're already throttling retries on the old scale, + // we will start out doing the same thing on the new one. + if (old_throttle_data != nullptr) { + double token_fraction = + static_cast<intptr_t>( + gpr_atm_acq_load(&old_throttle_data->milli_tokens_)) / + static_cast<double>(old_throttle_data->max_milli_tokens_); + initial_milli_tokens = + static_cast<intptr_t>(token_fraction * max_milli_tokens); + } + gpr_atm_rel_store(&milli_tokens_, static_cast<gpr_atm>(initial_milli_tokens)); + // If there was a pre-existing entry, mark it as stale and give it a + // pointer to the new entry, which is its replacement. + if (old_throttle_data != nullptr) { + Ref().release(); // Ref held by pre-existing entry. + gpr_atm_rel_store(&old_throttle_data->replacement_, + reinterpret_cast<gpr_atm>(this)); + } +} + +ServerRetryThrottleData::~ServerRetryThrottleData() { + ServerRetryThrottleData* replacement = + reinterpret_cast<ServerRetryThrottleData*>( + gpr_atm_acq_load(&replacement_)); + if (replacement != nullptr) { + replacement->Unref(); + } +} + +void ServerRetryThrottleData::GetReplacementThrottleDataIfNeeded( + ServerRetryThrottleData** throttle_data) { while (true) { - grpc_server_retry_throttle_data* new_throttle_data = - (grpc_server_retry_throttle_data*)gpr_atm_acq_load( - &(*throttle_data)->replacement); + ServerRetryThrottleData* new_throttle_data = + reinterpret_cast<ServerRetryThrottleData*>( + gpr_atm_acq_load(&(*throttle_data)->replacement_)); if (new_throttle_data == nullptr) return; *throttle_data = new_throttle_data; } } -bool grpc_server_retry_throttle_data_record_failure( - grpc_server_retry_throttle_data* throttle_data) { - if (throttle_data == nullptr) return true; +bool ServerRetryThrottleData::RecordFailure() { // First, check if we are stale and need to be replaced. - get_replacement_throttle_data_if_needed(&throttle_data); + ServerRetryThrottleData* throttle_data = this; + GetReplacementThrottleDataIfNeeded(&throttle_data); // We decrement milli_tokens by 1000 (1 token) for each failure. - const int new_value = static_cast<int>(gpr_atm_no_barrier_clamped_add( - &throttle_data->milli_tokens, static_cast<gpr_atm>(-1000), - static_cast<gpr_atm>(0), - static_cast<gpr_atm>(throttle_data->max_milli_tokens))); + const intptr_t new_value = + static_cast<intptr_t>(gpr_atm_no_barrier_clamped_add( + &throttle_data->milli_tokens_, static_cast<gpr_atm>(-1000), + static_cast<gpr_atm>(0), + static_cast<gpr_atm>(throttle_data->max_milli_tokens_))); // Retries are allowed as long as the new value is above the threshold // (max_milli_tokens / 2). - return new_value > throttle_data->max_milli_tokens / 2; + return new_value > throttle_data->max_milli_tokens_ / 2; } -void grpc_server_retry_throttle_data_record_success( - grpc_server_retry_throttle_data* throttle_data) { - if (throttle_data == nullptr) return; +void ServerRetryThrottleData::RecordSuccess() { // First, check if we are stale and need to be replaced. - get_replacement_throttle_data_if_needed(&throttle_data); + ServerRetryThrottleData* throttle_data = this; + GetReplacementThrottleDataIfNeeded(&throttle_data); // We increment milli_tokens by milli_token_ratio for each success. gpr_atm_no_barrier_clamped_add( - &throttle_data->milli_tokens, - static_cast<gpr_atm>(throttle_data->milli_token_ratio), + &throttle_data->milli_tokens_, + static_cast<gpr_atm>(throttle_data->milli_token_ratio_), static_cast<gpr_atm>(0), - static_cast<gpr_atm>(throttle_data->max_milli_tokens)); -} - -grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref( - grpc_server_retry_throttle_data* throttle_data) { - gpr_ref(&throttle_data->refs); - return throttle_data; -} - -void grpc_server_retry_throttle_data_unref( - grpc_server_retry_throttle_data* throttle_data) { - if (gpr_unref(&throttle_data->refs)) { - grpc_server_retry_throttle_data* replacement = - (grpc_server_retry_throttle_data*)gpr_atm_acq_load( - &throttle_data->replacement); - if (replacement != nullptr) { - grpc_server_retry_throttle_data_unref(replacement); - } - gpr_free(throttle_data); - } -} - -static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create( - int max_milli_tokens, int milli_token_ratio, - grpc_server_retry_throttle_data* old_throttle_data) { - grpc_server_retry_throttle_data* throttle_data = - static_cast<grpc_server_retry_throttle_data*>( - gpr_malloc(sizeof(*throttle_data))); - memset(throttle_data, 0, sizeof(*throttle_data)); - gpr_ref_init(&throttle_data->refs, 1); - throttle_data->max_milli_tokens = max_milli_tokens; - throttle_data->milli_token_ratio = milli_token_ratio; - int initial_milli_tokens = max_milli_tokens; - // If there was a pre-existing entry for this server name, initialize - // the token count by scaling proportionately to the old data. This - // ensures that if we're already throttling retries on the old scale, - // we will start out doing the same thing on the new one. - if (old_throttle_data != nullptr) { - double token_fraction = - static_cast<int>(gpr_atm_acq_load(&old_throttle_data->milli_tokens)) / - static_cast<double>(old_throttle_data->max_milli_tokens); - initial_milli_tokens = static_cast<int>(token_fraction * max_milli_tokens); - } - gpr_atm_rel_store(&throttle_data->milli_tokens, - (gpr_atm)initial_milli_tokens); - // If there was a pre-existing entry, mark it as stale and give it a - // pointer to the new entry, which is its replacement. - if (old_throttle_data != nullptr) { - grpc_server_retry_throttle_data_ref(throttle_data); - gpr_atm_rel_store(&old_throttle_data->replacement, (gpr_atm)throttle_data); - } - return throttle_data; + static_cast<gpr_atm>(throttle_data->max_milli_tokens_)); } // // avl vtable for string -> server_retry_throttle_data map // -static void* copy_server_name(void* key, void* unused) { +namespace { + +void* copy_server_name(void* key, void* unused) { return gpr_strdup(static_cast<const char*>(key)); } -static long compare_server_name(void* key1, void* key2, void* unused) { +long compare_server_name(void* key1, void* key2, void* unused) { return strcmp(static_cast<const char*>(key1), static_cast<const char*>(key2)); } -static void destroy_server_retry_throttle_data(void* value, void* unused) { - grpc_server_retry_throttle_data* throttle_data = - static_cast<grpc_server_retry_throttle_data*>(value); - grpc_server_retry_throttle_data_unref(throttle_data); +void destroy_server_retry_throttle_data(void* value, void* unused) { + ServerRetryThrottleData* throttle_data = + static_cast<ServerRetryThrottleData*>(value); + throttle_data->Unref(); } -static void* copy_server_retry_throttle_data(void* value, void* unused) { - grpc_server_retry_throttle_data* throttle_data = - static_cast<grpc_server_retry_throttle_data*>(value); - return grpc_server_retry_throttle_data_ref(throttle_data); +void* copy_server_retry_throttle_data(void* value, void* unused) { + ServerRetryThrottleData* throttle_data = + static_cast<ServerRetryThrottleData*>(value); + return throttle_data->Ref().release(); } -static void destroy_server_name(void* key, void* unused) { gpr_free(key); } +void destroy_server_name(void* key, void* unused) { gpr_free(key); } -static const grpc_avl_vtable avl_vtable = { +const grpc_avl_vtable avl_vtable = { destroy_server_name, copy_server_name, compare_server_name, destroy_server_retry_throttle_data, copy_server_retry_throttle_data}; +} // namespace + // -// server_retry_throttle_map +// ServerRetryThrottleMap // static gpr_mu g_mu; static grpc_avl g_avl; -void grpc_retry_throttle_map_init() { +void ServerRetryThrottleMap::Init() { gpr_mu_init(&g_mu); g_avl = grpc_avl_create(&avl_vtable); } -void grpc_retry_throttle_map_shutdown() { +void ServerRetryThrottleMap::Shutdown() { gpr_mu_destroy(&g_mu); grpc_avl_unref(g_avl, nullptr); } -grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( - const char* server_name, int max_milli_tokens, int milli_token_ratio) { +RefCountedPtr<ServerRetryThrottleData> ServerRetryThrottleMap::GetDataForServer( + const char* server_name, intptr_t max_milli_tokens, + intptr_t milli_token_ratio) { + RefCountedPtr<ServerRetryThrottleData> result; gpr_mu_lock(&g_mu); - grpc_server_retry_throttle_data* throttle_data = - static_cast<grpc_server_retry_throttle_data*>( + ServerRetryThrottleData* throttle_data = + static_cast<ServerRetryThrottleData*>( grpc_avl_get(g_avl, const_cast<char*>(server_name), nullptr)); - if (throttle_data == nullptr) { - // Entry not found. Create a new one. - throttle_data = grpc_server_retry_throttle_data_create( - max_milli_tokens, milli_token_ratio, nullptr); - g_avl = grpc_avl_add(g_avl, const_cast<char*>(server_name), throttle_data, - nullptr); + if (throttle_data == nullptr || + throttle_data->max_milli_tokens() != max_milli_tokens || + throttle_data->milli_token_ratio() != milli_token_ratio) { + // Entry not found, or found with old parameters. Create a new one. + result = MakeRefCounted<ServerRetryThrottleData>( + max_milli_tokens, milli_token_ratio, throttle_data); + g_avl = grpc_avl_add(g_avl, gpr_strdup(server_name), + result->Ref().release(), nullptr); } else { - if (throttle_data->max_milli_tokens != max_milli_tokens || - throttle_data->milli_token_ratio != milli_token_ratio) { - // Entry found but with old parameters. Create a new one based on - // the original one. - throttle_data = grpc_server_retry_throttle_data_create( - max_milli_tokens, milli_token_ratio, throttle_data); - g_avl = grpc_avl_add(g_avl, const_cast<char*>(server_name), throttle_data, - nullptr); - } else { - // Entry found. Increase refcount. - grpc_server_retry_throttle_data_ref(throttle_data); - } + // Entry found. Return a new ref to it. + result = throttle_data->Ref(); } gpr_mu_unlock(&g_mu); - return throttle_data; + return result; } + +} // namespace internal +} // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/retry_throttle.h b/src/core/ext/filters/client_channel/retry_throttle.h index 0505fc27f2..fddafcd903 100644 --- a/src/core/ext/filters/client_channel/retry_throttle.h +++ b/src/core/ext/filters/client_channel/retry_throttle.h @@ -21,32 +21,61 @@ #include <grpc/support/port_platform.h> -#include <stdbool.h> +#include "src/core/lib/gprpp/ref_counted.h" + +namespace grpc_core { +namespace internal { /// Tracks retry throttling data for an individual server name. -typedef struct grpc_server_retry_throttle_data grpc_server_retry_throttle_data; - -/// Records a failure. Returns true if it's okay to send a retry. -bool grpc_server_retry_throttle_data_record_failure( - grpc_server_retry_throttle_data* throttle_data); -/// Records a success. -void grpc_server_retry_throttle_data_record_success( - grpc_server_retry_throttle_data* throttle_data); - -grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref( - grpc_server_retry_throttle_data* throttle_data); -void grpc_server_retry_throttle_data_unref( - grpc_server_retry_throttle_data* throttle_data); - -/// Initializes global map of failure data for each server name. -void grpc_retry_throttle_map_init(); -/// Shuts down global map of failure data for each server name. -void grpc_retry_throttle_map_shutdown(); - -/// Returns a reference to the failure data for \a server_name, creating -/// a new entry if needed. -/// Caller must eventually unref via \a grpc_server_retry_throttle_data_unref(). -grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( - const char* server_name, int max_milli_tokens, int milli_token_ratio); +class ServerRetryThrottleData : public RefCounted<ServerRetryThrottleData> { + public: + ServerRetryThrottleData(intptr_t max_milli_tokens, intptr_t milli_token_ratio, + ServerRetryThrottleData* old_throttle_data); + + /// Records a failure. Returns true if it's okay to send a retry. + bool RecordFailure(); + + /// Records a success. + void RecordSuccess(); + + intptr_t max_milli_tokens() const { return max_milli_tokens_; } + intptr_t milli_token_ratio() const { return milli_token_ratio_; } + + private: + // So Delete() can call our private dtor. + template <typename T> + friend void grpc_core::Delete(T*); + + ~ServerRetryThrottleData(); + + void GetReplacementThrottleDataIfNeeded( + ServerRetryThrottleData** throttle_data); + + const intptr_t max_milli_tokens_; + const intptr_t milli_token_ratio_; + gpr_atm milli_tokens_; + // A pointer to the replacement for this ServerRetryThrottleData entry. + // If non-nullptr, then this entry is stale and must not be used. + // We hold a reference to the replacement. + gpr_atm replacement_ = 0; +}; + +/// Global map of server name to retry throttle data. +class ServerRetryThrottleMap { + public: + /// Initializes global map of failure data for each server name. + static void Init(); + /// Shuts down global map of failure data for each server name. + static void Shutdown(); + + /// Returns the failure data for \a server_name, creating a new entry if + /// needed. + static RefCountedPtr<ServerRetryThrottleData> GetDataForServer( + const char* server_name, intptr_t max_milli_tokens, + intptr_t milli_token_ratio); +}; + +} // namespace internal +} // namespace grpc_core #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RETRY_THROTTLE_H */ diff --git a/src/core/ext/filters/client_channel/status_util.cc b/src/core/ext/filters/client_channel/status_util.cc deleted file mode 100644 index 11f732ab44..0000000000 --- a/src/core/ext/filters/client_channel/status_util.cc +++ /dev/null @@ -1,100 +0,0 @@ -/* - * - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/support/port_platform.h> - -#include "src/core/ext/filters/client_channel/status_util.h" - -#include "src/core/lib/gpr/useful.h" - -typedef struct { - const char* str; - grpc_status_code status; -} status_string_entry; - -static const status_string_entry g_status_string_entries[] = { - {"OK", GRPC_STATUS_OK}, - {"CANCELLED", GRPC_STATUS_CANCELLED}, - {"UNKNOWN", GRPC_STATUS_UNKNOWN}, - {"INVALID_ARGUMENT", GRPC_STATUS_INVALID_ARGUMENT}, - {"DEADLINE_EXCEEDED", GRPC_STATUS_DEADLINE_EXCEEDED}, - {"NOT_FOUND", GRPC_STATUS_NOT_FOUND}, - {"ALREADY_EXISTS", GRPC_STATUS_ALREADY_EXISTS}, - {"PERMISSION_DENIED", GRPC_STATUS_PERMISSION_DENIED}, - {"UNAUTHENTICATED", GRPC_STATUS_UNAUTHENTICATED}, - {"RESOURCE_EXHAUSTED", GRPC_STATUS_RESOURCE_EXHAUSTED}, - {"FAILED_PRECONDITION", GRPC_STATUS_FAILED_PRECONDITION}, - {"ABORTED", GRPC_STATUS_ABORTED}, - {"OUT_OF_RANGE", GRPC_STATUS_OUT_OF_RANGE}, - {"UNIMPLEMENTED", GRPC_STATUS_UNIMPLEMENTED}, - {"INTERNAL", GRPC_STATUS_INTERNAL}, - {"UNAVAILABLE", GRPC_STATUS_UNAVAILABLE}, - {"DATA_LOSS", GRPC_STATUS_DATA_LOSS}, -}; - -bool grpc_status_code_from_string(const char* status_str, - grpc_status_code* status) { - for (size_t i = 0; i < GPR_ARRAY_SIZE(g_status_string_entries); ++i) { - if (strcmp(status_str, g_status_string_entries[i].str) == 0) { - *status = g_status_string_entries[i].status; - return true; - } - } - return false; -} - -const char* grpc_status_code_to_string(grpc_status_code status) { - switch (status) { - case GRPC_STATUS_OK: - return "OK"; - case GRPC_STATUS_CANCELLED: - return "CANCELLED"; - case GRPC_STATUS_UNKNOWN: - return "UNKNOWN"; - case GRPC_STATUS_INVALID_ARGUMENT: - return "INVALID_ARGUMENT"; - case GRPC_STATUS_DEADLINE_EXCEEDED: - return "DEADLINE_EXCEEDED"; - case GRPC_STATUS_NOT_FOUND: - return "NOT_FOUND"; - case GRPC_STATUS_ALREADY_EXISTS: - return "ALREADY_EXISTS"; - case GRPC_STATUS_PERMISSION_DENIED: - return "PERMISSION_DENIED"; - case GRPC_STATUS_UNAUTHENTICATED: - return "UNAUTHENTICATED"; - case GRPC_STATUS_RESOURCE_EXHAUSTED: - return "RESOURCE_EXHAUSTED"; - case GRPC_STATUS_FAILED_PRECONDITION: - return "FAILED_PRECONDITION"; - case GRPC_STATUS_ABORTED: - return "ABORTED"; - case GRPC_STATUS_OUT_OF_RANGE: - return "OUT_OF_RANGE"; - case GRPC_STATUS_UNIMPLEMENTED: - return "UNIMPLEMENTED"; - case GRPC_STATUS_INTERNAL: - return "INTERNAL"; - case GRPC_STATUS_UNAVAILABLE: - return "UNAVAILABLE"; - case GRPC_STATUS_DATA_LOSS: - return "DATA_LOSS"; - default: - return "UNKNOWN"; - } -} diff --git a/src/core/ext/filters/client_channel/status_util.h b/src/core/ext/filters/client_channel/status_util.h deleted file mode 100644 index e018709730..0000000000 --- a/src/core/ext/filters/client_channel/status_util.h +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_STATUS_UTIL_H -#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_STATUS_UTIL_H - -#include <grpc/support/port_platform.h> - -#include <grpc/status.h> - -#include <stdbool.h> -#include <string.h> - -/// If \a status_str is a valid status string, sets \a status to the -/// corresponding status value and returns true. -bool grpc_status_code_from_string(const char* status_str, - grpc_status_code* status); - -/// Returns the string form of \a status, or "UNKNOWN" if invalid. -const char* grpc_status_code_to_string(grpc_status_code status); - -namespace grpc_core { -namespace internal { - -/// A set of grpc_status_code values. -class StatusCodeSet { - public: - bool Empty() const { return status_code_mask_ == 0; } - - void Add(grpc_status_code status) { status_code_mask_ |= (1 << status); } - - bool Contains(grpc_status_code status) const { - return status_code_mask_ & (1 << status); - } - - private: - int status_code_mask_ = 0; // A bitfield of status codes in the set. -}; - -} // namespace internal -} // namespace grpc_core - -#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_STATUS_UTIL_H */ diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index cae7cc35e3..d7815fb7e1 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -40,6 +40,7 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/manual_constructor.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" diff --git a/src/core/ext/filters/client_channel/uri_parser.h b/src/core/ext/filters/client_channel/uri_parser.h index 1966da932b..d749f23308 100644 --- a/src/core/ext/filters/client_channel/uri_parser.h +++ b/src/core/ext/filters/client_channel/uri_parser.h @@ -22,7 +22,6 @@ #include <grpc/support/port_platform.h> #include <stddef.h> -#include "src/core/lib/iomgr/exec_ctx.h" typedef struct { char* scheme; |