diff options
author | 2018-03-01 11:48:29 -0800 | |
---|---|---|
committer | 2018-03-01 11:48:29 -0800 | |
commit | b357f2f548884e0f1766b5b1c3b12e1aa64642cc (patch) | |
tree | 84a5f405562ee887fe79fb36b8a7fe42773933c6 /src/core/ext | |
parent | 2fe87b09055cd256cdce038c4c70d92b955c991b (diff) | |
parent | ccd1d55807bdb13b661dcf1d651468b2d98ff5af (diff) |
Merge branch 'master' into 2phase_thd
Diffstat (limited to 'src/core/ext')
17 files changed, 2413 insertions, 452 deletions
diff --git a/src/core/ext/filters/client_channel/backup_poller.cc b/src/core/ext/filters/client_channel/backup_poller.cc index e7d72d1fde..3e2faa57bc 100644 --- a/src/core/ext/filters/client_channel/backup_poller.cc +++ b/src/core/ext/filters/client_channel/backup_poller.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015 gRPC authors. + * Copyright 2017 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -127,13 +127,7 @@ static void run_poller(void* arg, grpc_error* error) { &p->run_poller_closure); } -void grpc_client_channel_start_backup_polling( - grpc_pollset_set* interested_parties) { - gpr_once_init(&g_once, init_globals); - if (g_poll_interval_ms == 0) { - return; - } - gpr_mu_lock(&g_poller_mu); +static void g_poller_init_locked() { if (g_poller == nullptr) { g_poller = static_cast<backup_poller*>(gpr_zalloc(sizeof(backup_poller))); g_poller->pollset = @@ -149,7 +143,16 @@ void grpc_client_channel_start_backup_polling( grpc_core::ExecCtx::Get()->Now() + g_poll_interval_ms, &g_poller->run_poller_closure); } +} +void grpc_client_channel_start_backup_polling( + grpc_pollset_set* interested_parties) { + gpr_once_init(&g_once, init_globals); + if (g_poll_interval_ms == 0) { + return; + } + gpr_mu_lock(&g_poller_mu); + g_poller_init_locked(); gpr_ref(&g_poller->refs); /* Get a reference to g_poller->pollset before releasing g_poller_mu to make * TSAN happy. Otherwise, reading from g_poller (i.e g_poller->pollset) after diff --git a/src/core/ext/filters/client_channel/backup_poller.h b/src/core/ext/filters/client_channel/backup_poller.h index 45bdf10d6c..7285b9b93e 100644 --- a/src/core/ext/filters/client_channel/backup_poller.h +++ b/src/core/ext/filters/client_channel/backup_poller.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015 gRPC authors. + * Copyright 2017 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 9a8f25b630..90b93fbe23 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -21,6 +21,7 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include <inttypes.h> +#include <limits.h> #include <stdbool.h> #include <stdio.h> #include <string.h> @@ -33,144 +34,65 @@ #include "src/core/ext/filters/client_channel/backup_poller.h" #include "src/core/ext/filters/client_channel/http_connect_handshaker.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/ext/filters/client_channel/method_params.h" #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/ext/filters/client_channel/retry_throttle.h" +#include "src/core/ext/filters/client_channel/status_util.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/deadline/deadline_filter.h" +#include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/inlined_vector.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/connectivity_state.h" +#include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/metadata.h" #include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/service_config.h" #include "src/core/lib/transport/static_metadata.h" +#include "src/core/lib/transport/status_metadata.h" + +using grpc_core::internal::ClientChannelMethodParams; /* Client channel implementation */ +// By default, we buffer 256 KiB per RPC for retries. +// TODO(roth): Do we have any data to suggest a better value? +#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10) + +// This value was picked arbitrarily. It can be changed if there is +// any even moderately compelling reason to do so. +#define RETRY_BACKOFF_JITTER 0.2 + grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel"); /************************************************************************* - * METHOD-CONFIG TABLE + * CHANNEL-WIDE FUNCTIONS */ -typedef enum { - /* zero so it can be default initialized */ - WAIT_FOR_READY_UNSET = 0, - WAIT_FOR_READY_FALSE, - WAIT_FOR_READY_TRUE -} wait_for_ready_value; - -typedef struct { - gpr_refcount refs; - grpc_millis timeout; - wait_for_ready_value wait_for_ready; -} method_parameters; - -static method_parameters* method_parameters_ref( - method_parameters* method_params) { - gpr_ref(&method_params->refs); - return method_params; -} - -static void method_parameters_unref(method_parameters* method_params) { - if (gpr_unref(&method_params->refs)) { - gpr_free(method_params); - } -} - -// Wrappers to pass to grpc_service_config_create_method_config_table(). -static void* method_parameters_ref_wrapper(void* value) { - return method_parameters_ref(static_cast<method_parameters*>(value)); -} -static void method_parameters_unref_wrapper(void* value) { - method_parameters_unref(static_cast<method_parameters*>(value)); -} - -static bool parse_wait_for_ready(grpc_json* field, - wait_for_ready_value* wait_for_ready) { - if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) { - return false; - } - *wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE - : WAIT_FOR_READY_FALSE; - return true; -} - -static bool parse_timeout(grpc_json* field, grpc_millis* timeout) { - if (field->type != GRPC_JSON_STRING) return false; - size_t len = strlen(field->value); - if (field->value[len - 1] != 's') return false; - char* buf = gpr_strdup(field->value); - buf[len - 1] = '\0'; // Remove trailing 's'. - char* decimal_point = strchr(buf, '.'); - int nanos = 0; - if (decimal_point != nullptr) { - *decimal_point = '\0'; - nanos = gpr_parse_nonnegative_int(decimal_point + 1); - if (nanos == -1) { - gpr_free(buf); - return false; - } - int num_digits = static_cast<int>(strlen(decimal_point + 1)); - if (num_digits > 9) { // We don't accept greater precision than nanos. - gpr_free(buf); - return false; - } - for (int i = 0; i < (9 - num_digits); ++i) { - nanos *= 10; - } - } - int seconds = decimal_point == buf ? 0 : gpr_parse_nonnegative_int(buf); - gpr_free(buf); - if (seconds == -1) return false; - *timeout = seconds * GPR_MS_PER_SEC + nanos / GPR_NS_PER_MS; - return true; -} - -static void* method_parameters_create_from_json(const grpc_json* json) { - wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET; - grpc_millis timeout = 0; - for (grpc_json* field = json->child; field != nullptr; field = field->next) { - if (field->key == nullptr) continue; - if (strcmp(field->key, "waitForReady") == 0) { - if (wait_for_ready != WAIT_FOR_READY_UNSET) return nullptr; // Duplicate. - if (!parse_wait_for_ready(field, &wait_for_ready)) return nullptr; - } else if (strcmp(field->key, "timeout") == 0) { - if (timeout > 0) return nullptr; // Duplicate. - if (!parse_timeout(field, &timeout)) return nullptr; - } - } - method_parameters* value = - static_cast<method_parameters*>(gpr_malloc(sizeof(method_parameters))); - gpr_ref_init(&value->refs, 1); - value->timeout = timeout; - value->wait_for_ready = wait_for_ready; - return value; -} - struct external_connectivity_watcher; -/************************************************************************* - * CHANNEL-WIDE FUNCTIONS - */ +typedef grpc_core::SliceHashTable< + grpc_core::RefCountedPtr<ClientChannelMethodParams>> + MethodParamsTable; typedef struct client_channel_channel_data { - /** resolver for this channel */ grpc_core::OrphanablePtr<grpc_core::Resolver> resolver; - /** have we started resolving this channel */ bool started_resolving; - /** is deadline checking enabled? */ bool deadline_checking_enabled; - /** client channel factory */ grpc_client_channel_factory* client_channel_factory; + bool enable_retries; + size_t per_rpc_retry_buffer_size; /** combiner protecting all variables below in this data structure */ grpc_combiner* combiner; @@ -179,7 +101,7 @@ typedef struct client_channel_channel_data { /** retry throttle data */ grpc_server_retry_throttle_data* retry_throttle_data; /** maps method names to method_parameters structs */ - grpc_slice_hash_table* method_params_table; + grpc_core::RefCountedPtr<MethodParamsTable> method_params_table; /** incoming resolver result - set by resolver.next() */ grpc_channel_args* resolver_result; /** a list of closures that are all waiting for resolver result to come in */ @@ -200,7 +122,7 @@ typedef struct client_channel_channel_data { gpr_mu external_connectivity_watcher_list_mu; struct external_connectivity_watcher* external_connectivity_watcher_list_head; - /* the following properties are guarded by a mutex since API's require them + /* the following properties are guarded by a mutex since APIs require them to be instantaneously available */ gpr_mu info_mu; char* info_lb_policy_name; @@ -306,9 +228,8 @@ typedef struct { grpc_server_retry_throttle_data* retry_throttle_data; } service_config_parsing_state; -static void parse_retry_throttle_params(const grpc_json* field, void* arg) { - service_config_parsing_state* parsing_state = - static_cast<service_config_parsing_state*>(arg); +static void parse_retry_throttle_params( + const grpc_json* field, service_config_parsing_state* parsing_state) { if (strcmp(field->key, "retryThrottling") == 0) { if (parsing_state->retry_throttle_data != nullptr) return; // Duplicate. if (field->type != GRPC_JSON_OBJECT) return; @@ -388,7 +309,7 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand, grpc_error_string(error)); } - // Extract the following fields from the resolver result, if non-NULL. + // Extract the following fields from the resolver result, if non-nullptr. bool lb_policy_updated = false; bool lb_policy_created = false; char* lb_policy_name_dup = nullptr; @@ -396,7 +317,7 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy; char* service_config_json = nullptr; grpc_server_retry_throttle_data* retry_throttle_data = nullptr; - grpc_slice_hash_table* method_params_table = nullptr; + grpc_core::RefCountedPtr<MethodParamsTable> method_params_table; if (chand->resolver_result != nullptr) { if (chand->resolver != nullptr) { // Find LB policy name. @@ -431,7 +352,6 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { // Use pick_first if nothing was specified and we didn't select grpclb // above. if (lb_policy_name == nullptr) lb_policy_name = "pick_first"; - // Check to see if we're already using the right LB policy. // Note: It's safe to use chand->info_lb_policy_name here without // taking a lock on chand->info_mu, because this function is the @@ -469,39 +389,39 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { new_lb_policy->SetReresolutionClosureLocked(&args->closure); } } + // Before we clean up, save a copy of lb_policy_name, since it might + // be pointing to data inside chand->resolver_result. + // The copy will be saved in chand->lb_policy_name below. + lb_policy_name_dup = gpr_strdup(lb_policy_name); // Find service config. channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG); service_config_json = gpr_strdup(grpc_channel_arg_get_string(channel_arg)); if (service_config_json != nullptr) { - grpc_service_config* service_config = - grpc_service_config_create(service_config_json); + grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config = + grpc_core::ServiceConfig::Create(service_config_json); if (service_config != nullptr) { - channel_arg = grpc_channel_args_find(chand->resolver_result, - GRPC_ARG_SERVER_URI); - const char* server_uri = grpc_channel_arg_get_string(channel_arg); - GPR_ASSERT(server_uri != nullptr); - grpc_uri* uri = grpc_uri_parse(server_uri, true); - GPR_ASSERT(uri->path[0] != '\0'); - service_config_parsing_state parsing_state; - memset(&parsing_state, 0, sizeof(parsing_state)); - parsing_state.server_name = - uri->path[0] == '/' ? uri->path + 1 : uri->path; - grpc_service_config_parse_global_params( - service_config, parse_retry_throttle_params, &parsing_state); - grpc_uri_destroy(uri); - retry_throttle_data = parsing_state.retry_throttle_data; - method_params_table = grpc_service_config_create_method_config_table( - service_config, method_parameters_create_from_json, - method_parameters_ref_wrapper, method_parameters_unref_wrapper); - grpc_service_config_destroy(service_config); + if (chand->enable_retries) { + channel_arg = grpc_channel_args_find(chand->resolver_result, + GRPC_ARG_SERVER_URI); + const char* server_uri = grpc_channel_arg_get_string(channel_arg); + GPR_ASSERT(server_uri != nullptr); + grpc_uri* uri = grpc_uri_parse(server_uri, true); + GPR_ASSERT(uri->path[0] != '\0'); + service_config_parsing_state parsing_state; + memset(&parsing_state, 0, sizeof(parsing_state)); + parsing_state.server_name = + uri->path[0] == '/' ? uri->path + 1 : uri->path; + service_config->ParseGlobalParams(parse_retry_throttle_params, + &parsing_state); + grpc_uri_destroy(uri); + retry_throttle_data = parsing_state.retry_throttle_data; + } + method_params_table = service_config->CreateMethodConfigTable( + ClientChannelMethodParams::CreateFromJson); } } - // Before we clean up, save a copy of lb_policy_name, since it might - // be pointing to data inside chand->resolver_result. - // The copy will be saved in chand->lb_policy_name below. - lb_policy_name_dup = gpr_strdup(lb_policy_name); } grpc_channel_args_destroy(chand->resolver_result); chand->resolver_result = nullptr; @@ -514,7 +434,7 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { lb_policy_name_changed ? " (changed)" : "", service_config_json); } // Now swap out fields in chand. Note that the new values may still - // be NULL if (e.g.) the resolver failed to return results or the + // be nullptr if (e.g.) the resolver failed to return results or the // results did not contain the necessary data. // // First, swap out the data used by cc_get_channel_info(). @@ -534,16 +454,13 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { } chand->retry_throttle_data = retry_throttle_data; // Swap out the method params table. - if (chand->method_params_table != nullptr) { - grpc_slice_hash_table_unref(chand->method_params_table); - } - chand->method_params_table = method_params_table; + chand->method_params_table = std::move(method_params_table); // If we have a new LB policy or are shutting down (in which case - // new_lb_policy will be NULL), swap out the LB policy, unreffing the old one - // and removing its fds from chand->interested_parties. Note that we do NOT do - // this if either (a) we updated the existing LB policy above or (b) we failed - // to create the new LB policy (in which case we want to continue using the - // most recent one we had). + // new_lb_policy will be nullptr), swap out the LB policy, unreffing the + // old one and removing its fds from chand->interested_parties. + // Note that we do NOT do this if either (a) we updated the existing + // LB policy above or (b) we failed to create the new LB policy (in + // which case we want to continue using the most recent one we had). if (new_lb_policy != nullptr || error != GRPC_ERROR_NONE || chand->resolver == nullptr) { if (chand->lb_policy != nullptr) { @@ -722,9 +639,17 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); grpc_client_channel_start_backup_polling(chand->interested_parties); + // Record max per-RPC retry buffer size. + const grpc_arg* arg = grpc_channel_args_find( + args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE); + chand->per_rpc_retry_buffer_size = (size_t)grpc_channel_arg_get_integer( + arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}); + // Record enable_retries. + arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES); + chand->enable_retries = grpc_channel_arg_get_bool(arg, true); // Record client channel factory. - const grpc_arg* arg = grpc_channel_args_find(args->channel_args, - GRPC_ARG_CLIENT_CHANNEL_FACTORY); + arg = grpc_channel_args_find(args->channel_args, + GRPC_ARG_CLIENT_CHANNEL_FACTORY); if (arg == nullptr) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Missing client channel factory in args for client channel filter"); @@ -794,7 +719,7 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) { grpc_server_retry_throttle_data_unref(chand->retry_throttle_data); } if (chand->method_params_table != nullptr) { - grpc_slice_hash_table_unref(chand->method_params_table); + chand->method_params_table.reset(); } grpc_client_channel_stop_backup_polling(chand->interested_parties); grpc_connectivity_state_destroy(&chand->state_tracker); @@ -809,15 +734,122 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) { */ // Max number of batches that can be pending on a call at any given -// time. This includes: +// time. This includes one batch for each of the following ops: // recv_initial_metadata // send_initial_metadata // recv_message // send_message // recv_trailing_metadata // send_trailing_metadata -// We also add room for a single cancel_stream batch. -#define MAX_WAITING_BATCHES 7 +#define MAX_PENDING_BATCHES 6 + +// Retry support: +// +// In order to support retries, we act as a proxy for stream op batches. +// When we get a batch from the surface, we add it to our list of pending +// batches, and we then use those batches to construct separate "child" +// batches to be started on the subchannel call. When the child batches +// return, we then decide which pending batches have been completed and +// schedule their callbacks accordingly. If a subchannel call fails and +// we want to retry it, we do a new pick and start again, constructing +// new "child" batches for the new subchannel call. +// +// Note that retries are committed when receiving data from the server +// (except for Trailers-Only responses). However, there may be many +// send ops started before receiving any data, so we may have already +// completed some number of send ops (and returned the completions up to +// the surface) by the time we realize that we need to retry. To deal +// with this, we cache data for send ops, so that we can replay them on a +// different subchannel call even after we have completed the original +// batches. +// +// There are two sets of data to maintain: +// - In call_data (in the parent channel), we maintain a list of pending +// ops and cached data for send ops. +// - In the subchannel call, we maintain state to indicate what ops have +// already been sent down to that call. +// +// When constructing the "child" batches, we compare those two sets of +// data to see which batches need to be sent to the subchannel call. + +// TODO(roth): In subsequent PRs: +// - add support for transparent retries (including initial metadata) +// - figure out how to record stats in census for retries +// (census filter is on top of this one) +// - add census stats for retries + +// State used for starting a retryable batch on a subchannel call. +// This provides its own grpc_transport_stream_op_batch and other data +// structures needed to populate the ops in the batch. +// We allocate one struct on the arena for each attempt at starting a +// batch on a given subchannel call. +typedef struct { + gpr_refcount refs; + grpc_call_element* elem; + grpc_subchannel_call* subchannel_call; // Holds a ref. + // The batch to use in the subchannel call. + // Its payload field points to subchannel_call_retry_state.batch_payload. + grpc_transport_stream_op_batch batch; + // For send_initial_metadata. + // Note that we need to make a copy of the initial metadata for each + // subchannel call instead of just referring to the copy in call_data, + // because filters in the subchannel stack will probably add entries, + // so we need to start in a pristine state for each attempt of the call. + grpc_linked_mdelem* send_initial_metadata_storage; + grpc_metadata_batch send_initial_metadata; + // For send_message. + grpc_caching_byte_stream send_message; + // For send_trailing_metadata. + grpc_linked_mdelem* send_trailing_metadata_storage; + grpc_metadata_batch send_trailing_metadata; + // For intercepting recv_initial_metadata. + grpc_metadata_batch recv_initial_metadata; + grpc_closure recv_initial_metadata_ready; + bool trailing_metadata_available; + // For intercepting recv_message. + grpc_closure recv_message_ready; + grpc_byte_stream* recv_message; + // For intercepting recv_trailing_metadata. + grpc_metadata_batch recv_trailing_metadata; + grpc_transport_stream_stats collect_stats; + // For intercepting on_complete. + grpc_closure on_complete; +} subchannel_batch_data; + +// Retry state associated with a subchannel call. +// Stored in the parent_data of the subchannel call object. +typedef struct { + // subchannel_batch_data.batch.payload points to this. + grpc_transport_stream_op_batch_payload batch_payload; + // These fields indicate which ops have been started and completed on + // this subchannel call. + size_t started_send_message_count; + size_t completed_send_message_count; + size_t started_recv_message_count; + size_t completed_recv_message_count; + bool started_send_initial_metadata : 1; + bool completed_send_initial_metadata : 1; + bool started_send_trailing_metadata : 1; + bool completed_send_trailing_metadata : 1; + bool started_recv_initial_metadata : 1; + bool completed_recv_initial_metadata : 1; + bool started_recv_trailing_metadata : 1; + bool completed_recv_trailing_metadata : 1; + // State for callback processing. + bool retry_dispatched : 1; + bool recv_initial_metadata_ready_deferred : 1; + bool recv_message_ready_deferred : 1; + grpc_error* recv_initial_metadata_error; + grpc_error* recv_message_error; +} subchannel_call_retry_state; + +// Pending batches stored in call data. +typedef struct { + // The pending batch. If nullptr, this slot is empty. + grpc_transport_stream_op_batch* batch; + // Indicates whether payload for send ops has been cached in call data. + bool send_ops_cached; +} pending_batch; /** Call data. Holds a pointer to grpc_subchannel_call and the associated machinery to create such a pointer. @@ -841,159 +873,1592 @@ typedef struct client_channel_call_data { grpc_call_combiner* call_combiner; grpc_server_retry_throttle_data* retry_throttle_data; - method_parameters* method_params; + grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params; grpc_subchannel_call* subchannel_call; - grpc_error* error; + + // Set when we get a cancel_stream op. + grpc_error* cancel_error; grpc_core::LoadBalancingPolicy::PickState pick; - grpc_closure lb_pick_closure; - grpc_closure lb_pick_cancel_closure; + grpc_closure pick_closure; + grpc_closure pick_cancel_closure; grpc_polling_entity* pollent; - grpc_transport_stream_op_batch* waiting_for_pick_batches[MAX_WAITING_BATCHES]; - size_t waiting_for_pick_batches_count; - grpc_closure handle_pending_batch_in_call_combiner[MAX_WAITING_BATCHES]; + // Batches are added to this list when received from above. + // They are removed when we are done handling the batch (i.e., when + // either we have invoked all of the batch's callbacks or we have + // passed the batch down to the subchannel call and are not + // intercepting any of its callbacks). + pending_batch pending_batches[MAX_PENDING_BATCHES]; + bool pending_send_initial_metadata : 1; + bool pending_send_message : 1; + bool pending_send_trailing_metadata : 1; + + // Retry state. + bool enable_retries : 1; + bool retry_committed : 1; + bool last_attempt_got_server_pushback : 1; + int num_attempts_completed; + size_t bytes_buffered_for_retry; + grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff; + grpc_timer retry_timer; + + // Cached data for retrying send ops. + // send_initial_metadata + bool seen_send_initial_metadata; + grpc_linked_mdelem* send_initial_metadata_storage; + grpc_metadata_batch send_initial_metadata; + uint32_t send_initial_metadata_flags; + gpr_atm* peer_string; + // send_message + // When we get a send_message op, we replace the original byte stream + // with a grpc_caching_byte_stream that caches the slices to a + // local buffer for use in retries. + // Note: We inline the cache for the first 3 send_message ops and use + // dynamic allocation after that. This number was essentially picked + // at random; it could be changed in the future to tune performance. + grpc_core::InlinedVector<grpc_byte_stream_cache*, 3> send_messages; + // send_trailing_metadata + bool seen_send_trailing_metadata; + grpc_linked_mdelem* send_trailing_metadata_storage; + grpc_metadata_batch send_trailing_metadata; +} call_data; - grpc_transport_stream_op_batch* initial_metadata_batch; +// Forward declarations. +static void retry_commit(grpc_call_element* elem, + subchannel_call_retry_state* retry_state); +static void start_internal_recv_trailing_metadata(grpc_call_element* elem); +static void on_complete(void* arg, grpc_error* error); +static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored); +static void pick_after_resolver_result_start_locked(grpc_call_element* elem); +static void start_pick_locked(void* arg, grpc_error* ignored); + +// +// send op data caching +// + +// Caches data for send ops so that it can be retried later, if not +// already cached. +static void maybe_cache_send_ops_for_batch(call_data* calld, + pending_batch* pending) { + if (pending->send_ops_cached) return; + pending->send_ops_cached = true; + grpc_transport_stream_op_batch* batch = pending->batch; + // Save a copy of metadata for send_initial_metadata ops. + if (batch->send_initial_metadata) { + calld->seen_send_initial_metadata = true; + GPR_ASSERT(calld->send_initial_metadata_storage == nullptr); + grpc_metadata_batch* send_initial_metadata = + batch->payload->send_initial_metadata.send_initial_metadata; + calld->send_initial_metadata_storage = (grpc_linked_mdelem*)gpr_arena_alloc( + calld->arena, + sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count); + grpc_metadata_batch_copy(send_initial_metadata, + &calld->send_initial_metadata, + calld->send_initial_metadata_storage); + calld->send_initial_metadata_flags = + batch->payload->send_initial_metadata.send_initial_metadata_flags; + calld->peer_string = batch->payload->send_initial_metadata.peer_string; + } + // Set up cache for send_message ops. + if (batch->send_message) { + grpc_byte_stream_cache* cache = (grpc_byte_stream_cache*)gpr_arena_alloc( + calld->arena, sizeof(grpc_byte_stream_cache)); + grpc_byte_stream_cache_init(cache, + batch->payload->send_message.send_message); + calld->send_messages.push_back(cache); + } + // Save metadata batch for send_trailing_metadata ops. + if (batch->send_trailing_metadata) { + calld->seen_send_trailing_metadata = true; + GPR_ASSERT(calld->send_trailing_metadata_storage == nullptr); + grpc_metadata_batch* send_trailing_metadata = + batch->payload->send_trailing_metadata.send_trailing_metadata; + calld->send_trailing_metadata_storage = + (grpc_linked_mdelem*)gpr_arena_alloc( + calld->arena, + sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count); + grpc_metadata_batch_copy(send_trailing_metadata, + &calld->send_trailing_metadata, + calld->send_trailing_metadata_storage); + } +} - grpc_closure on_complete; - grpc_closure* original_on_complete; -} call_data; +// Frees cached send ops that have already been completed after +// committing the call. +static void free_cached_send_op_data_after_commit( + grpc_call_element* elem, subchannel_call_retry_state* retry_state) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (retry_state->completed_send_initial_metadata) { + grpc_metadata_batch_destroy(&calld->send_initial_metadata); + } + for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR + "]", + chand, calld, i); + } + grpc_byte_stream_cache_destroy(calld->send_messages[i]); + } + if (retry_state->completed_send_trailing_metadata) { + grpc_metadata_batch_destroy(&calld->send_trailing_metadata); + } +} -grpc_subchannel_call* grpc_client_channel_get_subchannel_call( - grpc_call_element* elem) { +// Frees cached send ops that were completed by the completed batch in +// batch_data. Used when batches are completed after the call is committed. +static void free_cached_send_op_data_for_completed_batch( + grpc_call_element* elem, subchannel_batch_data* batch_data, + subchannel_call_retry_state* retry_state) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); - return calld->subchannel_call; + if (batch_data->batch.send_initial_metadata) { + grpc_metadata_batch_destroy(&calld->send_initial_metadata); + } + if (batch_data->batch.send_message) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR + "]", + chand, calld, retry_state->completed_send_message_count - 1); + } + grpc_byte_stream_cache_destroy( + calld->send_messages[retry_state->completed_send_message_count - 1]); + } + if (batch_data->batch.send_trailing_metadata) { + grpc_metadata_batch_destroy(&calld->send_trailing_metadata); + } +} + +// +// pending_batches management +// + +// Returns the index into calld->pending_batches to be used for batch. +static size_t get_batch_index(grpc_transport_stream_op_batch* batch) { + // Note: It is important the send_initial_metadata be the first entry + // here, since the code in pick_subchannel_locked() assumes it will be. + if (batch->send_initial_metadata) return 0; + if (batch->send_message) return 1; + if (batch->send_trailing_metadata) return 2; + if (batch->recv_initial_metadata) return 3; + if (batch->recv_message) return 4; + if (batch->recv_trailing_metadata) return 5; + GPR_UNREACHABLE_CODE(return (size_t)-1); } // This is called via the call combiner, so access to calld is synchronized. -static void waiting_for_pick_batches_add( - call_data* calld, grpc_transport_stream_op_batch* batch) { - if (batch->send_initial_metadata) { - GPR_ASSERT(calld->initial_metadata_batch == nullptr); - calld->initial_metadata_batch = batch; - } else { - GPR_ASSERT(calld->waiting_for_pick_batches_count < MAX_WAITING_BATCHES); - calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count++] = - batch; +static void pending_batches_add(grpc_call_element* elem, + grpc_transport_stream_op_batch* batch) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + const size_t idx = get_batch_index(batch); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand, + calld, idx); + } + pending_batch* pending = &calld->pending_batches[idx]; + GPR_ASSERT(pending->batch == nullptr); + pending->batch = batch; + pending->send_ops_cached = false; + if (calld->enable_retries) { + // Update state in calld about pending batches. + // Also check if the batch takes us over the retry buffer limit. + // Note: We don't check the size of trailing metadata here, because + // gRPC clients do not send trailing metadata. + if (batch->send_initial_metadata) { + calld->pending_send_initial_metadata = true; + calld->bytes_buffered_for_retry += grpc_metadata_batch_size( + batch->payload->send_initial_metadata.send_initial_metadata); + } + if (batch->send_message) { + calld->pending_send_message = true; + calld->bytes_buffered_for_retry += + batch->payload->send_message.send_message->length; + } + if (batch->send_trailing_metadata) { + calld->pending_send_trailing_metadata = true; + } + if (calld->bytes_buffered_for_retry > chand->per_rpc_retry_buffer_size) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: exceeded retry buffer size, committing", + chand, calld); + } + subchannel_call_retry_state* retry_state = + calld->subchannel_call == nullptr + ? nullptr + : static_cast<subchannel_call_retry_state*>( + grpc_connected_subchannel_call_get_parent_data( + calld->subchannel_call)); + retry_commit(elem, retry_state); + // If we are not going to retry and have not yet started, pretend + // retries are disabled so that we don't bother with retry overhead. + if (calld->num_attempts_completed == 0) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: disabling retries before first attempt", + chand, calld); + } + calld->enable_retries = false; + } + } + } +} + +static void pending_batch_clear(call_data* calld, pending_batch* pending) { + if (calld->enable_retries) { + if (pending->batch->send_initial_metadata) { + calld->pending_send_initial_metadata = false; + } + if (pending->batch->send_message) { + calld->pending_send_message = false; + } + if (pending->batch->send_trailing_metadata) { + calld->pending_send_trailing_metadata = false; + } } + pending->batch = nullptr; } // This is called via the call combiner, so access to calld is synchronized. static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) { - call_data* calld = static_cast<call_data*>(arg); - if (calld->waiting_for_pick_batches_count > 0) { - --calld->waiting_for_pick_batches_count; - grpc_transport_stream_op_batch_finish_with_failure( - calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count], - GRPC_ERROR_REF(error), calld->call_combiner); - } + grpc_transport_stream_op_batch* batch = + static_cast<grpc_transport_stream_op_batch*>(arg); + call_data* calld = static_cast<call_data*>(batch->handler_private.extra_arg); + // Note: This will release the call combiner. + grpc_transport_stream_op_batch_finish_with_failure( + batch, GRPC_ERROR_REF(error), calld->call_combiner); } // This is called via the call combiner, so access to calld is synchronized. -static void waiting_for_pick_batches_fail(grpc_call_element* elem, - grpc_error* error) { +// If yield_call_combiner is true, assumes responsibility for yielding +// the call combiner. +static void pending_batches_fail(grpc_call_element* elem, grpc_error* error, + bool yield_call_combiner) { + GPR_ASSERT(error != GRPC_ERROR_NONE); call_data* calld = static_cast<call_data*>(elem->call_data); if (grpc_client_channel_trace.enabled()) { + size_t num_batches = 0; + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { + if (calld->pending_batches[i].batch != nullptr) ++num_batches; + } gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s", - elem->channel_data, calld, calld->waiting_for_pick_batches_count, - grpc_error_string(error)); + elem->channel_data, calld, num_batches, grpc_error_string(error)); + } + grpc_transport_stream_op_batch* + batches[GPR_ARRAY_SIZE(calld->pending_batches)]; + size_t num_batches = 0; + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { + pending_batch* pending = &calld->pending_batches[i]; + grpc_transport_stream_op_batch* batch = pending->batch; + if (batch != nullptr) { + batches[num_batches++] = batch; + pending_batch_clear(calld, pending); + } } - for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) { - GRPC_CLOSURE_INIT(&calld->handle_pending_batch_in_call_combiner[i], - fail_pending_batch_in_call_combiner, calld, + for (size_t i = yield_call_combiner ? 1 : 0; i < num_batches; ++i) { + grpc_transport_stream_op_batch* batch = batches[i]; + batch->handler_private.extra_arg = calld; + GRPC_CLOSURE_INIT(&batch->handler_private.closure, + fail_pending_batch_in_call_combiner, batch, grpc_schedule_on_exec_ctx); - GRPC_CALL_COMBINER_START( - calld->call_combiner, &calld->handle_pending_batch_in_call_combiner[i], - GRPC_ERROR_REF(error), "waiting_for_pick_batches_fail"); - } - if (calld->initial_metadata_batch != nullptr) { - grpc_transport_stream_op_batch_finish_with_failure( - calld->initial_metadata_batch, GRPC_ERROR_REF(error), - calld->call_combiner); - } else { - GRPC_CALL_COMBINER_STOP(calld->call_combiner, - "waiting_for_pick_batches_fail"); + GRPC_CALL_COMBINER_START(calld->call_combiner, + &batch->handler_private.closure, + GRPC_ERROR_REF(error), "pending_batches_fail"); + } + if (yield_call_combiner) { + if (num_batches > 0) { + // Note: This will release the call combiner. + grpc_transport_stream_op_batch_finish_with_failure( + batches[0], GRPC_ERROR_REF(error), calld->call_combiner); + } else { + GRPC_CALL_COMBINER_STOP(calld->call_combiner, "pending_batches_fail"); + } } GRPC_ERROR_UNREF(error); } // This is called via the call combiner, so access to calld is synchronized. -static void run_pending_batch_in_call_combiner(void* arg, grpc_error* ignored) { - call_data* calld = static_cast<call_data*>(arg); - if (calld->waiting_for_pick_batches_count > 0) { - --calld->waiting_for_pick_batches_count; - grpc_subchannel_call_process_op( - calld->subchannel_call, - calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count]); - } +static void resume_pending_batch_in_call_combiner(void* arg, + grpc_error* ignored) { + grpc_transport_stream_op_batch* batch = + static_cast<grpc_transport_stream_op_batch*>(arg); + grpc_subchannel_call* subchannel_call = + static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg); + // Note: This will release the call combiner. + grpc_subchannel_call_process_op(subchannel_call, batch); } // This is called via the call combiner, so access to calld is synchronized. -static void waiting_for_pick_batches_resume(grpc_call_element* elem) { +static void pending_batches_resume(grpc_call_element* elem) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); + if (calld->enable_retries) { + start_retriable_subchannel_batches(elem, GRPC_ERROR_NONE); + return; + } + // Retries not enabled; send down batches as-is. if (grpc_client_channel_trace.enabled()) { + size_t num_batches = 0; + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { + if (calld->pending_batches[i].batch != nullptr) ++num_batches; + } gpr_log(GPR_DEBUG, - "chand=%p calld=%p: sending %" PRIuPTR - " pending batches to subchannel_call=%p", - chand, calld, calld->waiting_for_pick_batches_count, - calld->subchannel_call); - } - for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) { - GRPC_CLOSURE_INIT(&calld->handle_pending_batch_in_call_combiner[i], - run_pending_batch_in_call_combiner, calld, + "chand=%p calld=%p: starting %" PRIuPTR + " pending batches on subchannel_call=%p", + chand, calld, num_batches, calld->subchannel_call); + } + grpc_transport_stream_op_batch* + batches[GPR_ARRAY_SIZE(calld->pending_batches)]; + size_t num_batches = 0; + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { + pending_batch* pending = &calld->pending_batches[i]; + grpc_transport_stream_op_batch* batch = pending->batch; + if (batch != nullptr) { + batches[num_batches++] = batch; + pending_batch_clear(calld, pending); + } + } + for (size_t i = 1; i < num_batches; ++i) { + grpc_transport_stream_op_batch* batch = batches[i]; + batch->handler_private.extra_arg = calld->subchannel_call; + GRPC_CLOSURE_INIT(&batch->handler_private.closure, + resume_pending_batch_in_call_combiner, batch, grpc_schedule_on_exec_ctx); - GRPC_CALL_COMBINER_START( - calld->call_combiner, &calld->handle_pending_batch_in_call_combiner[i], - GRPC_ERROR_NONE, "waiting_for_pick_batches_resume"); + GRPC_CALL_COMBINER_START(calld->call_combiner, + &batch->handler_private.closure, GRPC_ERROR_NONE, + "pending_batches_resume"); } - GPR_ASSERT(calld->initial_metadata_batch != nullptr); - grpc_subchannel_call_process_op(calld->subchannel_call, - calld->initial_metadata_batch); + GPR_ASSERT(num_batches > 0); + // Note: This will release the call combiner. + grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]); } -// Applies service config to the call. Must be invoked once we know -// that the resolver has returned results to the channel. -static void apply_service_config_to_call_locked(grpc_call_element* elem) { +static void maybe_clear_pending_batch(grpc_call_element* elem, + pending_batch* pending) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); + grpc_transport_stream_op_batch* batch = pending->batch; + // We clear the pending batch if all of its callbacks have been + // scheduled and reset to nullptr. + if (batch->on_complete == nullptr && + (!batch->recv_initial_metadata || + batch->payload->recv_initial_metadata.recv_initial_metadata_ready == + nullptr) && + (!batch->recv_message || + batch->payload->recv_message.recv_message_ready == nullptr)) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: clearing pending batch", chand, + calld); + } + pending_batch_clear(calld, pending); + } +} + +// Returns true if all ops in the pending batch have been completed. +static bool pending_batch_is_completed( + pending_batch* pending, call_data* calld, + subchannel_call_retry_state* retry_state) { + if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { + return false; + } + if (pending->batch->send_initial_metadata && + !retry_state->completed_send_initial_metadata) { + return false; + } + if (pending->batch->send_message && + retry_state->completed_send_message_count < calld->send_messages.size()) { + return false; + } + if (pending->batch->send_trailing_metadata && + !retry_state->completed_send_trailing_metadata) { + return false; + } + if (pending->batch->recv_initial_metadata && + !retry_state->completed_recv_initial_metadata) { + return false; + } + if (pending->batch->recv_message && + retry_state->completed_recv_message_count < + retry_state->started_recv_message_count) { + return false; + } + if (pending->batch->recv_trailing_metadata && + !retry_state->completed_recv_trailing_metadata) { + return false; + } + return true; +} + +// Returns true if any op in the batch was not yet started. +static bool pending_batch_is_unstarted( + pending_batch* pending, call_data* calld, + subchannel_call_retry_state* retry_state) { + if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { + return false; + } + if (pending->batch->send_initial_metadata && + !retry_state->started_send_initial_metadata) { + return true; + } + if (pending->batch->send_message && + retry_state->started_send_message_count < calld->send_messages.size()) { + return true; + } + if (pending->batch->send_trailing_metadata && + !retry_state->started_send_trailing_metadata) { + return true; + } + if (pending->batch->recv_initial_metadata && + !retry_state->started_recv_initial_metadata) { + return true; + } + if (pending->batch->recv_message && + retry_state->completed_recv_message_count == + retry_state->started_recv_message_count) { + return true; + } + if (pending->batch->recv_trailing_metadata && + !retry_state->started_recv_trailing_metadata) { + return true; + } + return false; +} + +// +// retry code +// + +// Commits the call so that no further retry attempts will be performed. +static void retry_commit(grpc_call_element* elem, + subchannel_call_retry_state* retry_state) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (calld->retry_committed) return; + calld->retry_committed = true; if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call", + gpr_log(GPR_DEBUG, "chand=%p calld=%p: committing retries", chand, calld); + } + if (retry_state != nullptr) { + free_cached_send_op_data_after_commit(elem, retry_state); + } +} + +// Starts a retry after appropriate back-off. +static void do_retry(grpc_call_element* elem, + subchannel_call_retry_state* retry_state, + grpc_millis server_pushback_ms) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + GPR_ASSERT(calld->method_params != nullptr); + const ClientChannelMethodParams::RetryPolicy* retry_policy = + calld->method_params->retry_policy(); + GPR_ASSERT(retry_policy != nullptr); + // Reset subchannel call and connected subchannel. + if (calld->subchannel_call != nullptr) { + GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call, + "client_channel_call_retry"); + calld->subchannel_call = nullptr; + } + if (calld->pick.connected_subchannel != nullptr) { + calld->pick.connected_subchannel.reset(); + } + // Compute backoff delay. + grpc_millis next_attempt_time; + if (server_pushback_ms >= 0) { + next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms; + calld->last_attempt_got_server_pushback = true; + } else { + if (calld->num_attempts_completed == 1 || + calld->last_attempt_got_server_pushback) { + calld->retry_backoff.Init( + grpc_core::BackOff::Options() + .set_initial_backoff(retry_policy->initial_backoff) + .set_multiplier(retry_policy->backoff_multiplier) + .set_jitter(RETRY_BACKOFF_JITTER) + .set_max_backoff(retry_policy->max_backoff)); + calld->last_attempt_got_server_pushback = false; + } + next_attempt_time = calld->retry_backoff->NextAttemptTime(); + } + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: retrying failed call in %" PRIuPTR " ms", chand, + calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now()); + } + // Schedule retry after computed delay. + GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem, + grpc_combiner_scheduler(chand->combiner)); + grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure); + // Update bookkeeping. + if (retry_state != nullptr) retry_state->retry_dispatched = true; +} + +// Returns true if the call is being retried. +static bool maybe_retry(grpc_call_element* elem, + subchannel_batch_data* batch_data, + grpc_status_code status, + grpc_mdelem* server_pushback_md) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + // Get retry policy. + if (calld->method_params == nullptr) return false; + const ClientChannelMethodParams::RetryPolicy* retry_policy = + calld->method_params->retry_policy(); + if (retry_policy == nullptr) return false; + // If we've already dispatched a retry from this call, return true. + // This catches the case where the batch has multiple callbacks + // (i.e., it includes either recv_message or recv_initial_metadata). + subchannel_call_retry_state* retry_state = nullptr; + if (batch_data != nullptr) { + retry_state = static_cast<subchannel_call_retry_state*>( + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); + if (retry_state->retry_dispatched) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: retry already dispatched", chand, + calld); + } + return true; + } + } + // Check status. + if (status == GRPC_STATUS_OK) { + grpc_server_retry_throttle_data_record_success(calld->retry_throttle_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: call succeeded", chand, calld); + } + return false; + } + // Status is not OK. Check whether the status is retryable. + if (!retry_policy->retryable_status_codes.Contains(status)) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: status %s not configured as retryable", chand, + calld, grpc_status_code_to_string(status)); + } + return false; + } + // Record the failure and check whether retries are throttled. + // Note that it's important for this check to come after the status + // code check above, since we should only record failures whose statuses + // match the configured retryable status codes, so that we don't count + // things like failures due to malformed requests (INVALID_ARGUMENT). + // Conversely, it's important for this to come before the remaining + // checks, so that we don't fail to record failures due to other factors. + if (!grpc_server_retry_throttle_data_record_failure( + calld->retry_throttle_data)) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: retries throttled", chand, calld); + } + return false; + } + // Check whether the call is committed. + if (calld->retry_committed) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: retries already committed", chand, + calld); + } + return false; + } + // Check whether we have retries remaining. + ++calld->num_attempts_completed; + if (calld->num_attempts_completed >= retry_policy->max_attempts) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: exceeded %d retry attempts", chand, + calld, retry_policy->max_attempts); + } + return false; + } + // If the call was cancelled from the surface, don't retry. + if (calld->cancel_error != GRPC_ERROR_NONE) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: call cancelled from surface, not retrying", + chand, calld); + } + return false; + } + // Check server push-back. + grpc_millis server_pushback_ms = -1; + if (server_pushback_md != nullptr) { + // If the value is "-1" or any other unparseable string, we do not retry. + uint32_t ms; + if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: not retrying due to server push-back", + chand, calld); + } + return false; + } else { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: server push-back: retry in %u ms", chand, + calld, ms); + } + server_pushback_ms = (grpc_millis)ms; + } + } + do_retry(elem, retry_state, server_pushback_ms); + return true; +} + +// +// subchannel_batch_data +// + +static subchannel_batch_data* batch_data_create(grpc_call_element* elem, + int refcount) { + call_data* calld = static_cast<call_data*>(elem->call_data); + subchannel_call_retry_state* retry_state = + static_cast<subchannel_call_retry_state*>( + grpc_connected_subchannel_call_get_parent_data( + calld->subchannel_call)); + subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>( + gpr_arena_alloc(calld->arena, sizeof(*batch_data))); + batch_data->elem = elem; + batch_data->subchannel_call = + GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create"); + batch_data->batch.payload = &retry_state->batch_payload; + gpr_ref_init(&batch_data->refs, refcount); + GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data, + grpc_schedule_on_exec_ctx); + batch_data->batch.on_complete = &batch_data->on_complete; + GRPC_CALL_STACK_REF(calld->owning_call, "batch_data"); + return batch_data; +} + +static void batch_data_unref(subchannel_batch_data* batch_data) { + if (gpr_unref(&batch_data->refs)) { + if (batch_data->send_initial_metadata_storage != nullptr) { + grpc_metadata_batch_destroy(&batch_data->send_initial_metadata); + } + if (batch_data->send_trailing_metadata_storage != nullptr) { + grpc_metadata_batch_destroy(&batch_data->send_trailing_metadata); + } + if (batch_data->batch.recv_initial_metadata) { + grpc_metadata_batch_destroy(&batch_data->recv_initial_metadata); + } + if (batch_data->batch.recv_trailing_metadata) { + grpc_metadata_batch_destroy(&batch_data->recv_trailing_metadata); + } + GRPC_SUBCHANNEL_CALL_UNREF(batch_data->subchannel_call, "batch_data_unref"); + call_data* calld = static_cast<call_data*>(batch_data->elem->call_data); + GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data"); + } +} + +// +// recv_initial_metadata callback handling +// + +// Invokes recv_initial_metadata_ready for a subchannel batch. +static void invoke_recv_initial_metadata_callback(void* arg, + grpc_error* error) { + subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); + channel_data* chand = + static_cast<channel_data*>(batch_data->elem->channel_data); + call_data* calld = static_cast<call_data*>(batch_data->elem->call_data); + // Find pending batch. + pending_batch* pending = nullptr; + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { + grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch; + if (batch != nullptr && batch->recv_initial_metadata && + batch->payload->recv_initial_metadata.recv_initial_metadata_ready != + nullptr) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: invoking recv_initial_metadata_ready for " + "pending batch at index %" PRIuPTR, + chand, calld, i); + } + pending = &calld->pending_batches[i]; + break; + } + } + GPR_ASSERT(pending != nullptr); + // Return metadata. + grpc_metadata_batch_move( + &batch_data->recv_initial_metadata, + pending->batch->payload->recv_initial_metadata.recv_initial_metadata); + // Update bookkeeping. + // Note: Need to do this before invoking the callback, since invoking + // the callback will result in yielding the call combiner. + grpc_closure* recv_initial_metadata_ready = + pending->batch->payload->recv_initial_metadata + .recv_initial_metadata_ready; + pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready = + nullptr; + maybe_clear_pending_batch(batch_data->elem, pending); + batch_data_unref(batch_data); + // Invoke callback. + GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error)); +} + +// Intercepts recv_initial_metadata_ready callback for retries. +// Commits the call and returns the initial metadata up the stack. +static void recv_initial_metadata_ready(void* arg, grpc_error* error) { + subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); + grpc_call_element* elem = batch_data->elem; + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s", + chand, calld, grpc_error_string(error)); + } + subchannel_call_retry_state* retry_state = + static_cast<subchannel_call_retry_state*>( + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); + // If we got an error or a Trailers-Only response and have not yet gotten + // the recv_trailing_metadata on_complete callback, then defer + // propagating this callback back to the surface. We can evaluate whether + // to retry when recv_trailing_metadata comes back. + if ((batch_data->trailing_metadata_available || error != GRPC_ERROR_NONE) && + !retry_state->completed_recv_trailing_metadata) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: deferring recv_initial_metadata_ready " + "(Trailers-Only)", + chand, calld); + } + retry_state->recv_initial_metadata_ready_deferred = true; + retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error); + if (!retry_state->started_recv_trailing_metadata) { + // recv_trailing_metadata not yet started by application; start it + // ourselves to get status. + start_internal_recv_trailing_metadata(elem); + } else { + GRPC_CALL_COMBINER_STOP( + calld->call_combiner, + "recv_initial_metadata_ready trailers-only or error"); + } + return; + } + // Received valid initial metadata, so commit the call. + retry_commit(elem, retry_state); + // Manually invoking a callback function; it does not take ownership of error. + invoke_recv_initial_metadata_callback(batch_data, error); + GRPC_ERROR_UNREF(error); +} + +// +// recv_message callback handling +// + +// Invokes recv_message_ready for a subchannel batch. +static void invoke_recv_message_callback(void* arg, grpc_error* error) { + subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); + channel_data* chand = + static_cast<channel_data*>(batch_data->elem->channel_data); + call_data* calld = static_cast<call_data*>(batch_data->elem->call_data); + // Find pending op. + pending_batch* pending = nullptr; + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { + grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch; + if (batch != nullptr && batch->recv_message && + batch->payload->recv_message.recv_message_ready != nullptr) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: invoking recv_message_ready for " + "pending batch at index %" PRIuPTR, + chand, calld, i); + } + pending = &calld->pending_batches[i]; + break; + } + } + GPR_ASSERT(pending != nullptr); + // Return payload. + *pending->batch->payload->recv_message.recv_message = + batch_data->recv_message; + // Update bookkeeping. + // Note: Need to do this before invoking the callback, since invoking + // the callback will result in yielding the call combiner. + grpc_closure* recv_message_ready = + pending->batch->payload->recv_message.recv_message_ready; + pending->batch->payload->recv_message.recv_message_ready = nullptr; + maybe_clear_pending_batch(batch_data->elem, pending); + batch_data_unref(batch_data); + // Invoke callback. + GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error)); +} + +// Intercepts recv_message_ready callback for retries. +// Commits the call and returns the message up the stack. +static void recv_message_ready(void* arg, grpc_error* error) { + subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); + grpc_call_element* elem = batch_data->elem; + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: got recv_message_ready, error=%s", + chand, calld, grpc_error_string(error)); + } + subchannel_call_retry_state* retry_state = + static_cast<subchannel_call_retry_state*>( + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); + // If we got an error or the payload was nullptr and we have not yet gotten + // the recv_trailing_metadata on_complete callback, then defer + // propagating this callback back to the surface. We can evaluate whether + // to retry when recv_trailing_metadata comes back. + if ((batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) && + !retry_state->completed_recv_trailing_metadata) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: deferring recv_message_ready (nullptr " + "message and recv_trailing_metadata pending)", + chand, calld); + } + retry_state->recv_message_ready_deferred = true; + retry_state->recv_message_error = GRPC_ERROR_REF(error); + if (!retry_state->started_recv_trailing_metadata) { + // recv_trailing_metadata not yet started by application; start it + // ourselves to get status. + start_internal_recv_trailing_metadata(elem); + } else { + GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready null"); + } + return; + } + // Received a valid message, so commit the call. + retry_commit(elem, retry_state); + // Manually invoking a callback function; it does not take ownership of error. + invoke_recv_message_callback(batch_data, error); + GRPC_ERROR_UNREF(error); +} + +// +// on_complete callback handling +// + +// Updates retry_state to reflect the ops completed in batch_data. +static void update_retry_state_for_completed_batch( + subchannel_batch_data* batch_data, + subchannel_call_retry_state* retry_state) { + if (batch_data->batch.send_initial_metadata) { + retry_state->completed_send_initial_metadata = true; + } + if (batch_data->batch.send_message) { + ++retry_state->completed_send_message_count; + } + if (batch_data->batch.send_trailing_metadata) { + retry_state->completed_send_trailing_metadata = true; + } + if (batch_data->batch.recv_initial_metadata) { + retry_state->completed_recv_initial_metadata = true; + } + if (batch_data->batch.recv_message) { + ++retry_state->completed_recv_message_count; + } + if (batch_data->batch.recv_trailing_metadata) { + retry_state->completed_recv_trailing_metadata = true; + } +} + +// Represents a closure that needs to run as a result of a completed batch. +typedef struct { + grpc_closure* closure; + grpc_error* error; + const char* reason; +} closure_to_execute; + +// Adds any necessary closures for deferred recv_initial_metadata and +// recv_message callbacks to closures, updating *num_closures as needed. +static void add_closures_for_deferred_recv_callbacks( + subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state, + closure_to_execute* closures, size_t* num_closures) { + if (batch_data->batch.recv_trailing_metadata && + retry_state->recv_initial_metadata_ready_deferred) { + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = + GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready, + invoke_recv_initial_metadata_callback, batch_data, + grpc_schedule_on_exec_ctx); + closure->error = retry_state->recv_initial_metadata_error; + closure->reason = "resuming recv_initial_metadata_ready"; + } + if (batch_data->batch.recv_trailing_metadata && + retry_state->recv_message_ready_deferred) { + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = GRPC_CLOSURE_INIT(&batch_data->recv_message_ready, + invoke_recv_message_callback, + batch_data, grpc_schedule_on_exec_ctx); + closure->error = retry_state->recv_message_error; + closure->reason = "resuming recv_message_ready"; + } +} + +// If there are any cached ops to replay or pending ops to start on the +// subchannel call, adds a closure to closures to invoke +// start_retriable_subchannel_batches(), updating *num_closures as needed. +static void add_closures_for_replay_or_pending_send_ops( + grpc_call_element* elem, subchannel_batch_data* batch_data, + subchannel_call_retry_state* retry_state, closure_to_execute* closures, + size_t* num_closures) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + bool have_pending_send_message_ops = + retry_state->started_send_message_count < calld->send_messages.size(); + bool have_pending_send_trailing_metadata_op = + calld->seen_send_trailing_metadata && + !retry_state->started_send_trailing_metadata; + if (!have_pending_send_message_ops && + !have_pending_send_trailing_metadata_op) { + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { + pending_batch* pending = &calld->pending_batches[i]; + grpc_transport_stream_op_batch* batch = pending->batch; + if (batch == nullptr || pending->send_ops_cached) continue; + if (batch->send_message) have_pending_send_message_ops = true; + if (batch->send_trailing_metadata) { + have_pending_send_trailing_metadata_op = true; + } + } + } + if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: starting next batch for pending send op(s)", + chand, calld); + } + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = GRPC_CLOSURE_INIT( + &batch_data->batch.handler_private.closure, + start_retriable_subchannel_batches, elem, grpc_schedule_on_exec_ctx); + closure->error = GRPC_ERROR_NONE; + closure->reason = "starting next batch for send_* op(s)"; + } +} + +// For any pending batch completed in batch_data, adds the necessary +// completion closures to closures, updating *num_closures as needed. +static void add_closures_for_completed_pending_batches( + grpc_call_element* elem, subchannel_batch_data* batch_data, + subchannel_call_retry_state* retry_state, grpc_error* error, + closure_to_execute* closures, size_t* num_closures) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { + pending_batch* pending = &calld->pending_batches[i]; + if (pending_batch_is_completed(pending, calld, retry_state)) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: pending batch completed at index %" PRIuPTR, + chand, calld, i); + } + // Copy the trailing metadata to return it to the surface. + if (batch_data->batch.recv_trailing_metadata) { + grpc_metadata_batch_move(&batch_data->recv_trailing_metadata, + pending->batch->payload->recv_trailing_metadata + .recv_trailing_metadata); + } + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = pending->batch->on_complete; + closure->error = GRPC_ERROR_REF(error); + closure->reason = "on_complete for pending batch"; + pending->batch->on_complete = nullptr; + maybe_clear_pending_batch(elem, pending); + } + } + GRPC_ERROR_UNREF(error); +} + +// For any pending batch containing an op that has not yet been started, +// adds the pending batch's completion closures to closures, updating +// *num_closures as needed. +static void add_closures_to_fail_unstarted_pending_batches( + grpc_call_element* elem, subchannel_call_retry_state* retry_state, + grpc_error* error, closure_to_execute* closures, size_t* num_closures) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { + pending_batch* pending = &calld->pending_batches[i]; + if (pending_batch_is_unstarted(pending, calld, retry_state)) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: failing unstarted pending batch at index " + "%" PRIuPTR, + chand, calld, i); + } + if (pending->batch->recv_initial_metadata) { + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = pending->batch->payload->recv_initial_metadata + .recv_initial_metadata_ready; + closure->error = GRPC_ERROR_REF(error); + closure->reason = + "failing recv_initial_metadata_ready for pending batch"; + pending->batch->payload->recv_initial_metadata + .recv_initial_metadata_ready = nullptr; + } + if (pending->batch->recv_message) { + *pending->batch->payload->recv_message.recv_message = nullptr; + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = + pending->batch->payload->recv_message.recv_message_ready; + closure->error = GRPC_ERROR_REF(error); + closure->reason = "failing recv_message_ready for pending batch"; + pending->batch->payload->recv_message.recv_message_ready = nullptr; + } + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = pending->batch->on_complete; + closure->error = GRPC_ERROR_REF(error); + closure->reason = "failing on_complete for pending batch"; + pending->batch->on_complete = nullptr; + maybe_clear_pending_batch(elem, pending); + } + } + GRPC_ERROR_UNREF(error); +} + +// Callback used to intercept on_complete from subchannel calls. +// Called only when retries are enabled. +static void on_complete(void* arg, grpc_error* error) { + subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); + grpc_call_element* elem = batch_data->elem; + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch); + gpr_log(GPR_DEBUG, "chand=%p calld=%p: got on_complete, error=%s, batch=%s", + chand, calld, grpc_error_string(error), batch_str); + gpr_free(batch_str); + } + subchannel_call_retry_state* retry_state = + static_cast<subchannel_call_retry_state*>( + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); + // If we have previously completed recv_trailing_metadata, then the + // call is finished. + bool call_finished = retry_state->completed_recv_trailing_metadata; + // Update bookkeeping in retry_state. + update_retry_state_for_completed_batch(batch_data, retry_state); + if (call_finished) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: call already finished", chand, + calld); + } + } else { + // Check if this batch finished the call, and if so, get its status. + // The call is finished if either (a) this callback was invoked with + // an error or (b) we receive status. + grpc_status_code status = GRPC_STATUS_OK; + grpc_mdelem* server_pushback_md = nullptr; + if (error != GRPC_ERROR_NONE) { // Case (a). + call_finished = true; + grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr, + nullptr); + } else if (batch_data->batch.recv_trailing_metadata) { // Case (b). + call_finished = true; + grpc_metadata_batch* md_batch = + batch_data->batch.payload->recv_trailing_metadata + .recv_trailing_metadata; + GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); + status = grpc_get_status_code_from_metadata( + md_batch->idx.named.grpc_status->md); + if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { + server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; + } + } else if (retry_state->completed_recv_trailing_metadata) { + call_finished = true; + } + if (call_finished && grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: call finished, status=%s", chand, + calld, grpc_status_code_to_string(status)); + } + // If the call is finished, check if we should retry. + if (call_finished && + maybe_retry(elem, batch_data, status, server_pushback_md)) { + // Unref batch_data for deferred recv_initial_metadata_ready or + // recv_message_ready callbacks, if any. + if (batch_data->batch.recv_trailing_metadata && + retry_state->recv_initial_metadata_ready_deferred) { + batch_data_unref(batch_data); + GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error); + } + if (batch_data->batch.recv_trailing_metadata && + retry_state->recv_message_ready_deferred) { + batch_data_unref(batch_data); + GRPC_ERROR_UNREF(retry_state->recv_message_error); + } + batch_data_unref(batch_data); + return; + } + } + // If the call is finished or retries are committed, free cached data for + // send ops that we've just completed. + if (call_finished || calld->retry_committed) { + free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state); + } + // Call not being retried. + // Construct list of closures to execute. + // Max number of closures is number of pending batches plus one for + // each of: + // - recv_initial_metadata_ready (either deferred or unstarted) + // - recv_message_ready (either deferred or unstarted) + // - starting a new batch for pending send ops + closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches) + 3]; + size_t num_closures = 0; + // If there are deferred recv_initial_metadata_ready or recv_message_ready + // callbacks, add them to closures. + add_closures_for_deferred_recv_callbacks(batch_data, retry_state, closures, + &num_closures); + // Find pending batches whose ops are now complete and add their + // on_complete callbacks to closures. + add_closures_for_completed_pending_batches(elem, batch_data, retry_state, + GRPC_ERROR_REF(error), closures, + &num_closures); + // Add closures to handle any pending batches that have not yet been started. + // If the call is finished, we fail these batches; otherwise, we add a + // callback to start_retriable_subchannel_batches() to start them on + // the subchannel call. + if (call_finished) { + add_closures_to_fail_unstarted_pending_batches( + elem, retry_state, GRPC_ERROR_REF(error), closures, &num_closures); + } else { + add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state, + closures, &num_closures); + } + // Don't need batch_data anymore. + batch_data_unref(batch_data); + // Schedule all of the closures identified above. + // Note that the call combiner will be yielded for each closure that + // we schedule. We're already running in the call combiner, so one of + // the closures can be scheduled directly, but the others will + // have to re-enter the call combiner. + if (num_closures > 0) { + GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error); + for (size_t i = 1; i < num_closures; ++i) { + GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure, + closures[i].error, closures[i].reason); + } + } else { + GRPC_CALL_COMBINER_STOP(calld->call_combiner, + "no closures to run for on_complete"); + } +} + +// +// subchannel batch construction +// + +// Helper function used to start a subchannel batch in the call combiner. +static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) { + grpc_transport_stream_op_batch* batch = + static_cast<grpc_transport_stream_op_batch*>(arg); + grpc_subchannel_call* subchannel_call = + static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg); + // Note: This will release the call combiner. + grpc_subchannel_call_process_op(subchannel_call, batch); +} + +// Adds retriable send_initial_metadata op to batch_data. +static void add_retriable_send_initial_metadata_op( + call_data* calld, subchannel_call_retry_state* retry_state, + subchannel_batch_data* batch_data) { + // Maps the number of retries to the corresponding metadata value slice. + static const grpc_slice* retry_count_strings[] = { + &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4}; + // We need to make a copy of the metadata batch for each attempt, since + // the filters in the subchannel stack may modify this batch, and we don't + // want those modifications to be passed forward to subsequent attempts. + // + // If we've already completed one or more attempts, add the + // grpc-retry-attempts header. + batch_data->send_initial_metadata_storage = + static_cast<grpc_linked_mdelem*>(gpr_arena_alloc( + calld->arena, sizeof(grpc_linked_mdelem) * + (calld->send_initial_metadata.list.count + + (calld->num_attempts_completed > 0)))); + grpc_metadata_batch_copy(&calld->send_initial_metadata, + &batch_data->send_initial_metadata, + batch_data->send_initial_metadata_storage); + if (batch_data->send_initial_metadata.idx.named.grpc_previous_rpc_attempts != + nullptr) { + grpc_metadata_batch_remove( + &batch_data->send_initial_metadata, + batch_data->send_initial_metadata.idx.named.grpc_previous_rpc_attempts); + } + if (calld->num_attempts_completed > 0) { + grpc_mdelem retry_md = grpc_mdelem_from_slices( + GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS, + *retry_count_strings[calld->num_attempts_completed - 1]); + grpc_error* error = grpc_metadata_batch_add_tail( + &batch_data->send_initial_metadata, + &batch_data->send_initial_metadata_storage[calld->send_initial_metadata + .list.count], + retry_md); + if (error != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "error adding retry metadata: %s", + grpc_error_string(error)); + GPR_ASSERT(false); + } + } + retry_state->started_send_initial_metadata = true; + batch_data->batch.send_initial_metadata = true; + batch_data->batch.payload->send_initial_metadata.send_initial_metadata = + &batch_data->send_initial_metadata; + batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags = + calld->send_initial_metadata_flags; + batch_data->batch.payload->send_initial_metadata.peer_string = + calld->peer_string; +} + +// Adds retriable send_message op to batch_data. +static void add_retriable_send_message_op( + grpc_call_element* elem, subchannel_call_retry_state* retry_state, + subchannel_batch_data* batch_data) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]", + chand, calld, retry_state->started_send_message_count); + } + grpc_byte_stream_cache* cache = + calld->send_messages[retry_state->started_send_message_count]; + ++retry_state->started_send_message_count; + grpc_caching_byte_stream_init(&batch_data->send_message, cache); + batch_data->batch.send_message = true; + batch_data->batch.payload->send_message.send_message = + &batch_data->send_message.base; +} + +// Adds retriable send_trailing_metadata op to batch_data. +static void add_retriable_send_trailing_metadata_op( + call_data* calld, subchannel_call_retry_state* retry_state, + subchannel_batch_data* batch_data) { + // We need to make a copy of the metadata batch for each attempt, since + // the filters in the subchannel stack may modify this batch, and we don't + // want those modifications to be passed forward to subsequent attempts. + batch_data->send_trailing_metadata_storage = + static_cast<grpc_linked_mdelem*>(gpr_arena_alloc( + calld->arena, sizeof(grpc_linked_mdelem) * + calld->send_trailing_metadata.list.count)); + grpc_metadata_batch_copy(&calld->send_trailing_metadata, + &batch_data->send_trailing_metadata, + batch_data->send_trailing_metadata_storage); + retry_state->started_send_trailing_metadata = true; + batch_data->batch.send_trailing_metadata = true; + batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata = + &batch_data->send_trailing_metadata; +} + +// Adds retriable recv_initial_metadata op to batch_data. +static void add_retriable_recv_initial_metadata_op( + call_data* calld, subchannel_call_retry_state* retry_state, + subchannel_batch_data* batch_data) { + retry_state->started_recv_initial_metadata = true; + batch_data->batch.recv_initial_metadata = true; + grpc_metadata_batch_init(&batch_data->recv_initial_metadata); + batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata = + &batch_data->recv_initial_metadata; + batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available = + &batch_data->trailing_metadata_available; + GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready, + recv_initial_metadata_ready, batch_data, + grpc_schedule_on_exec_ctx); + batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready = + &batch_data->recv_initial_metadata_ready; +} + +// Adds retriable recv_message op to batch_data. +static void add_retriable_recv_message_op( + call_data* calld, subchannel_call_retry_state* retry_state, + subchannel_batch_data* batch_data) { + ++retry_state->started_recv_message_count; + batch_data->batch.recv_message = true; + batch_data->batch.payload->recv_message.recv_message = + &batch_data->recv_message; + GRPC_CLOSURE_INIT(&batch_data->recv_message_ready, recv_message_ready, + batch_data, grpc_schedule_on_exec_ctx); + batch_data->batch.payload->recv_message.recv_message_ready = + &batch_data->recv_message_ready; +} + +// Adds retriable recv_trailing_metadata op to batch_data. +static void add_retriable_recv_trailing_metadata_op( + call_data* calld, subchannel_call_retry_state* retry_state, + subchannel_batch_data* batch_data) { + retry_state->started_recv_trailing_metadata = true; + batch_data->batch.recv_trailing_metadata = true; + grpc_metadata_batch_init(&batch_data->recv_trailing_metadata); + batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata = + &batch_data->recv_trailing_metadata; + batch_data->batch.collect_stats = true; + batch_data->batch.payload->collect_stats.collect_stats = + &batch_data->collect_stats; +} + +// Helper function used to start a recv_trailing_metadata batch. This +// is used in the case where a recv_initial_metadata or recv_message +// op fails in a way that we know the call is over but when the application +// has not yet started its own recv_trailing_metadata op. +static void start_internal_recv_trailing_metadata(grpc_call_element* elem) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: call failed but recv_trailing_metadata not " + "started; starting it internally", chand, calld); } - if (chand->retry_throttle_data != nullptr) { - calld->retry_throttle_data = - grpc_server_retry_throttle_data_ref(chand->retry_throttle_data); + subchannel_call_retry_state* retry_state = + static_cast<subchannel_call_retry_state*>( + grpc_connected_subchannel_call_get_parent_data( + calld->subchannel_call)); + subchannel_batch_data* batch_data = batch_data_create(elem, 1); + add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); + // Note: This will release the call combiner. + grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch); +} + +// If there are any cached send ops that need to be replayed on the +// current subchannel call, creates and returns a new subchannel batch +// to replay those ops. Otherwise, returns nullptr. +static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( + grpc_call_element* elem, subchannel_call_retry_state* retry_state) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + subchannel_batch_data* replay_batch_data = nullptr; + // send_initial_metadata. + if (calld->seen_send_initial_metadata && + !retry_state->started_send_initial_metadata && + !calld->pending_send_initial_metadata) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: replaying previously completed " + "send_initial_metadata op", + chand, calld); + } + replay_batch_data = batch_data_create(elem, 1); + add_retriable_send_initial_metadata_op(calld, retry_state, + replay_batch_data); + } + // send_message. + // Note that we can only have one send_message op in flight at a time. + if (retry_state->started_send_message_count < calld->send_messages.size() && + retry_state->started_send_message_count == + retry_state->completed_send_message_count && + !calld->pending_send_message) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: replaying previously completed " + "send_message op", + chand, calld); + } + if (replay_batch_data == nullptr) { + replay_batch_data = batch_data_create(elem, 1); + } + add_retriable_send_message_op(elem, retry_state, replay_batch_data); + } + // send_trailing_metadata. + // Note that we only add this op if we have no more send_message ops + // to start, since we can't send down any more send_message ops after + // send_trailing_metadata. + if (calld->seen_send_trailing_metadata && + retry_state->started_send_message_count == calld->send_messages.size() && + !retry_state->started_send_trailing_metadata && + !calld->pending_send_trailing_metadata) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: replaying previously completed " + "send_trailing_metadata op", + chand, calld); + } + if (replay_batch_data == nullptr) { + replay_batch_data = batch_data_create(elem, 1); + } + add_retriable_send_trailing_metadata_op(calld, retry_state, + replay_batch_data); } - if (chand->method_params_table != nullptr) { - calld->method_params = static_cast<method_parameters*>( - grpc_method_config_table_get(chand->method_params_table, calld->path)); - if (calld->method_params != nullptr) { - method_parameters_ref(calld->method_params); - // If the deadline from the service config is shorter than the one - // from the client API, reset the deadline timer. - if (chand->deadline_checking_enabled && - calld->method_params->timeout != 0) { - const grpc_millis per_method_deadline = - grpc_timespec_to_millis_round_up(calld->call_start_time) + - calld->method_params->timeout; - if (per_method_deadline < calld->deadline) { - calld->deadline = per_method_deadline; - grpc_deadline_state_reset(elem, calld->deadline); - } + return replay_batch_data; +} + +// Adds subchannel batches for pending batches to batches, updating +// *num_batches as needed. +static void add_subchannel_batches_for_pending_batches( + grpc_call_element* elem, subchannel_call_retry_state* retry_state, + grpc_transport_stream_op_batch** batches, size_t* num_batches) { + call_data* calld = static_cast<call_data*>(elem->call_data); + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { + pending_batch* pending = &calld->pending_batches[i]; + grpc_transport_stream_op_batch* batch = pending->batch; + if (batch == nullptr) continue; + // Skip any batch that either (a) has already been started on this + // subchannel call or (b) we can't start yet because we're still + // replaying send ops that need to be completed first. + // TODO(roth): Note that if any one op in the batch can't be sent + // yet due to ops that we're replaying, we don't start any of the ops + // in the batch. This is probably okay, but it could conceivably + // lead to increased latency in some cases -- e.g., we could delay + // starting a recv op due to it being in the same batch with a send + // op. If/when we revamp the callback protocol in + // transport_stream_op_batch, we may be able to fix this. + if (batch->send_initial_metadata && + retry_state->started_send_initial_metadata) { + continue; + } + if (batch->send_message && retry_state->completed_send_message_count < + retry_state->started_send_message_count) { + continue; + } + // Note that we only start send_trailing_metadata if we have no more + // send_message ops to start, since we can't send down any more + // send_message ops after send_trailing_metadata. + if (batch->send_trailing_metadata && + (retry_state->started_send_message_count + batch->send_message < + calld->send_messages.size() || + retry_state->started_send_trailing_metadata)) { + continue; + } + if (batch->recv_initial_metadata && + retry_state->started_recv_initial_metadata) { + continue; + } + if (batch->recv_message && retry_state->completed_recv_message_count < + retry_state->started_recv_message_count) { + continue; + } + if (batch->recv_trailing_metadata && + retry_state->started_recv_trailing_metadata) { + continue; + } + // If we're not retrying, just send the batch as-is. + if (calld->method_params == nullptr || + calld->method_params->retry_policy() == nullptr || + calld->retry_committed) { + batches[(*num_batches)++] = batch; + pending_batch_clear(calld, pending); + continue; + } + // Create batch with the right number of callbacks. + const int num_callbacks = + 1 + batch->recv_initial_metadata + batch->recv_message; + subchannel_batch_data* batch_data = batch_data_create(elem, num_callbacks); + // Cache send ops if needed. + maybe_cache_send_ops_for_batch(calld, pending); + // send_initial_metadata. + if (batch->send_initial_metadata) { + add_retriable_send_initial_metadata_op(calld, retry_state, batch_data); + } + // send_message. + if (batch->send_message) { + add_retriable_send_message_op(elem, retry_state, batch_data); + } + // send_trailing_metadata. + if (batch->send_trailing_metadata) { + add_retriable_send_trailing_metadata_op(calld, retry_state, batch_data); + } + // recv_initial_metadata. + if (batch->recv_initial_metadata) { + // recv_flags is only used on the server side. + GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr); + add_retriable_recv_initial_metadata_op(calld, retry_state, batch_data); + } + // recv_message. + if (batch->recv_message) { + add_retriable_recv_message_op(calld, retry_state, batch_data); + } + // recv_trailing_metadata. + if (batch->recv_trailing_metadata) { + GPR_ASSERT(batch->collect_stats); + add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); + } + batches[(*num_batches)++] = &batch_data->batch; + } +} + +// Constructs and starts whatever subchannel batches are needed on the +// subchannel call. +static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { + grpc_call_element* elem = static_cast<grpc_call_element*>(arg); + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: constructing retriable batches", + chand, calld); + } + subchannel_call_retry_state* retry_state = + static_cast<subchannel_call_retry_state*>( + grpc_connected_subchannel_call_get_parent_data( + calld->subchannel_call)); + // We can start up to 6 batches. + grpc_transport_stream_op_batch* + batches[GPR_ARRAY_SIZE(calld->pending_batches)]; + size_t num_batches = 0; + // Replay previously-returned send_* ops if needed. + subchannel_batch_data* replay_batch_data = + maybe_create_subchannel_batch_for_replay(elem, retry_state); + if (replay_batch_data != nullptr) { + batches[num_batches++] = &replay_batch_data->batch; + } + // Now add pending batches. + add_subchannel_batches_for_pending_batches(elem, retry_state, batches, + &num_batches); + // Start batches on subchannel call. + // Note that the call combiner will be yielded for each batch that we + // send down. We're already running in the call combiner, so one of + // the batches can be started directly, but the others will have to + // re-enter the call combiner. + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: starting %" PRIuPTR + " retriable batches on subchannel_call=%p", + chand, calld, num_batches, calld->subchannel_call); + } + if (num_batches == 0) { + // This should be fairly rare, but it can happen when (e.g.) an + // attempt completes before it has finished replaying all + // previously sent messages. + GRPC_CALL_COMBINER_STOP(calld->call_combiner, + "no retriable subchannel batches to start"); + } else { + for (size_t i = 1; i < num_batches; ++i) { + if (grpc_client_channel_trace.enabled()) { + char* batch_str = grpc_transport_stream_op_batch_string(batches[i]); + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: starting batch in call combiner: %s", chand, + calld, batch_str); + gpr_free(batch_str); } + batches[i]->handler_private.extra_arg = calld->subchannel_call; + GRPC_CLOSURE_INIT(&batches[i]->handler_private.closure, + start_batch_in_call_combiner, batches[i], + grpc_schedule_on_exec_ctx); + GRPC_CALL_COMBINER_START(calld->call_combiner, + &batches[i]->handler_private.closure, + GRPC_ERROR_NONE, "start_subchannel_batch"); + } + if (grpc_client_channel_trace.enabled()) { + char* batch_str = grpc_transport_stream_op_batch_string(batches[0]); + gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting batch: %s", chand, calld, + batch_str); + gpr_free(batch_str); } + // Note: This will release the call combiner. + grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]); } } -static void create_subchannel_call_locked(grpc_call_element* elem, - grpc_error* error) { +// +// LB pick +// + +static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); + const size_t parent_data_size = + calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0; const grpc_core::ConnectedSubchannel::CallArgs call_args = { calld->pollent, // pollent calld->path, // path @@ -1001,7 +2466,8 @@ static void create_subchannel_call_locked(grpc_call_element* elem, calld->deadline, // deadline calld->arena, // arena calld->pick.subchannel_call_context, // context - calld->call_combiner // call_combiner + calld->call_combiner, // call_combiner + parent_data_size // parent_data_size }; grpc_error* new_error = calld->pick.connected_subchannel->CreateCall( call_args, &calld->subchannel_call); @@ -1011,36 +2477,61 @@ static void create_subchannel_call_locked(grpc_call_element* elem, } if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); - waiting_for_pick_batches_fail(elem, new_error); + pending_batches_fail(elem, new_error, true /* yield_call_combiner */); } else { - waiting_for_pick_batches_resume(elem); + if (parent_data_size > 0) { + subchannel_call_retry_state* retry_state = + static_cast<subchannel_call_retry_state*>( + grpc_connected_subchannel_call_get_parent_data( + calld->subchannel_call)); + retry_state->batch_payload.context = calld->pick.subchannel_call_context; + } + pending_batches_resume(elem); } GRPC_ERROR_UNREF(error); } // Invoked when a pick is completed, on both success or failure. -static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { - call_data* calld = static_cast<call_data*>(elem->call_data); +static void pick_done(void* arg, grpc_error* error) { + grpc_call_element* elem = static_cast<grpc_call_element*>(arg); channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); if (calld->pick.connected_subchannel == nullptr) { // Failed to create subchannel. - GRPC_ERROR_UNREF(calld->error); - calld->error = error == GRPC_ERROR_NONE - ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Call dropped by load balancing policy") - : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Failed to create subchannel", &error, 1); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, - "chand=%p calld=%p: failed to create subchannel: error=%s", chand, - calld, grpc_error_string(calld->error)); + // If there was no error, this is an LB policy drop, in which case + // we return an error; otherwise, we may retry. + grpc_status_code status = GRPC_STATUS_OK; + grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr, + nullptr); + if (error == GRPC_ERROR_NONE || !calld->enable_retries || + !maybe_retry(elem, nullptr /* batch_data */, status, + nullptr /* server_pushback_md */)) { + grpc_error* new_error = + error == GRPC_ERROR_NONE + ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Call dropped by load balancing policy") + : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Failed to create subchannel", &error, 1); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: failed to create subchannel: error=%s", + chand, calld, grpc_error_string(new_error)); + } + pending_batches_fail(elem, new_error, true /* yield_call_combiner */); } - waiting_for_pick_batches_fail(elem, GRPC_ERROR_REF(calld->error)); } else { /* Create call on subchannel. */ - create_subchannel_call_locked(elem, GRPC_ERROR_REF(error)); + create_subchannel_call(elem, GRPC_ERROR_REF(error)); } - GRPC_ERROR_UNREF(error); +} + +// Invoked when a pick is completed to leave the client_channel combiner +// and continue processing in the call combiner. +static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { + call_data* calld = static_cast<call_data*>(elem->call_data); + GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_SCHED(&calld->pick_closure, error); } // A wrapper around pick_done_locked() that is used in cases where @@ -1088,6 +2579,45 @@ static void pick_callback_done_locked(void* arg, grpc_error* error) { GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); } +// Applies service config to the call. Must be invoked once we know +// that the resolver has returned results to the channel. +static void apply_service_config_to_call_locked(grpc_call_element* elem) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call", + chand, calld); + } + if (chand->retry_throttle_data != nullptr) { + calld->retry_throttle_data = + grpc_server_retry_throttle_data_ref(chand->retry_throttle_data); + } + if (chand->method_params_table != nullptr) { + calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup( + *chand->method_params_table, calld->path); + if (calld->method_params != nullptr) { + // If the deadline from the service config is shorter than the one + // from the client API, reset the deadline timer. + if (chand->deadline_checking_enabled && + calld->method_params->timeout() != 0) { + const grpc_millis per_method_deadline = + grpc_timespec_to_millis_round_up(calld->call_start_time) + + calld->method_params->timeout(); + if (per_method_deadline < calld->deadline) { + calld->deadline = per_method_deadline; + grpc_deadline_state_reset(elem, calld->deadline); + } + } + } + } + // If no retry policy, disable retries. + // TODO(roth): Remove this when adding support for transparent retries. + if (calld->method_params == nullptr || + calld->method_params->retry_policy() == nullptr) { + calld->enable_retries = false; + } +} + // Starts a pick on chand->lb_policy. // Returns true if pick is completed synchronously. static bool pick_callback_start_locked(grpc_call_element* elem) { @@ -1097,33 +2627,46 @@ static bool pick_callback_start_locked(grpc_call_element* elem) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p", chand, calld, chand->lb_policy.get()); } - apply_service_config_to_call_locked(elem); + // Only get service config data on the first attempt. + if (calld->num_attempts_completed == 0) { + apply_service_config_to_call_locked(elem); + } // If the application explicitly set wait_for_ready, use that. // Otherwise, if the service config specified a value for this // method, use that. - uint32_t initial_metadata_flags = - calld->initial_metadata_batch->payload->send_initial_metadata - .send_initial_metadata_flags; + // + // The send_initial_metadata batch will be the first one in the list, + // as set by get_batch_index() above. + calld->pick.initial_metadata = + calld->seen_send_initial_metadata + ? &calld->send_initial_metadata + : calld->pending_batches[0] + .batch->payload->send_initial_metadata.send_initial_metadata; + uint32_t send_initial_metadata_flags = + calld->seen_send_initial_metadata + ? calld->send_initial_metadata_flags + : calld->pending_batches[0] + .batch->payload->send_initial_metadata + .send_initial_metadata_flags; const bool wait_for_ready_set_from_api = - initial_metadata_flags & + send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; const bool wait_for_ready_set_from_service_config = calld->method_params != nullptr && - calld->method_params->wait_for_ready != WAIT_FOR_READY_UNSET; + calld->method_params->wait_for_ready() != + ClientChannelMethodParams::WAIT_FOR_READY_UNSET; if (!wait_for_ready_set_from_api && wait_for_ready_set_from_service_config) { - if (calld->method_params->wait_for_ready == WAIT_FOR_READY_TRUE) { - initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; + if (calld->method_params->wait_for_ready() == + ClientChannelMethodParams::WAIT_FOR_READY_TRUE) { + send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; } else { - initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; + send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; } } - calld->pick.initial_metadata = - calld->initial_metadata_batch->payload->send_initial_metadata - .send_initial_metadata; - calld->pick.initial_metadata_flags = initial_metadata_flags; - GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem, + calld->pick.initial_metadata_flags = send_initial_metadata_flags; + GRPC_CLOSURE_INIT(&calld->pick_closure, pick_callback_done_locked, elem, grpc_combiner_scheduler(chand->combiner)); - calld->pick.on_complete = &calld->lb_pick_closure; + calld->pick.on_complete = &calld->pick_closure; GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback"); const bool pick_done = chand->lb_policy->PickLocked(&calld->pick); if (pick_done) { @@ -1137,7 +2680,7 @@ static bool pick_callback_start_locked(grpc_call_element* elem) { GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); grpc_call_combiner_set_notify_on_cancel( calld->call_combiner, - GRPC_CLOSURE_INIT(&calld->lb_pick_cancel_closure, + GRPC_CLOSURE_INIT(&calld->pick_cancel_closure, pick_callback_cancel_locked, elem, grpc_combiner_scheduler(chand->combiner))); } @@ -1186,8 +2729,6 @@ static void pick_after_resolver_result_cancel_locked(void* arg, "Pick cancelled", &error, 1)); } -static void pick_after_resolver_result_start_locked(grpc_call_element* elem); - static void pick_after_resolver_result_done_locked(void* arg, grpc_error* error) { pick_after_resolver_result_args* args = @@ -1224,7 +2765,7 @@ static void pick_after_resolver_result_done_locked(void* arg, async_pick_done_locked(elem, GRPC_ERROR_NONE); } } - // TODO(roth): It should be impossible for chand->lb_policy to be NULL + // TODO(roth): It should be impossible for chand->lb_policy to be nullptr // here, so the rest of this code should never actually be executed. // However, we have reports of a crash on iOS that triggers this case, // so we are temporarily adding this to restore branches that were @@ -1277,6 +2818,7 @@ static void start_pick_locked(void* arg, grpc_error* ignored) { call_data* calld = static_cast<call_data*>(elem->call_data); channel_data* chand = static_cast<channel_data*>(elem->channel_data); GPR_ASSERT(calld->pick.connected_subchannel == nullptr); + GPR_ASSERT(calld->subchannel_call == nullptr); if (chand->lb_policy != nullptr) { // We already have an LB policy, so ask it for a pick. if (pick_callback_start_locked(elem)) { @@ -1305,24 +2847,9 @@ static void start_pick_locked(void* arg, grpc_error* ignored) { chand->interested_parties); } -static void on_complete(void* arg, grpc_error* error) { - grpc_call_element* elem = static_cast<grpc_call_element*>(arg); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (calld->retry_throttle_data != nullptr) { - if (error == GRPC_ERROR_NONE) { - grpc_server_retry_throttle_data_record_success( - calld->retry_throttle_data); - } else { - // TODO(roth): In a subsequent PR, check the return value here and - // decide whether or not to retry. Note that we should only - // record failures whose statuses match the configured retryable - // or non-fatal status codes. - grpc_server_retry_throttle_data_record_failure( - calld->retry_throttle_data); - } - } - GRPC_CLOSURE_RUN(calld->original_on_complete, GRPC_ERROR_REF(error)); -} +// +// filter call vtable functions +// static void cc_start_transport_stream_op_batch( grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { @@ -1333,46 +2860,47 @@ static void cc_start_transport_stream_op_batch( grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch); } // If we've previously been cancelled, immediately fail any new batches. - if (calld->error != GRPC_ERROR_NONE) { + if (calld->cancel_error != GRPC_ERROR_NONE) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s", - chand, calld, grpc_error_string(calld->error)); + chand, calld, grpc_error_string(calld->cancel_error)); } + // Note: This will release the call combiner. grpc_transport_stream_op_batch_finish_with_failure( - batch, GRPC_ERROR_REF(calld->error), calld->call_combiner); + batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner); return; } + // Handle cancellation. if (batch->cancel_stream) { // Stash a copy of cancel_error in our call data, so that we can use // it for subsequent operations. This ensures that if the call is // cancelled before any batches are passed down (e.g., if the deadline // is in the past when the call starts), we can return the right // error to the caller when the first batch does get passed down. - GRPC_ERROR_UNREF(calld->error); - calld->error = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); + GRPC_ERROR_UNREF(calld->cancel_error); + calld->cancel_error = + GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand, - calld, grpc_error_string(calld->error)); + calld, grpc_error_string(calld->cancel_error)); } - // If we have a subchannel call, send the cancellation batch down. - // Otherwise, fail all pending batches. - if (calld->subchannel_call != nullptr) { - grpc_subchannel_call_process_op(calld->subchannel_call, batch); + // If we do not have a subchannel call (i.e., a pick has not yet + // been started), fail all pending batches. Otherwise, send the + // cancellation down to the subchannel call. + if (calld->subchannel_call == nullptr) { + pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error), + false /* yield_call_combiner */); + // Note: This will release the call combiner. + grpc_transport_stream_op_batch_finish_with_failure( + batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner); } else { - waiting_for_pick_batches_add(calld, batch); - waiting_for_pick_batches_fail(elem, GRPC_ERROR_REF(calld->error)); + // Note: This will release the call combiner. + grpc_subchannel_call_process_op(calld->subchannel_call, batch); } return; } - // Intercept on_complete for recv_trailing_metadata so that we can - // check retry throttle status. - if (batch->recv_trailing_metadata) { - GPR_ASSERT(batch->on_complete != nullptr); - calld->original_on_complete = batch->on_complete; - GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem, - grpc_schedule_on_exec_ctx); - batch->on_complete = &calld->on_complete; - } + // Add the batch to the pending list. + pending_batches_add(elem, batch); // Check if we've already gotten a subchannel call. // Note that once we have completed the pick, we do not need to enter // the channel combiner, which is more efficient (especially for @@ -1380,15 +2908,13 @@ static void cc_start_transport_stream_op_batch( if (calld->subchannel_call != nullptr) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, - "chand=%p calld=%p: sending batch to subchannel_call=%p", chand, + "chand=%p calld=%p: starting batch on subchannel_call=%p", chand, calld, calld->subchannel_call); } - grpc_subchannel_call_process_op(calld->subchannel_call, batch); + pending_batches_resume(elem); return; } // We do not yet have a subchannel call. - // Add the batch to the waiting-for-pick list. - waiting_for_pick_batches_add(calld, batch); // For batches containing a send_initial_metadata op, enter the channel // combiner to start a pick. if (batch->send_initial_metadata) { @@ -1428,6 +2954,7 @@ static grpc_error* cc_init_call_elem(grpc_call_element* elem, grpc_deadline_state_init(elem, args->call_stack, args->call_combiner, calld->deadline); } + calld->enable_retries = chand->enable_retries; return GRPC_ERROR_NONE; } @@ -1441,10 +2968,8 @@ static void cc_destroy_call_elem(grpc_call_element* elem, grpc_deadline_state_destroy(elem); } grpc_slice_unref_internal(calld->path); - if (calld->method_params != nullptr) { - method_parameters_unref(calld->method_params); - } - GRPC_ERROR_UNREF(calld->error); + calld->method_params.reset(); + GRPC_ERROR_UNREF(calld->cancel_error); if (calld->subchannel_call != nullptr) { grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call, then_schedule_closure); @@ -1452,7 +2977,9 @@ static void cc_destroy_call_elem(grpc_call_element* elem, GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call, "client_channel_destroy_call"); } - GPR_ASSERT(calld->waiting_for_pick_batches_count == 0); + for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { + GPR_ASSERT(calld->pending_batches[i].batch == nullptr); + } if (calld->pick.connected_subchannel != nullptr) { calld->pick.connected_subchannel.reset(); } @@ -1652,3 +3179,9 @@ void grpc_client_channel_watch_connectivity_state( grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } + +grpc_subchannel_call* grpc_client_channel_get_subchannel_call( + grpc_call_element* elem) { + call_data* calld = static_cast<call_data*>(elem->call_data); + return calld->subchannel_call; +} diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc index 7abd7f37f9..441efd5e23 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc @@ -30,47 +30,41 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/security/credentials/credentials.h" -#include "src/core/lib/security/transport/lb_targets_info.h" +#include "src/core/lib/security/transport/target_authority_table.h" #include "src/core/lib/slice/slice_internal.h" -static void destroy_balancer_name(void* balancer_name) { - gpr_free(balancer_name); -} - -static grpc_slice_hash_table_entry targets_info_entry_create( - const char* address, const char* balancer_name) { - grpc_slice_hash_table_entry entry; - entry.key = grpc_slice_from_copied_string(address); - entry.value = gpr_strdup(balancer_name); - return entry; -} +namespace grpc_core { +namespace { -static int balancer_name_cmp_fn(void* a, void* b) { - const char* a_str = static_cast<const char*>(a); - const char* b_str = static_cast<const char*>(b); - return strcmp(a_str, b_str); +int BalancerNameCmp(const grpc_core::UniquePtr<char>& a, + const grpc_core::UniquePtr<char>& b) { + return strcmp(a.get(), b.get()); } -static grpc_slice_hash_table* build_targets_info_table( +RefCountedPtr<TargetAuthorityTable> CreateTargetAuthorityTable( grpc_lb_addresses* addresses) { - grpc_slice_hash_table_entry* targets_info_entries = - static_cast<grpc_slice_hash_table_entry*>( - gpr_zalloc(sizeof(*targets_info_entries) * addresses->num_addresses)); + TargetAuthorityTable::Entry* target_authority_entries = + static_cast<TargetAuthorityTable::Entry*>(gpr_zalloc( + sizeof(*target_authority_entries) * addresses->num_addresses)); for (size_t i = 0; i < addresses->num_addresses; ++i) { char* addr_str; GPR_ASSERT(grpc_sockaddr_to_string( &addr_str, &addresses->addresses[i].address, true) > 0); - targets_info_entries[i] = targets_info_entry_create( - addr_str, addresses->addresses[i].balancer_name); + target_authority_entries[i].key = grpc_slice_from_copied_string(addr_str); + target_authority_entries[i].value.reset( + gpr_strdup(addresses->addresses[i].balancer_name)); gpr_free(addr_str); } - grpc_slice_hash_table* targets_info = grpc_slice_hash_table_create( - addresses->num_addresses, targets_info_entries, destroy_balancer_name, - balancer_name_cmp_fn); - gpr_free(targets_info_entries); - return targets_info; + RefCountedPtr<TargetAuthorityTable> target_authority_table = + TargetAuthorityTable::Create(addresses->num_addresses, + target_authority_entries, BalancerNameCmp); + gpr_free(target_authority_entries); + return target_authority_table; } +} // namespace +} // namespace grpc_core + grpc_channel_args* grpc_lb_policy_grpclb_modify_lb_channel_args( grpc_channel_args* args) { const char* args_to_remove[1]; @@ -83,9 +77,11 @@ grpc_channel_args* grpc_lb_policy_grpclb_modify_lb_channel_args( GPR_ASSERT(arg->type == GRPC_ARG_POINTER); grpc_lb_addresses* addresses = static_cast<grpc_lb_addresses*>(arg->value.pointer.p); - grpc_slice_hash_table* targets_info = build_targets_info_table(addresses); + grpc_core::RefCountedPtr<grpc_core::TargetAuthorityTable> + target_authority_table = grpc_core::CreateTargetAuthorityTable(addresses); args_to_add[num_args_to_add++] = - grpc_lb_targets_info_create_channel_arg(targets_info); + grpc_core::CreateTargetAuthorityTableChannelArg( + target_authority_table.get()); // Substitute the channel credentials with a version without call // credentials: the load balancer is not necessarily trusted to handle // bearer token credentials. @@ -105,7 +101,6 @@ grpc_channel_args* grpc_lb_policy_grpclb_modify_lb_channel_args( args, args_to_remove, num_args_to_remove, args_to_add, num_args_to_add); // Clean up. grpc_channel_args_destroy(args); - grpc_slice_hash_table_unref(targets_info); if (creds_sans_call_creds != nullptr) { grpc_channel_credentials_unref(creds_sans_call_creds); } diff --git a/src/core/ext/filters/client_channel/method_params.cc b/src/core/ext/filters/client_channel/method_params.cc new file mode 100644 index 0000000000..374b87e170 --- /dev/null +++ b/src/core/ext/filters/client_channel/method_params.cc @@ -0,0 +1,178 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include <stdio.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> + +#include "src/core/ext/filters/client_channel/method_params.h" +#include "src/core/ext/filters/client_channel/status_util.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/memory.h" + +// As per the retry design, we do not allow more than 5 retry attempts. +#define MAX_MAX_RETRY_ATTEMPTS 5 + +namespace grpc_core { +namespace internal { + +namespace { + +bool ParseWaitForReady( + grpc_json* field, ClientChannelMethodParams::WaitForReady* wait_for_ready) { + if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) { + return false; + } + *wait_for_ready = field->type == GRPC_JSON_TRUE + ? ClientChannelMethodParams::WAIT_FOR_READY_TRUE + : ClientChannelMethodParams::WAIT_FOR_READY_FALSE; + return true; +} + +// Parses a JSON field of the form generated for a google.proto.Duration +// proto message, as per: +// https://developers.google.com/protocol-buffers/docs/proto3#json +bool ParseDuration(grpc_json* field, grpc_millis* duration) { + if (field->type != GRPC_JSON_STRING) return false; + size_t len = strlen(field->value); + if (field->value[len - 1] != 's') return false; + UniquePtr<char> buf(gpr_strdup(field->value)); + *(buf.get() + len - 1) = '\0'; // Remove trailing 's'. + char* decimal_point = strchr(buf.get(), '.'); + int nanos = 0; + if (decimal_point != nullptr) { + *decimal_point = '\0'; + nanos = gpr_parse_nonnegative_int(decimal_point + 1); + if (nanos == -1) { + return false; + } + int num_digits = static_cast<int>(strlen(decimal_point + 1)); + if (num_digits > 9) { // We don't accept greater precision than nanos. + return false; + } + for (int i = 0; i < (9 - num_digits); ++i) { + nanos *= 10; + } + } + int seconds = + decimal_point == buf.get() ? 0 : gpr_parse_nonnegative_int(buf.get()); + if (seconds == -1) return false; + *duration = seconds * GPR_MS_PER_SEC + nanos / GPR_NS_PER_MS; + return true; +} + +UniquePtr<ClientChannelMethodParams::RetryPolicy> ParseRetryPolicy( + grpc_json* field) { + auto retry_policy = MakeUnique<ClientChannelMethodParams::RetryPolicy>(); + if (field->type != GRPC_JSON_OBJECT) return nullptr; + for (grpc_json* sub_field = field->child; sub_field != nullptr; + sub_field = sub_field->next) { + if (sub_field->key == nullptr) return nullptr; + if (strcmp(sub_field->key, "maxAttempts") == 0) { + if (retry_policy->max_attempts != 0) return nullptr; // Duplicate. + if (sub_field->type != GRPC_JSON_NUMBER) return nullptr; + retry_policy->max_attempts = gpr_parse_nonnegative_int(sub_field->value); + if (retry_policy->max_attempts <= 1) return nullptr; + if (retry_policy->max_attempts > MAX_MAX_RETRY_ATTEMPTS) { + gpr_log(GPR_ERROR, + "service config: clamped retryPolicy.maxAttempts at %d", + MAX_MAX_RETRY_ATTEMPTS); + retry_policy->max_attempts = MAX_MAX_RETRY_ATTEMPTS; + } + } else if (strcmp(sub_field->key, "initialBackoff") == 0) { + if (retry_policy->initial_backoff > 0) return nullptr; // Duplicate. + if (!ParseDuration(sub_field, &retry_policy->initial_backoff)) { + return nullptr; + } + if (retry_policy->initial_backoff == 0) return nullptr; + } else if (strcmp(sub_field->key, "maxBackoff") == 0) { + if (retry_policy->max_backoff > 0) return nullptr; // Duplicate. + if (!ParseDuration(sub_field, &retry_policy->max_backoff)) { + return nullptr; + } + if (retry_policy->max_backoff == 0) return nullptr; + } else if (strcmp(sub_field->key, "backoffMultiplier") == 0) { + if (retry_policy->backoff_multiplier != 0) return nullptr; // Duplicate. + if (sub_field->type != GRPC_JSON_NUMBER) return nullptr; + if (sscanf(sub_field->value, "%f", &retry_policy->backoff_multiplier) != + 1) { + return nullptr; + } + if (retry_policy->backoff_multiplier <= 0) return nullptr; + } else if (strcmp(sub_field->key, "retryableStatusCodes") == 0) { + if (!retry_policy->retryable_status_codes.Empty()) { + return nullptr; // Duplicate. + } + if (sub_field->type != GRPC_JSON_ARRAY) return nullptr; + for (grpc_json* element = sub_field->child; element != nullptr; + element = element->next) { + if (element->type != GRPC_JSON_STRING) return nullptr; + grpc_status_code status; + if (!grpc_status_code_from_string(element->value, &status)) { + return nullptr; + } + retry_policy->retryable_status_codes.Add(status); + } + if (retry_policy->retryable_status_codes.Empty()) return nullptr; + } + } + // Make sure required fields are set. + if (retry_policy->max_attempts == 0 || retry_policy->initial_backoff == 0 || + retry_policy->max_backoff == 0 || retry_policy->backoff_multiplier == 0 || + retry_policy->retryable_status_codes.Empty()) { + return nullptr; + } + return retry_policy; +} + +} // namespace + +RefCountedPtr<ClientChannelMethodParams> +ClientChannelMethodParams::CreateFromJson(const grpc_json* json) { + RefCountedPtr<ClientChannelMethodParams> method_params = + MakeRefCounted<ClientChannelMethodParams>(); + for (grpc_json* field = json->child; field != nullptr; field = field->next) { + if (field->key == nullptr) continue; + if (strcmp(field->key, "waitForReady") == 0) { + if (method_params->wait_for_ready_ != WAIT_FOR_READY_UNSET) { + return nullptr; // Duplicate. + } + if (!ParseWaitForReady(field, &method_params->wait_for_ready_)) { + return nullptr; + } + } else if (strcmp(field->key, "timeout") == 0) { + if (method_params->timeout_ > 0) return nullptr; // Duplicate. + if (!ParseDuration(field, &method_params->timeout_)) return nullptr; + } else if (strcmp(field->key, "retryPolicy") == 0) { + if (method_params->retry_policy_ != nullptr) { + return nullptr; // Duplicate. + } + method_params->retry_policy_ = ParseRetryPolicy(field); + if (method_params->retry_policy_ == nullptr) return nullptr; + } + } + return method_params; +} + +} // namespace internal +} // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/method_params.h b/src/core/ext/filters/client_channel/method_params.h new file mode 100644 index 0000000000..48ece29867 --- /dev/null +++ b/src/core/ext/filters/client_channel/method_params.h @@ -0,0 +1,74 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_METHOD_PARAMS_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_METHOD_PARAMS_H + +#include <grpc/support/port_platform.h> + +#include "src/core/ext/filters/client_channel/status_util.h" +#include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/exec_ctx.h" // for grpc_millis +#include "src/core/lib/json/json.h" + +namespace grpc_core { +namespace internal { + +class ClientChannelMethodParams : public RefCounted<ClientChannelMethodParams> { + public: + enum WaitForReady { + WAIT_FOR_READY_UNSET = 0, + WAIT_FOR_READY_FALSE, + WAIT_FOR_READY_TRUE + }; + + struct RetryPolicy { + int max_attempts = 0; + grpc_millis initial_backoff = 0; + grpc_millis max_backoff = 0; + float backoff_multiplier = 0; + StatusCodeSet retryable_status_codes; + }; + + /// Creates a method_parameters object from \a json. + /// Intended for use with ServiceConfig::CreateMethodConfigTable(). + static RefCountedPtr<ClientChannelMethodParams> CreateFromJson( + const grpc_json* json); + + grpc_millis timeout() const { return timeout_; } + WaitForReady wait_for_ready() const { return wait_for_ready_; } + const RetryPolicy* retry_policy() const { return retry_policy_.get(); } + + private: + // So New() can call our private ctor. + template <typename T, typename... Args> + friend T* grpc_core::New(Args&&... args); + + ClientChannelMethodParams() {} + virtual ~ClientChannelMethodParams() {} + + grpc_millis timeout_ = 0; + WaitForReady wait_for_ready_ = WAIT_FOR_READY_UNSET; + UniquePtr<RetryPolicy> retry_policy_; +}; + +} // namespace internal +} // namespace grpc_core + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_METHOD_PARAMS_H */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index a24e8ff352..aa93e5d8de 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -295,7 +295,7 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { size_t num_args_to_add = 0; new_args[num_args_to_add++] = grpc_lb_addresses_create_channel_arg(r->lb_addresses_); - grpc_service_config* service_config = nullptr; + grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config; char* service_config_string = nullptr; if (r->service_config_json_ != nullptr) { service_config_string = ChooseServiceConfig(r->service_config_json_); @@ -306,10 +306,11 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { args_to_remove[num_args_to_remove++] = GRPC_ARG_SERVICE_CONFIG; new_args[num_args_to_add++] = grpc_channel_arg_string_create( (char*)GRPC_ARG_SERVICE_CONFIG, service_config_string); - service_config = grpc_service_config_create(service_config_string); + service_config = + grpc_core::ServiceConfig::Create(service_config_string); if (service_config != nullptr) { const char* lb_policy_name = - grpc_service_config_get_lb_policy_name(service_config); + service_config->GetLoadBalancingPolicyName(); if (lb_policy_name != nullptr) { args_to_remove[num_args_to_remove++] = GRPC_ARG_LB_POLICY_NAME; new_args[num_args_to_add++] = grpc_channel_arg_string_create( @@ -322,7 +323,6 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { result = grpc_channel_args_copy_and_add_and_remove( r->channel_args_, args_to_remove, num_args_to_remove, new_args, num_args_to_add); - if (service_config != nullptr) grpc_service_config_destroy(service_config); gpr_free(service_config_string); grpc_lb_addresses_destroy(r->lb_addresses_); // Reset backoff state so that we start from the beginning when the diff --git a/src/core/ext/filters/client_channel/retry_throttle.cc b/src/core/ext/filters/client_channel/retry_throttle.cc index 450a332342..45de6667c8 100644 --- a/src/core/ext/filters/client_channel/retry_throttle.cc +++ b/src/core/ext/filters/client_channel/retry_throttle.cc @@ -40,7 +40,7 @@ struct grpc_server_retry_throttle_data { int milli_token_ratio; gpr_atm milli_tokens; // A pointer to the replacement for this grpc_server_retry_throttle_data - // entry. If non-NULL, then this entry is stale and must not be used. + // entry. If non-nullptr, then this entry is stale and must not be used. // We hold a reference to the replacement. gpr_atm replacement; }; @@ -58,6 +58,7 @@ static void get_replacement_throttle_data_if_needed( bool grpc_server_retry_throttle_data_record_failure( grpc_server_retry_throttle_data* throttle_data) { + if (throttle_data == nullptr) return true; // First, check if we are stale and need to be replaced. get_replacement_throttle_data_if_needed(&throttle_data); // We decrement milli_tokens by 1000 (1 token) for each failure. @@ -72,6 +73,7 @@ bool grpc_server_retry_throttle_data_record_failure( void grpc_server_retry_throttle_data_record_success( grpc_server_retry_throttle_data* throttle_data) { + if (throttle_data == nullptr) return; // First, check if we are stale and need to be replaced. get_replacement_throttle_data_if_needed(&throttle_data); // We increment milli_tokens by milli_token_ratio for each success. diff --git a/src/core/ext/filters/client_channel/status_util.cc b/src/core/ext/filters/client_channel/status_util.cc new file mode 100644 index 0000000000..11f732ab44 --- /dev/null +++ b/src/core/ext/filters/client_channel/status_util.cc @@ -0,0 +1,100 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/ext/filters/client_channel/status_util.h" + +#include "src/core/lib/gpr/useful.h" + +typedef struct { + const char* str; + grpc_status_code status; +} status_string_entry; + +static const status_string_entry g_status_string_entries[] = { + {"OK", GRPC_STATUS_OK}, + {"CANCELLED", GRPC_STATUS_CANCELLED}, + {"UNKNOWN", GRPC_STATUS_UNKNOWN}, + {"INVALID_ARGUMENT", GRPC_STATUS_INVALID_ARGUMENT}, + {"DEADLINE_EXCEEDED", GRPC_STATUS_DEADLINE_EXCEEDED}, + {"NOT_FOUND", GRPC_STATUS_NOT_FOUND}, + {"ALREADY_EXISTS", GRPC_STATUS_ALREADY_EXISTS}, + {"PERMISSION_DENIED", GRPC_STATUS_PERMISSION_DENIED}, + {"UNAUTHENTICATED", GRPC_STATUS_UNAUTHENTICATED}, + {"RESOURCE_EXHAUSTED", GRPC_STATUS_RESOURCE_EXHAUSTED}, + {"FAILED_PRECONDITION", GRPC_STATUS_FAILED_PRECONDITION}, + {"ABORTED", GRPC_STATUS_ABORTED}, + {"OUT_OF_RANGE", GRPC_STATUS_OUT_OF_RANGE}, + {"UNIMPLEMENTED", GRPC_STATUS_UNIMPLEMENTED}, + {"INTERNAL", GRPC_STATUS_INTERNAL}, + {"UNAVAILABLE", GRPC_STATUS_UNAVAILABLE}, + {"DATA_LOSS", GRPC_STATUS_DATA_LOSS}, +}; + +bool grpc_status_code_from_string(const char* status_str, + grpc_status_code* status) { + for (size_t i = 0; i < GPR_ARRAY_SIZE(g_status_string_entries); ++i) { + if (strcmp(status_str, g_status_string_entries[i].str) == 0) { + *status = g_status_string_entries[i].status; + return true; + } + } + return false; +} + +const char* grpc_status_code_to_string(grpc_status_code status) { + switch (status) { + case GRPC_STATUS_OK: + return "OK"; + case GRPC_STATUS_CANCELLED: + return "CANCELLED"; + case GRPC_STATUS_UNKNOWN: + return "UNKNOWN"; + case GRPC_STATUS_INVALID_ARGUMENT: + return "INVALID_ARGUMENT"; + case GRPC_STATUS_DEADLINE_EXCEEDED: + return "DEADLINE_EXCEEDED"; + case GRPC_STATUS_NOT_FOUND: + return "NOT_FOUND"; + case GRPC_STATUS_ALREADY_EXISTS: + return "ALREADY_EXISTS"; + case GRPC_STATUS_PERMISSION_DENIED: + return "PERMISSION_DENIED"; + case GRPC_STATUS_UNAUTHENTICATED: + return "UNAUTHENTICATED"; + case GRPC_STATUS_RESOURCE_EXHAUSTED: + return "RESOURCE_EXHAUSTED"; + case GRPC_STATUS_FAILED_PRECONDITION: + return "FAILED_PRECONDITION"; + case GRPC_STATUS_ABORTED: + return "ABORTED"; + case GRPC_STATUS_OUT_OF_RANGE: + return "OUT_OF_RANGE"; + case GRPC_STATUS_UNIMPLEMENTED: + return "UNIMPLEMENTED"; + case GRPC_STATUS_INTERNAL: + return "INTERNAL"; + case GRPC_STATUS_UNAVAILABLE: + return "UNAVAILABLE"; + case GRPC_STATUS_DATA_LOSS: + return "DATA_LOSS"; + default: + return "UNKNOWN"; + } +} diff --git a/src/core/ext/filters/client_channel/status_util.h b/src/core/ext/filters/client_channel/status_util.h new file mode 100644 index 0000000000..e018709730 --- /dev/null +++ b/src/core/ext/filters/client_channel/status_util.h @@ -0,0 +1,58 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_STATUS_UTIL_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_STATUS_UTIL_H + +#include <grpc/support/port_platform.h> + +#include <grpc/status.h> + +#include <stdbool.h> +#include <string.h> + +/// If \a status_str is a valid status string, sets \a status to the +/// corresponding status value and returns true. +bool grpc_status_code_from_string(const char* status_str, + grpc_status_code* status); + +/// Returns the string form of \a status, or "UNKNOWN" if invalid. +const char* grpc_status_code_to_string(grpc_status_code status); + +namespace grpc_core { +namespace internal { + +/// A set of grpc_status_code values. +class StatusCodeSet { + public: + bool Empty() const { return status_code_mask_ == 0; } + + void Add(grpc_status_code status) { status_code_mask_ |= (1 << status); } + + bool Contains(grpc_status_code status) const { + return status_code_mask_ & (1 << status); + } + + private: + int status_code_mask_ = 0; // A bitfield of status codes in the set. +}; + +} // namespace internal +} // namespace grpc_core + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_STATUS_UTIL_H */ diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 1304b4a6ad..cae7cc35e3 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -659,7 +659,6 @@ static void on_subchannel_connected(void* arg, grpc_error* error) { static void subchannel_call_destroy(void* call, grpc_error* error) { GPR_TIMER_SCOPE("grpc_subchannel_call_unref.destroy", 0); grpc_subchannel_call* c = static_cast<grpc_subchannel_call*>(call); - GPR_ASSERT(c->schedule_closure_after_destroy != nullptr); grpc_core::ConnectedSubchannel* connection = c->connection; grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr, c->schedule_closure_after_destroy); @@ -673,9 +672,10 @@ void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call* call, call->schedule_closure_after_destroy = closure; } -void grpc_subchannel_call_ref( +grpc_subchannel_call* grpc_subchannel_call_ref( grpc_subchannel_call* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); + return c; } void grpc_subchannel_call_unref( @@ -705,6 +705,13 @@ const grpc_subchannel_key* grpc_subchannel_get_key( return subchannel->key; } +void* grpc_connected_subchannel_call_get_parent_data( + grpc_subchannel_call* subchannel_call) { + grpc_channel_stack* chanstk = subchannel_call->connection->channel_stack(); + return (char*)subchannel_call + sizeof(grpc_subchannel_call) + + chanstk->call_stack_size; +} + grpc_call_stack* grpc_subchannel_call_get_call_stack( grpc_subchannel_call* subchannel_call) { return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); @@ -776,8 +783,8 @@ void ConnectedSubchannel::Ping(grpc_closure* on_initiate, grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args, grpc_subchannel_call** call) { *call = static_cast<grpc_subchannel_call*>(gpr_arena_alloc( - args.arena, - sizeof(grpc_subchannel_call) + channel_stack_->call_stack_size)); + args.arena, sizeof(grpc_subchannel_call) + + channel_stack_->call_stack_size + args.parent_data_size)); grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); RefCountedPtr<ConnectedSubchannel> connection = Ref(DEBUG_LOCATION, "subchannel_call"); diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 7f997d9924..e23aec12df 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -81,6 +81,7 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> { gpr_arena* arena; grpc_call_context_element* context; grpc_call_combiner* call_combiner; + size_t parent_data_size; }; explicit ConnectedSubchannel(grpc_channel_stack* channel_stack); @@ -109,11 +110,17 @@ grpc_subchannel* grpc_subchannel_weak_ref( grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_weak_unref( grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_subchannel_call_ref( +grpc_subchannel_call* grpc_subchannel_call_ref( grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_unref( grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +/** Returns a pointer to the parent data associated with \a subchannel_call. + The data will be of the size specified in \a parent_data_size + field of the args passed to \a grpc_connected_subchannel_create_call(). */ +void* grpc_connected_subchannel_call_get_parent_data( + grpc_subchannel_call* subchannel_call); + /** poll the current connectivity state of a channel */ grpc_connectivity_state grpc_subchannel_check_connectivity( grpc_subchannel* channel, grpc_error** error); diff --git a/src/core/ext/filters/max_age/max_age_filter.cc b/src/core/ext/filters/max_age/max_age_filter.cc index acb1d66fa8..1fe8288bd0 100644 --- a/src/core/ext/filters/max_age/max_age_filter.cc +++ b/src/core/ext/filters/max_age/max_age_filter.cc @@ -370,6 +370,9 @@ static void channel_connectivity_changed(void* arg, grpc_error* error) { max_idle_timer, and prevent max_idle_timer from being started in the future. */ increase_call_count(chand); + if (gpr_atm_acq_load(&chand->idle_state) == MAX_IDLE_STATE_SEEN_EXIT_IDLE) { + grpc_timer_cancel(&chand->max_idle_timer); + } } } diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index 63a9e566d3..b1b14dde02 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -29,6 +29,8 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/service_config.h" @@ -37,27 +39,29 @@ typedef struct { int max_recv_size; } message_size_limits; -typedef struct { - gpr_refcount refs; - message_size_limits limits; -} refcounted_message_size_limits; +namespace grpc_core { +namespace { -static void* refcounted_message_size_limits_ref(void* value) { - refcounted_message_size_limits* limits = - static_cast<refcounted_message_size_limits*>(value); - gpr_ref(&limits->refs); - return value; -} +class MessageSizeLimits : public RefCounted<MessageSizeLimits> { + public: + static RefCountedPtr<MessageSizeLimits> CreateFromJson(const grpc_json* json); -static void refcounted_message_size_limits_unref(void* value) { - refcounted_message_size_limits* limits = - static_cast<refcounted_message_size_limits*>(value); - if (gpr_unref(&limits->refs)) { - gpr_free(value); + const message_size_limits& limits() const { return limits_; } + + private: + // So New() can call our private ctor. + template <typename T, typename... Args> + friend T* grpc_core::New(Args&&... args); + + MessageSizeLimits(int max_send_size, int max_recv_size) { + limits_.max_send_size = max_send_size; + limits_.max_recv_size = max_recv_size; } -} -static void* refcounted_message_size_limits_create_from_json( + message_size_limits limits_; +}; + +RefCountedPtr<MessageSizeLimits> MessageSizeLimits::CreateFromJson( const grpc_json* json) { int max_request_message_bytes = -1; int max_response_message_bytes = -1; @@ -79,16 +83,15 @@ static void* refcounted_message_size_limits_create_from_json( if (max_response_message_bytes == -1) return nullptr; } } - refcounted_message_size_limits* value = - static_cast<refcounted_message_size_limits*>( - gpr_malloc(sizeof(refcounted_message_size_limits))); - gpr_ref_init(&value->refs, 1); - value->limits.max_send_size = max_request_message_bytes; - value->limits.max_recv_size = max_response_message_bytes; - return value; + return MakeRefCounted<MessageSizeLimits>(max_request_message_bytes, + max_response_message_bytes); } +} // namespace +} // namespace grpc_core + namespace { + struct call_data { grpc_call_combiner* call_combiner; message_size_limits limits; @@ -105,8 +108,11 @@ struct call_data { struct channel_data { message_size_limits limits; // Maps path names to refcounted_message_size_limits structs. - grpc_slice_hash_table* method_limit_table; + grpc_core::RefCountedPtr<grpc_core::SliceHashTable< + grpc_core::RefCountedPtr<grpc_core::MessageSizeLimits>>> + method_limit_table; }; + } // namespace // Callback invoked when we receive a message. Here we check the max @@ -185,20 +191,19 @@ static grpc_error* init_call_elem(grpc_call_element* elem, // size to the receive limit. calld->limits = chand->limits; if (chand->method_limit_table != nullptr) { - refcounted_message_size_limits* limits = - static_cast<refcounted_message_size_limits*>( - grpc_method_config_table_get(chand->method_limit_table, - args->path)); + grpc_core::RefCountedPtr<grpc_core::MessageSizeLimits> limits = + grpc_core::ServiceConfig::MethodConfigTableLookup( + *chand->method_limit_table, args->path); if (limits != nullptr) { - if (limits->limits.max_send_size >= 0 && - (limits->limits.max_send_size < calld->limits.max_send_size || + if (limits->limits().max_send_size >= 0 && + (limits->limits().max_send_size < calld->limits.max_send_size || calld->limits.max_send_size < 0)) { - calld->limits.max_send_size = limits->limits.max_send_size; + calld->limits.max_send_size = limits->limits().max_send_size; } - if (limits->limits.max_recv_size >= 0 && - (limits->limits.max_recv_size < calld->limits.max_recv_size || + if (limits->limits().max_recv_size >= 0 && + (limits->limits().max_recv_size < calld->limits.max_recv_size || calld->limits.max_recv_size < 0)) { - calld->limits.max_recv_size = limits->limits.max_recv_size; + calld->limits.max_recv_size = limits->limits().max_recv_size; } } } @@ -253,15 +258,11 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem, grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG); const char* service_config_str = grpc_channel_arg_get_string(channel_arg); if (service_config_str != nullptr) { - grpc_service_config* service_config = - grpc_service_config_create(service_config_str); + grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config = + grpc_core::ServiceConfig::Create(service_config_str); if (service_config != nullptr) { - chand->method_limit_table = - grpc_service_config_create_method_config_table( - service_config, refcounted_message_size_limits_create_from_json, - refcounted_message_size_limits_ref, - refcounted_message_size_limits_unref); - grpc_service_config_destroy(service_config); + chand->method_limit_table = service_config->CreateMethodConfigTable( + grpc_core::MessageSizeLimits::CreateFromJson); } } return GRPC_ERROR_NONE; @@ -270,7 +271,7 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem, // Destructor for channel_data. static void destroy_channel_elem(grpc_channel_element* elem) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); - grpc_slice_hash_table_unref(chand->method_limit_table); + chand->method_limit_table.reset(); } const grpc_channel_filter grpc_message_size_filter = { diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc index dcfcd243a9..a82009ff69 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc @@ -30,10 +30,11 @@ #include "src/core/ext/filters/client_channel/uri_parser.h" #include "src/core/ext/transport/chttp2/client/chttp2_connector.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/security/credentials/credentials.h" #include "src/core/lib/security/security_connector/security_connector.h" -#include "src/core/lib/security/transport/lb_targets_info.h" +#include "src/core/lib/security/transport/target_authority_table.h" #include "src/core/lib/slice/slice_hash_table.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/api_trace.h" @@ -73,11 +74,11 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args( const char* server_uri_path; server_uri_path = server_uri->path[0] == '/' ? server_uri->path + 1 : server_uri->path; - const grpc_slice_hash_table* targets_info = - grpc_lb_targets_info_find_in_args(args->args); - char* target_name_to_check = nullptr; - if (targets_info != nullptr) { // LB channel - // Find the balancer name for the target. + const grpc_core::TargetAuthorityTable* target_authority_table = + grpc_core::FindTargetAuthorityTableInArgs(args->args); + grpc_core::UniquePtr<char> authority; + if (target_authority_table != nullptr) { + // Find the authority for the target. const char* target_uri_str = grpc_get_subchannel_address_uri_arg(args->args); grpc_uri* target_uri = @@ -86,37 +87,33 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args( if (target_uri->path[0] != '\0') { // "path" may be empty const grpc_slice key = grpc_slice_from_static_string( target_uri->path[0] == '/' ? target_uri->path + 1 : target_uri->path); - const char* value = static_cast<const char*>( - grpc_slice_hash_table_get(targets_info, key)); - if (value != nullptr) target_name_to_check = gpr_strdup(value); + const grpc_core::UniquePtr<char>* value = + target_authority_table->Get(key); + if (value != nullptr) authority.reset(gpr_strdup(value->get())); grpc_slice_unref_internal(key); } - if (target_name_to_check == nullptr) { - // If the target name to check hasn't already been set, fall back to using - // SERVER_URI - target_name_to_check = gpr_strdup(server_uri_path); - } grpc_uri_destroy(target_uri); - } else { // regular channel: the secure name is the original server URI. - target_name_to_check = gpr_strdup(server_uri_path); + } + // If the authority hasn't already been set (either because no target + // authority table was present or because the target was not present + // in the table), fall back to using the original server URI. + if (authority == nullptr) { + authority.reset(gpr_strdup(server_uri_path)); } grpc_uri_destroy(server_uri); - GPR_ASSERT(target_name_to_check != nullptr); grpc_channel_security_connector* subchannel_security_connector = nullptr; // Create the security connector using the credentials and target name. grpc_channel_args* new_args_from_connector = nullptr; const grpc_security_status security_status = grpc_channel_credentials_create_security_connector( - channel_credentials, target_name_to_check, args->args, + channel_credentials, authority.get(), args->args, &subchannel_security_connector, &new_args_from_connector); if (security_status != GRPC_SECURITY_OK) { gpr_log(GPR_ERROR, "Failed to create secure subchannel for secure name '%s'", - target_name_to_check); - gpr_free(target_name_to_check); + authority.get()); return nullptr; } - gpr_free(target_name_to_check); grpc_arg new_security_connector_arg = grpc_security_connector_to_arg(&subchannel_security_connector->base); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 89115b66ed..df3fb8c68c 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1456,8 +1456,10 @@ static void perform_stream_op_locked(void* stream_op, } } if (op_payload->send_initial_metadata.peer_string != nullptr) { - gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string, - (gpr_atm)gpr_strdup(t->peer_string)); + char* old_peer_string = (char*)gpr_atm_full_xchg( + op_payload->send_initial_metadata.peer_string, + (gpr_atm)gpr_strdup(t->peer_string)); + gpr_free(old_peer_string); } } @@ -1571,8 +1573,10 @@ static void perform_stream_op_locked(void* stream_op, s->trailing_metadata_available = op_payload->recv_initial_metadata.trailing_metadata_available; if (op_payload->recv_initial_metadata.peer_string != nullptr) { - gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string, - (gpr_atm)gpr_strdup(t->peer_string)); + char* old_peer_string = (char*)gpr_atm_full_xchg( + op_payload->recv_initial_metadata.peer_string, + (gpr_atm)gpr_strdup(t->peer_string)); + gpr_free(old_peer_string); } grpc_chttp2_maybe_complete_recv_initial_metadata(t, s); } diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.cc b/src/core/ext/transport/chttp2/transport/incoming_metadata.cc index 58d77b932f..4d7dfd900f 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.cc +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.cc @@ -69,6 +69,5 @@ void grpc_chttp2_incoming_metadata_buffer_set_deadline( void grpc_chttp2_incoming_metadata_buffer_publish( grpc_chttp2_incoming_metadata_buffer* buffer, grpc_metadata_batch* batch) { - *batch = buffer->batch; - grpc_metadata_batch_init(&buffer->batch); + grpc_metadata_batch_move(&buffer->batch, batch); } |