/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include

#include "src/core/ext/filters/client_channel/client_channel.h"

#include
#include
#include
#include
#include
#include
#include
#include

#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/request_routing.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"

using grpc_core::internal::ClientChannelMethodParams;
using grpc_core::internal::ClientChannelMethodParamsTable;
using grpc_core::internal::ProcessedResolverResult;
using grpc_core::internal::ServerRetryThrottleData;

/* Client channel implementation */

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)

// This value was picked arbitrarily. It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2 grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel"); /************************************************************************* * CHANNEL-WIDE FUNCTIONS */ struct external_connectivity_watcher; typedef struct client_channel_channel_data { grpc_core::ManualConstructor request_router; bool deadline_checking_enabled; bool enable_retries; size_t per_rpc_retry_buffer_size; /** combiner protecting all variables below in this data structure */ grpc_combiner* combiner; /** retry throttle data */ grpc_core::RefCountedPtr retry_throttle_data; /** maps method names to method_parameters structs */ grpc_core::RefCountedPtr method_params_table; /** owning stack */ grpc_channel_stack* owning_stack; /** interested parties (owned) */ grpc_pollset_set* interested_parties; /* external_connectivity_watcher_list head is guarded by its own mutex, since * counts need to be grabbed immediately without polling on a cq */ gpr_mu external_connectivity_watcher_list_mu; struct external_connectivity_watcher* external_connectivity_watcher_list_head; /* the following properties are guarded by a mutex since APIs require them to be instantaneously available */ gpr_mu info_mu; grpc_core::UniquePtr info_lb_policy_name; /** service config in JSON form */ grpc_core::UniquePtr info_service_config_json; } channel_data; // Synchronous callback from chand->request_router to process a resolver // result update. static bool process_resolver_result_locked(void* arg, const grpc_channel_args& args, const char** lb_policy_name, grpc_json** lb_policy_config) { channel_data* chand = static_cast(arg); ProcessedResolverResult resolver_result(args, chand->enable_retries); grpc_core::UniquePtr service_config_json = resolver_result.service_config_json(); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"", chand, service_config_json.get()); } // Update channel state. chand->retry_throttle_data = resolver_result.retry_throttle_data(); chand->method_params_table = resolver_result.method_params_table(); // Swap out the data used by cc_get_channel_info(). gpr_mu_lock(&chand->info_mu); chand->info_lb_policy_name = resolver_result.lb_policy_name(); const bool service_config_changed = ((service_config_json == nullptr) != (chand->info_service_config_json == nullptr)) || (service_config_json != nullptr && strcmp(service_config_json.get(), chand->info_service_config_json.get()) != 0); chand->info_service_config_json = std::move(service_config_json); gpr_mu_unlock(&chand->info_mu); // Return results. 
*lb_policy_name = chand->info_lb_policy_name.get(); *lb_policy_config = resolver_result.lb_policy_config(); return service_config_changed; } static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { grpc_transport_op* op = static_cast(arg); grpc_channel_element* elem = static_cast(op->handler_private.extra_arg); channel_data* chand = static_cast(elem->channel_data); if (op->on_connectivity_state_change != nullptr) { chand->request_router->NotifyOnConnectivityStateChange( op->connectivity_state, op->on_connectivity_state_change); op->on_connectivity_state_change = nullptr; op->connectivity_state = nullptr; } if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { if (chand->request_router->lb_policy() == nullptr) { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"); GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error)); GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error); } else { grpc_error* error = GRPC_ERROR_NONE; grpc_core::LoadBalancingPolicy::PickState pick_state; // Pick must return synchronously, because pick_state.on_complete is null. GPR_ASSERT( chand->request_router->lb_policy()->PickLocked(&pick_state, &error)); if (pick_state.connected_subchannel != nullptr) { pick_state.connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack); } else { if (error == GRPC_ERROR_NONE) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "LB policy dropped call on ping"); } GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error)); GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error); } op->bind_pollset = nullptr; } op->send_ping.on_initiate = nullptr; op->send_ping.on_ack = nullptr; } if (op->disconnect_with_error != GRPC_ERROR_NONE) { chand->request_router->ShutdownLocked(op->disconnect_with_error); } if (op->reset_connect_backoff) { chand->request_router->ResetConnectionBackoffLocked(); } GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op"); GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); } static void cc_start_transport_op(grpc_channel_element* elem, grpc_transport_op* op) { channel_data* chand = static_cast(elem->channel_data); GPR_ASSERT(op->set_accept_stream == false); if (op->bind_pollset != nullptr) { grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset); } op->handler_private.extra_arg = elem; GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op"); GRPC_CLOSURE_SCHED( GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked, op, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } static void cc_get_channel_info(grpc_channel_element* elem, const grpc_channel_info* info) { channel_data* chand = static_cast(elem->channel_data); gpr_mu_lock(&chand->info_mu); if (info->lb_policy_name != nullptr) { *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name.get()); } if (info->service_config_json != nullptr) { *info->service_config_json = gpr_strdup(chand->info_service_config_json.get()); } gpr_mu_unlock(&chand->info_mu); } /* Constructor for channel_data */ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, grpc_channel_element_args* args) { channel_data* chand = static_cast(elem->channel_data); GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); // Initialize data members. 
chand->combiner = grpc_combiner_create(); gpr_mu_init(&chand->info_mu); gpr_mu_init(&chand->external_connectivity_watcher_list_mu); gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); chand->external_connectivity_watcher_list_head = nullptr; gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); chand->owning_stack = args->channel_stack; chand->deadline_checking_enabled = grpc_deadline_checking_enabled(args->channel_args); chand->interested_parties = grpc_pollset_set_create(); grpc_client_channel_start_backup_polling(chand->interested_parties); // Record max per-RPC retry buffer size. const grpc_arg* arg = grpc_channel_args_find( args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE); chand->per_rpc_retry_buffer_size = (size_t)grpc_channel_arg_get_integer( arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}); // Record enable_retries. arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES); chand->enable_retries = grpc_channel_arg_get_bool(arg, true); // Record client channel factory. arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_CLIENT_CHANNEL_FACTORY); if (arg == nullptr) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Missing client channel factory in args for client channel filter"); } if (arg->type != GRPC_ARG_POINTER) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "client channel factory arg must be a pointer"); } grpc_client_channel_factory* client_channel_factory = static_cast(arg->value.pointer.p); // Get server name to resolve, using proxy mapper if needed. arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI); if (arg == nullptr) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Missing server uri in args for client channel filter"); } if (arg->type != GRPC_ARG_STRING) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "server uri arg must be a string"); } char* proxy_name = nullptr; grpc_channel_args* new_args = nullptr; grpc_proxy_mappers_map_name(arg->value.string, args->channel_args, &proxy_name, &new_args); // Instantiate request router. grpc_client_channel_factory_ref(client_channel_factory); grpc_error* error = GRPC_ERROR_NONE; chand->request_router.Init( chand->owning_stack, chand->combiner, client_channel_factory, chand->interested_parties, &grpc_client_channel_trace, process_resolver_result_locked, chand, proxy_name != nullptr ? proxy_name : arg->value.string /* target_uri */, new_args != nullptr ? new_args : args->channel_args, &error); gpr_free(proxy_name); grpc_channel_args_destroy(new_args); return error; } /* Destructor for channel_data */ static void cc_destroy_channel_elem(grpc_channel_element* elem) { channel_data* chand = static_cast(elem->channel_data); chand->request_router.Destroy(); // TODO(roth): Once we convert the filter API to C++, there will no // longer be any need to explicitly reset these smart pointer data members. chand->info_lb_policy_name.reset(); chand->info_service_config_json.reset(); chand->retry_throttle_data.reset(); chand->method_params_table.reset(); grpc_client_channel_stop_backup_polling(chand->interested_parties); grpc_pollset_set_destroy(chand->interested_parties); GRPC_COMBINER_UNREF(chand->combiner, "client_channel"); gpr_mu_destroy(&chand->info_mu); gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu); } /************************************************************************* * PER-CALL FUNCTIONS */ // Max number of batches that can be pending on a call at any given // time. 
This includes one batch for each of the following ops: // recv_initial_metadata // send_initial_metadata // recv_message // send_message // recv_trailing_metadata // send_trailing_metadata #define MAX_PENDING_BATCHES 6 // Retry support: // // In order to support retries, we act as a proxy for stream op batches. // When we get a batch from the surface, we add it to our list of pending // batches, and we then use those batches to construct separate "child" // batches to be started on the subchannel call. When the child batches // return, we then decide which pending batches have been completed and // schedule their callbacks accordingly. If a subchannel call fails and // we want to retry it, we do a new pick and start again, constructing // new "child" batches for the new subchannel call. // // Note that retries are committed when receiving data from the server // (except for Trailers-Only responses). However, there may be many // send ops started before receiving any data, so we may have already // completed some number of send ops (and returned the completions up to // the surface) by the time we realize that we need to retry. To deal // with this, we cache data for send ops, so that we can replay them on a // different subchannel call even after we have completed the original // batches. // // There are two sets of data to maintain: // - In call_data (in the parent channel), we maintain a list of pending // ops and cached data for send ops. // - In the subchannel call, we maintain state to indicate what ops have // already been sent down to that call. // // When constructing the "child" batches, we compare those two sets of // data to see which batches need to be sent to the subchannel call. // TODO(roth): In subsequent PRs: // - add support for transparent retries (including initial metadata) // - figure out how to record stats in census for retries // (census filter is on top of this one) // - add census stats for retries namespace { struct call_data; // State used for starting a retryable batch on a subchannel call. // This provides its own grpc_transport_stream_op_batch and other data // structures needed to populate the ops in the batch. // We allocate one struct on the arena for each attempt at starting a // batch on a given subchannel call. struct subchannel_batch_data { subchannel_batch_data(grpc_call_element* elem, call_data* calld, int refcount, bool set_on_complete); // All dtor code must be added in `destroy`. This is because we may // call closures in `subchannel_batch_data` after they are unrefed by // `batch_data_unref`, and msan would complain about accessing this class // after calling dtor. As a result we cannot call the `dtor` in // `batch_data_unref`. // TODO(soheil): We should try to call the dtor in `batch_data_unref`. ~subchannel_batch_data() { destroy(); } void destroy(); gpr_refcount refs; grpc_call_element* elem; grpc_subchannel_call* subchannel_call; // Holds a ref. // The batch to use in the subchannel call. // Its payload field points to subchannel_call_retry_state.batch_payload. grpc_transport_stream_op_batch batch; // For intercepting on_complete. grpc_closure on_complete; }; // Retry state associated with a subchannel call. // Stored in the parent_data of the subchannel call object. 
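// One instance of this struct is created for each attempt at the call
// (i.e., for each subchannel call), so every attempt starts from a clean
// slate.  The started_*/completed_* bookkeeping below is compared against
// the send ops cached in call_data to decide which child batches still
// need to be constructed for the current attempt.  Illustrative sketch
// (hypothetical numbers, not taken from this file):
//   call_data has cached 3 send_message ops;
//   retry_state->started_send_message_count == 1;
//   => the next child batch replays cached message index 1.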
struct subchannel_call_retry_state { explicit subchannel_call_retry_state(grpc_call_context_element* context) : batch_payload(context), started_send_initial_metadata(false), completed_send_initial_metadata(false), started_send_trailing_metadata(false), completed_send_trailing_metadata(false), started_recv_initial_metadata(false), completed_recv_initial_metadata(false), started_recv_trailing_metadata(false), completed_recv_trailing_metadata(false), retry_dispatched(false) {} // subchannel_batch_data.batch.payload points to this. grpc_transport_stream_op_batch_payload batch_payload; // For send_initial_metadata. // Note that we need to make a copy of the initial metadata for each // subchannel call instead of just referring to the copy in call_data, // because filters in the subchannel stack will probably add entries, // so we need to start in a pristine state for each attempt of the call. grpc_linked_mdelem* send_initial_metadata_storage; grpc_metadata_batch send_initial_metadata; // For send_message. grpc_core::ManualConstructor send_message; // For send_trailing_metadata. grpc_linked_mdelem* send_trailing_metadata_storage; grpc_metadata_batch send_trailing_metadata; // For intercepting recv_initial_metadata. grpc_metadata_batch recv_initial_metadata; grpc_closure recv_initial_metadata_ready; bool trailing_metadata_available = false; // For intercepting recv_message. grpc_closure recv_message_ready; grpc_core::OrphanablePtr recv_message; // For intercepting recv_trailing_metadata. grpc_metadata_batch recv_trailing_metadata; grpc_transport_stream_stats collect_stats; grpc_closure recv_trailing_metadata_ready; // These fields indicate which ops have been started and completed on // this subchannel call. size_t started_send_message_count = 0; size_t completed_send_message_count = 0; size_t started_recv_message_count = 0; size_t completed_recv_message_count = 0; bool started_send_initial_metadata : 1; bool completed_send_initial_metadata : 1; bool started_send_trailing_metadata : 1; bool completed_send_trailing_metadata : 1; bool started_recv_initial_metadata : 1; bool completed_recv_initial_metadata : 1; bool started_recv_trailing_metadata : 1; bool completed_recv_trailing_metadata : 1; // State for callback processing. subchannel_batch_data* recv_initial_metadata_ready_deferred_batch = nullptr; grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE; subchannel_batch_data* recv_message_ready_deferred_batch = nullptr; grpc_error* recv_message_error = GRPC_ERROR_NONE; subchannel_batch_data* recv_trailing_metadata_internal_batch = nullptr; // NOTE: Do not move this next to the metadata bitfields above. That would // save space but will also result in a data race because compiler will // generate a 2 byte store which overwrites the meta-data fields upon // setting this field. bool retry_dispatched : 1; }; // Pending batches stored in call data. struct pending_batch { // The pending batch. If nullptr, this slot is empty. grpc_transport_stream_op_batch* batch; // Indicates whether payload for send ops has been cached in call data. bool send_ops_cached; }; /** Call data. Holds a pointer to grpc_subchannel_call and the associated machinery to create such a pointer. Handles queueing of stream ops until a call object is ready, waiting for initial metadata before trying to create a call object, and handling cancellation gracefully. 
*/ struct call_data { call_data(grpc_call_element* elem, const channel_data& chand, const grpc_call_element_args& args) : deadline_state(elem, args.call_stack, args.call_combiner, GPR_LIKELY(chand.deadline_checking_enabled) ? args.deadline : GRPC_MILLIS_INF_FUTURE), path(grpc_slice_ref_internal(args.path)), call_start_time(args.start_time), deadline(args.deadline), arena(args.arena), owning_call(args.call_stack), call_combiner(args.call_combiner), pending_send_initial_metadata(false), pending_send_message(false), pending_send_trailing_metadata(false), enable_retries(chand.enable_retries), retry_committed(false), last_attempt_got_server_pushback(false) {} ~call_data() { if (GPR_LIKELY(subchannel_call != nullptr)) { GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "client_channel_destroy_call"); } grpc_slice_unref_internal(path); GRPC_ERROR_UNREF(cancel_error); for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches); ++i) { GPR_ASSERT(pending_batches[i].batch == nullptr); } if (have_request) { request.Destroy(); } } // State for handling deadlines. // The code in deadline_filter.c requires this to be the first field. // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state // and this struct both independently store pointers to the call stack // and call combiner. If/when we have time, find a way to avoid this // without breaking the grpc_deadline_state abstraction. grpc_deadline_state deadline_state; grpc_slice path; // Request path. gpr_timespec call_start_time; grpc_millis deadline; gpr_arena* arena; grpc_call_stack* owning_call; grpc_call_combiner* call_combiner; grpc_core::RefCountedPtr retry_throttle_data; grpc_core::RefCountedPtr method_params; grpc_subchannel_call* subchannel_call = nullptr; // Set when we get a cancel_stream op. grpc_error* cancel_error = GRPC_ERROR_NONE; grpc_core::ManualConstructor request; bool have_request = false; grpc_closure pick_closure; grpc_polling_entity* pollent = nullptr; // Batches are added to this list when received from above. // They are removed when we are done handling the batch (i.e., when // either we have invoked all of the batch's callbacks or we have // passed the batch down to the subchannel call and are not // intercepting any of its callbacks). pending_batch pending_batches[MAX_PENDING_BATCHES] = {}; bool pending_send_initial_metadata : 1; bool pending_send_message : 1; bool pending_send_trailing_metadata : 1; // Retry state. bool enable_retries : 1; bool retry_committed : 1; bool last_attempt_got_server_pushback : 1; int num_attempts_completed = 0; size_t bytes_buffered_for_retry = 0; grpc_core::ManualConstructor retry_backoff; grpc_timer retry_timer; // The number of pending retriable subchannel batches containing send ops. // We hold a ref to the call stack while this is non-zero, since replay // batches may not complete until after all callbacks have been returned // to the surface, and we need to make sure that the call is not destroyed // until all of these batches have completed. // Note that we actually only need to track replay batches, but it's // easier to track all batches with send ops. int num_pending_retriable_subchannel_send_batches = 0; // Cached data for retrying send ops. 
// send_initial_metadata bool seen_send_initial_metadata = false; grpc_linked_mdelem* send_initial_metadata_storage = nullptr; grpc_metadata_batch send_initial_metadata; uint32_t send_initial_metadata_flags; gpr_atm* peer_string; // send_message // When we get a send_message op, we replace the original byte stream // with a CachingByteStream that caches the slices to a local buffer for // use in retries. // Note: We inline the cache for the first 3 send_message ops and use // dynamic allocation after that. This number was essentially picked // at random; it could be changed in the future to tune performance. grpc_core::InlinedVector send_messages; // send_trailing_metadata bool seen_send_trailing_metadata = false; grpc_linked_mdelem* send_trailing_metadata_storage = nullptr; grpc_metadata_batch send_trailing_metadata; }; } // namespace // Forward declarations. static void retry_commit(grpc_call_element* elem, subchannel_call_retry_state* retry_state); static void start_internal_recv_trailing_metadata(grpc_call_element* elem); static void on_complete(void* arg, grpc_error* error); static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored); static void start_pick_locked(void* arg, grpc_error* ignored); // // send op data caching // // Caches data for send ops so that it can be retried later, if not // already cached. static void maybe_cache_send_ops_for_batch(call_data* calld, pending_batch* pending) { if (pending->send_ops_cached) return; pending->send_ops_cached = true; grpc_transport_stream_op_batch* batch = pending->batch; // Save a copy of metadata for send_initial_metadata ops. if (batch->send_initial_metadata) { calld->seen_send_initial_metadata = true; GPR_ASSERT(calld->send_initial_metadata_storage == nullptr); grpc_metadata_batch* send_initial_metadata = batch->payload->send_initial_metadata.send_initial_metadata; calld->send_initial_metadata_storage = (grpc_linked_mdelem*)gpr_arena_alloc( calld->arena, sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count); grpc_metadata_batch_copy(send_initial_metadata, &calld->send_initial_metadata, calld->send_initial_metadata_storage); calld->send_initial_metadata_flags = batch->payload->send_initial_metadata.send_initial_metadata_flags; calld->peer_string = batch->payload->send_initial_metadata.peer_string; } // Set up cache for send_message ops. if (batch->send_message) { grpc_core::ByteStreamCache* cache = static_cast( gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache))); new (cache) grpc_core::ByteStreamCache( std::move(batch->payload->send_message.send_message)); calld->send_messages.push_back(cache); } // Save metadata batch for send_trailing_metadata ops. if (batch->send_trailing_metadata) { calld->seen_send_trailing_metadata = true; GPR_ASSERT(calld->send_trailing_metadata_storage == nullptr); grpc_metadata_batch* send_trailing_metadata = batch->payload->send_trailing_metadata.send_trailing_metadata; calld->send_trailing_metadata_storage = (grpc_linked_mdelem*)gpr_arena_alloc( calld->arena, sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count); grpc_metadata_batch_copy(send_trailing_metadata, &calld->send_trailing_metadata, calld->send_trailing_metadata_storage); } } // Frees cached send_initial_metadata. 
static void free_cached_send_initial_metadata(channel_data* chand, call_data* calld) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: destroying calld->send_initial_metadata", chand, calld); } grpc_metadata_batch_destroy(&calld->send_initial_metadata); } // Frees cached send_message at index idx. static void free_cached_send_message(channel_data* chand, call_data* calld, size_t idx) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]", chand, calld, idx); } calld->send_messages[idx]->Destroy(); } // Frees cached send_trailing_metadata. static void free_cached_send_trailing_metadata(channel_data* chand, call_data* calld) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: destroying calld->send_trailing_metadata", chand, calld); } grpc_metadata_batch_destroy(&calld->send_trailing_metadata); } // Frees cached send ops that have already been completed after // committing the call. static void free_cached_send_op_data_after_commit( grpc_call_element* elem, subchannel_call_retry_state* retry_state) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (retry_state->completed_send_initial_metadata) { free_cached_send_initial_metadata(chand, calld); } for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) { free_cached_send_message(chand, calld, i); } if (retry_state->completed_send_trailing_metadata) { free_cached_send_trailing_metadata(chand, calld); } } // Frees cached send ops that were completed by the completed batch in // batch_data. Used when batches are completed after the call is committed. static void free_cached_send_op_data_for_completed_batch( grpc_call_element* elem, subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (batch_data->batch.send_initial_metadata) { free_cached_send_initial_metadata(chand, calld); } if (batch_data->batch.send_message) { free_cached_send_message(chand, calld, retry_state->completed_send_message_count - 1); } if (batch_data->batch.send_trailing_metadata) { free_cached_send_trailing_metadata(chand, calld); } } // // pending_batches management // // Returns the index into calld->pending_batches to be used for batch. static size_t get_batch_index(grpc_transport_stream_op_batch* batch) { // Note: It is important the send_initial_metadata be the first entry // here, since the code in pick_subchannel_locked() assumes it will be. if (batch->send_initial_metadata) return 0; if (batch->send_message) return 1; if (batch->send_trailing_metadata) return 2; if (batch->recv_initial_metadata) return 3; if (batch->recv_message) return 4; if (batch->recv_trailing_metadata) return 5; GPR_UNREACHABLE_CODE(return (size_t)-1); } // This is called via the call combiner, so access to calld is synchronized. 
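// Adds a batch received from the surface to calld->pending_batches, in the
// slot chosen by get_batch_index().  When retries are enabled, this also
// tracks how many bytes of send ops have been buffered for possible replay;
// once that total exceeds the per-RPC retry buffer limit, the call is
// committed and no further retries will be attempted.  For example, with
// the default 256 KiB limit (DEFAULT_PER_RPC_RETRY_BUFFER_SIZE), a single
// 300 KiB send_message would commit the call as soon as it is added here.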
static void pending_batches_add(grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); const size_t idx = get_batch_index(batch); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand, calld, idx); } pending_batch* pending = &calld->pending_batches[idx]; GPR_ASSERT(pending->batch == nullptr); pending->batch = batch; pending->send_ops_cached = false; if (calld->enable_retries) { // Update state in calld about pending batches. // Also check if the batch takes us over the retry buffer limit. // Note: We don't check the size of trailing metadata here, because // gRPC clients do not send trailing metadata. if (batch->send_initial_metadata) { calld->pending_send_initial_metadata = true; calld->bytes_buffered_for_retry += grpc_metadata_batch_size( batch->payload->send_initial_metadata.send_initial_metadata); } if (batch->send_message) { calld->pending_send_message = true; calld->bytes_buffered_for_retry += batch->payload->send_message.send_message->length(); } if (batch->send_trailing_metadata) { calld->pending_send_trailing_metadata = true; } if (GPR_UNLIKELY(calld->bytes_buffered_for_retry > chand->per_rpc_retry_buffer_size)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded retry buffer size, committing", chand, calld); } subchannel_call_retry_state* retry_state = calld->subchannel_call == nullptr ? nullptr : static_cast( grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); retry_commit(elem, retry_state); // If we are not going to retry and have not yet started, pretend // retries are disabled so that we don't bother with retry overhead. if (calld->num_attempts_completed == 0) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: disabling retries before first attempt", chand, calld); } calld->enable_retries = false; } } } } static void pending_batch_clear(call_data* calld, pending_batch* pending) { if (calld->enable_retries) { if (pending->batch->send_initial_metadata) { calld->pending_send_initial_metadata = false; } if (pending->batch->send_message) { calld->pending_send_message = false; } if (pending->batch->send_trailing_metadata) { calld->pending_send_trailing_metadata = false; } } pending->batch = nullptr; } // This is called via the call combiner, so access to calld is synchronized. static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) { grpc_transport_stream_op_batch* batch = static_cast(arg); call_data* calld = static_cast(batch->handler_private.extra_arg); // Note: This will release the call combiner. grpc_transport_stream_op_batch_finish_with_failure( batch, GRPC_ERROR_REF(error), calld->call_combiner); } // This is called via the call combiner, so access to calld is synchronized. // If yield_call_combiner is true, assumes responsibility for yielding // the call combiner. 
static void pending_batches_fail(grpc_call_element* elem, grpc_error* error, bool yield_call_combiner) { GPR_ASSERT(error != GRPC_ERROR_NONE); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { size_t num_batches = 0; for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { if (calld->pending_batches[i].batch != nullptr) ++num_batches; } gpr_log(GPR_INFO, "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s", elem->channel_data, calld, num_batches, grpc_error_string(error)); } grpc_core::CallCombinerClosureList closures; for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { batch->handler_private.extra_arg = calld; GRPC_CLOSURE_INIT(&batch->handler_private.closure, fail_pending_batch_in_call_combiner, batch, grpc_schedule_on_exec_ctx); closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error), "pending_batches_fail"); pending_batch_clear(calld, pending); } } if (yield_call_combiner) { closures.RunClosures(calld->call_combiner); } else { closures.RunClosuresWithoutYielding(calld->call_combiner); } GRPC_ERROR_UNREF(error); } // This is called via the call combiner, so access to calld is synchronized. static void resume_pending_batch_in_call_combiner(void* arg, grpc_error* ignored) { grpc_transport_stream_op_batch* batch = static_cast(arg); grpc_subchannel_call* subchannel_call = static_cast(batch->handler_private.extra_arg); // Note: This will release the call combiner. grpc_subchannel_call_process_op(subchannel_call, batch); } // This is called via the call combiner, so access to calld is synchronized. static void pending_batches_resume(grpc_call_element* elem) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (calld->enable_retries) { start_retriable_subchannel_batches(elem, GRPC_ERROR_NONE); return; } // Retries not enabled; send down batches as-is. if (grpc_client_channel_trace.enabled()) { size_t num_batches = 0; for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { if (calld->pending_batches[i].batch != nullptr) ++num_batches; } gpr_log(GPR_INFO, "chand=%p calld=%p: starting %" PRIuPTR " pending batches on subchannel_call=%p", chand, calld, num_batches, calld->subchannel_call); } grpc_core::CallCombinerClosureList closures; for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { batch->handler_private.extra_arg = calld->subchannel_call; GRPC_CLOSURE_INIT(&batch->handler_private.closure, resume_pending_batch_in_call_combiner, batch, grpc_schedule_on_exec_ctx); closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE, "pending_batches_resume"); pending_batch_clear(calld, pending); } } // Note: This will release the call combiner. closures.RunClosures(calld->call_combiner); } static void maybe_clear_pending_batch(grpc_call_element* elem, pending_batch* pending) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); grpc_transport_stream_op_batch* batch = pending->batch; // We clear the pending batch if all of its callbacks have been // scheduled and reset to nullptr. 
if (batch->on_complete == nullptr && (!batch->recv_initial_metadata || batch->payload->recv_initial_metadata.recv_initial_metadata_ready == nullptr) && (!batch->recv_message || batch->payload->recv_message.recv_message_ready == nullptr) && (!batch->recv_trailing_metadata || batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready == nullptr)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand, calld); } pending_batch_clear(calld, pending); } } // Returns a pointer to the first pending batch for which predicate(batch) // returns true, or null if not found. template static pending_batch* pending_batch_find(grpc_call_element* elem, const char* log_message, Predicate predicate) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr && predicate(batch)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand, calld, log_message, i); } return pending; } } return nullptr; } // // retry code // // Commits the call so that no further retry attempts will be performed. static void retry_commit(grpc_call_element* elem, subchannel_call_retry_state* retry_state) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (calld->retry_committed) return; calld->retry_committed = true; if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, calld); } if (retry_state != nullptr) { free_cached_send_op_data_after_commit(elem, retry_state); } } // Starts a retry after appropriate back-off. static void do_retry(grpc_call_element* elem, subchannel_call_retry_state* retry_state, grpc_millis server_pushback_ms) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); GPR_ASSERT(calld->method_params != nullptr); const ClientChannelMethodParams::RetryPolicy* retry_policy = calld->method_params->retry_policy(); GPR_ASSERT(retry_policy != nullptr); // Reset subchannel call and connected subchannel. if (calld->subchannel_call != nullptr) { GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call, "client_channel_call_retry"); calld->subchannel_call = nullptr; } if (calld->have_request) { calld->have_request = false; calld->request.Destroy(); } // Compute backoff delay. grpc_millis next_attempt_time; if (server_pushback_ms >= 0) { next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms; calld->last_attempt_got_server_pushback = true; } else { if (calld->num_attempts_completed == 1 || calld->last_attempt_got_server_pushback) { calld->retry_backoff.Init( grpc_core::BackOff::Options() .set_initial_backoff(retry_policy->initial_backoff) .set_multiplier(retry_policy->backoff_multiplier) .set_jitter(RETRY_BACKOFF_JITTER) .set_max_backoff(retry_policy->max_backoff)); calld->last_attempt_got_server_pushback = false; } next_attempt_time = calld->retry_backoff->NextAttemptTime(); } if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand, calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now()); } // Schedule retry after computed delay. 
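  // The timer fires calld->pick_closure, which re-runs start_pick_locked()
  // under the channel combiner and thus performs a fresh LB pick for the
  // next attempt.  When there is no server push-back, the delay computed
  // above comes from grpc_core::BackOff: roughly initial_backoff growing by
  // backoff_multiplier per attempt, capped at max_backoff, with
  // RETRY_BACKOFF_JITTER (20%) of jitter applied (e.g., assuming a policy
  // with initial_backoff=1s and backoff_multiplier=2, attempts would be
  // retried after roughly 1s, 2s, 4s, ...).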
GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem, grpc_combiner_scheduler(chand->combiner)); grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure); // Update bookkeeping. if (retry_state != nullptr) retry_state->retry_dispatched = true; } // Returns true if the call is being retried. static bool maybe_retry(grpc_call_element* elem, subchannel_batch_data* batch_data, grpc_status_code status, grpc_mdelem* server_pushback_md) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); // Get retry policy. if (calld->method_params == nullptr) return false; const ClientChannelMethodParams::RetryPolicy* retry_policy = calld->method_params->retry_policy(); if (retry_policy == nullptr) return false; // If we've already dispatched a retry from this call, return true. // This catches the case where the batch has multiple callbacks // (i.e., it includes either recv_message or recv_initial_metadata). subchannel_call_retry_state* retry_state = nullptr; if (batch_data != nullptr) { retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); if (retry_state->retry_dispatched) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand, calld); } return true; } } // Check status. if (GPR_LIKELY(status == GRPC_STATUS_OK)) { if (calld->retry_throttle_data != nullptr) { calld->retry_throttle_data->RecordSuccess(); } if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, calld); } return false; } // Status is not OK. Check whether the status is retryable. if (!retry_policy->retryable_status_codes.Contains(status)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: status %s not configured as retryable", chand, calld, grpc_status_code_to_string(status)); } return false; } // Record the failure and check whether retries are throttled. // Note that it's important for this check to come after the status // code check above, since we should only record failures whose statuses // match the configured retryable status codes, so that we don't count // things like failures due to malformed requests (INVALID_ARGUMENT). // Conversely, it's important for this to come before the remaining // checks, so that we don't fail to record failures due to other factors. if (calld->retry_throttle_data != nullptr && !calld->retry_throttle_data->RecordFailure()) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, calld); } return false; } // Check whether the call is committed. if (calld->retry_committed) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand, calld); } return false; } // Check whether we have retries remaining. ++calld->num_attempts_completed; if (calld->num_attempts_completed >= retry_policy->max_attempts) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand, calld, retry_policy->max_attempts); } return false; } // If the call was cancelled from the surface, don't retry. if (calld->cancel_error != GRPC_ERROR_NONE) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: call cancelled from surface, not retrying", chand, calld); } return false; } // Check server push-back. 
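  // Server push-back arrives in the grpc-retry-pushback-ms trailing metadata
  // element.  A parseable value overrides the exponential backoff: do_retry()
  // schedules the next attempt exactly that many milliseconds from now.
  // "-1" (or anything else that fails to parse as a uint32) means the server
  // has asked us not to retry at all.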
grpc_millis server_pushback_ms = -1; if (server_pushback_md != nullptr) { // If the value is "-1" or any other unparseable string, we do not retry. uint32_t ms; if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: not retrying due to server push-back", chand, calld); } return false; } else { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms", chand, calld, ms); } server_pushback_ms = (grpc_millis)ms; } } do_retry(elem, retry_state, server_pushback_ms); return true; } // // subchannel_batch_data // namespace { subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem, call_data* calld, int refcount, bool set_on_complete) : elem(elem), subchannel_call(GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create")) { subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); batch.payload = &retry_state->batch_payload; gpr_ref_init(&refs, refcount); if (set_on_complete) { GRPC_CLOSURE_INIT(&on_complete, ::on_complete, this, grpc_schedule_on_exec_ctx); batch.on_complete = &on_complete; } GRPC_CALL_STACK_REF(calld->owning_call, "batch_data"); } void subchannel_batch_data::destroy() { subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data(subchannel_call)); if (batch.send_initial_metadata) { grpc_metadata_batch_destroy(&retry_state->send_initial_metadata); } if (batch.send_trailing_metadata) { grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata); } if (batch.recv_initial_metadata) { grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata); } if (batch.recv_trailing_metadata) { grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata); } GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "batch_data_unref"); call_data* calld = static_cast(elem->call_data); GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data"); } } // namespace // Creates a subchannel_batch_data object on the call's arena with the // specified refcount. If set_on_complete is true, the batch's // on_complete callback will be set to point to on_complete(); // otherwise, the batch's on_complete callback will be null. static subchannel_batch_data* batch_data_create(grpc_call_element* elem, int refcount, bool set_on_complete) { call_data* calld = static_cast(elem->call_data); subchannel_batch_data* batch_data = new (gpr_arena_alloc(calld->arena, sizeof(*batch_data))) subchannel_batch_data(elem, calld, refcount, set_on_complete); return batch_data; } static void batch_data_unref(subchannel_batch_data* batch_data) { if (gpr_unref(&batch_data->refs)) { batch_data->destroy(); } } // // recv_initial_metadata callback handling // // Invokes recv_initial_metadata_ready for a subchannel batch. static void invoke_recv_initial_metadata_callback(void* arg, grpc_error* error) { subchannel_batch_data* batch_data = static_cast(arg); // Find pending batch. pending_batch* pending = pending_batch_find( batch_data->elem, "invoking recv_initial_metadata_ready for", [](grpc_transport_stream_op_batch* batch) { return batch->recv_initial_metadata && batch->payload->recv_initial_metadata .recv_initial_metadata_ready != nullptr; }); GPR_ASSERT(pending != nullptr); // Return metadata. 
subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); grpc_metadata_batch_move( &retry_state->recv_initial_metadata, pending->batch->payload->recv_initial_metadata.recv_initial_metadata); // Update bookkeeping. // Note: Need to do this before invoking the callback, since invoking // the callback will result in yielding the call combiner. grpc_closure* recv_initial_metadata_ready = pending->batch->payload->recv_initial_metadata .recv_initial_metadata_ready; pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready = nullptr; maybe_clear_pending_batch(batch_data->elem, pending); batch_data_unref(batch_data); // Invoke callback. GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error)); } // Intercepts recv_initial_metadata_ready callback for retries. // Commits the call and returns the initial metadata up the stack. static void recv_initial_metadata_ready(void* arg, grpc_error* error) { subchannel_batch_data* batch_data = static_cast(arg); grpc_call_element* elem = batch_data->elem; channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s", chand, calld, grpc_error_string(error)); } subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); retry_state->completed_recv_initial_metadata = true; // If a retry was already dispatched, then we're not going to use the // result of this recv_initial_metadata op, so do nothing. if (retry_state->retry_dispatched) { GRPC_CALL_COMBINER_STOP( calld->call_combiner, "recv_initial_metadata_ready after retry dispatched"); return; } // If we got an error or a Trailers-Only response and have not yet gotten // the recv_trailing_metadata_ready callback, then defer propagating this // callback back to the surface. We can evaluate whether to retry when // recv_trailing_metadata comes back. if (GPR_UNLIKELY((retry_state->trailing_metadata_available || error != GRPC_ERROR_NONE) && !retry_state->completed_recv_trailing_metadata)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: deferring recv_initial_metadata_ready " "(Trailers-Only)", chand, calld); } retry_state->recv_initial_metadata_ready_deferred_batch = batch_data; retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error); if (!retry_state->started_recv_trailing_metadata) { // recv_trailing_metadata not yet started by application; start it // ourselves to get status. start_internal_recv_trailing_metadata(elem); } else { GRPC_CALL_COMBINER_STOP( calld->call_combiner, "recv_initial_metadata_ready trailers-only or error"); } return; } // Received valid initial metadata, so commit the call. retry_commit(elem, retry_state); // Invoke the callback to return the result to the surface. // Manually invoking a callback function; it does not take ownership of error. invoke_recv_initial_metadata_callback(batch_data, error); } // // recv_message callback handling // // Invokes recv_message_ready for a subchannel batch. static void invoke_recv_message_callback(void* arg, grpc_error* error) { subchannel_batch_data* batch_data = static_cast(arg); // Find pending op. 
pending_batch* pending = pending_batch_find( batch_data->elem, "invoking recv_message_ready for", [](grpc_transport_stream_op_batch* batch) { return batch->recv_message && batch->payload->recv_message.recv_message_ready != nullptr; }); GPR_ASSERT(pending != nullptr); // Return payload. subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); *pending->batch->payload->recv_message.recv_message = std::move(retry_state->recv_message); // Update bookkeeping. // Note: Need to do this before invoking the callback, since invoking // the callback will result in yielding the call combiner. grpc_closure* recv_message_ready = pending->batch->payload->recv_message.recv_message_ready; pending->batch->payload->recv_message.recv_message_ready = nullptr; maybe_clear_pending_batch(batch_data->elem, pending); batch_data_unref(batch_data); // Invoke callback. GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error)); } // Intercepts recv_message_ready callback for retries. // Commits the call and returns the message up the stack. static void recv_message_ready(void* arg, grpc_error* error) { subchannel_batch_data* batch_data = static_cast(arg); grpc_call_element* elem = batch_data->elem; channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s", chand, calld, grpc_error_string(error)); } subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); ++retry_state->completed_recv_message_count; // If a retry was already dispatched, then we're not going to use the // result of this recv_message op, so do nothing. if (retry_state->retry_dispatched) { GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready after retry dispatched"); return; } // If we got an error or the payload was nullptr and we have not yet gotten // the recv_trailing_metadata_ready callback, then defer propagating this // callback back to the surface. We can evaluate whether to retry when // recv_trailing_metadata comes back. if (GPR_UNLIKELY( (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) && !retry_state->completed_recv_trailing_metadata)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: deferring recv_message_ready (nullptr " "message and recv_trailing_metadata pending)", chand, calld); } retry_state->recv_message_ready_deferred_batch = batch_data; retry_state->recv_message_error = GRPC_ERROR_REF(error); if (!retry_state->started_recv_trailing_metadata) { // recv_trailing_metadata not yet started by application; start it // ourselves to get status. start_internal_recv_trailing_metadata(elem); } else { GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready null"); } return; } // Received a valid message, so commit the call. retry_commit(elem, retry_state); // Invoke the callback to return the result to the surface. // Manually invoking a callback function; it does not take ownership of error. invoke_recv_message_callback(batch_data, error); } // // recv_trailing_metadata handling // // Sets *status and *server_pushback_md based on md_batch and error. // Only sets *server_pushback_md if server_pushback_md != nullptr. 
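// If error is set, the status is derived from the error itself (via
// grpc_error_get_status(), which also takes the call's deadline into
// account); otherwise it is read from the grpc-status element of the
// trailing metadata, and grpc-retry-pushback-ms, if present, is returned
// through *server_pushback_md.  Takes ownership of a ref to error.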
static void get_call_status(grpc_call_element* elem, grpc_metadata_batch* md_batch, grpc_error* error, grpc_status_code* status, grpc_mdelem** server_pushback_md) { call_data* calld = static_cast(elem->call_data); if (error != GRPC_ERROR_NONE) { grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr, nullptr); } else { GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); *status = grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md); if (server_pushback_md != nullptr && md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; } } GRPC_ERROR_UNREF(error); } // Adds recv_trailing_metadata_ready closure to closures. static void add_closure_for_recv_trailing_metadata_ready( grpc_call_element* elem, subchannel_batch_data* batch_data, grpc_error* error, grpc_core::CallCombinerClosureList* closures) { // Find pending batch. pending_batch* pending = pending_batch_find( elem, "invoking recv_trailing_metadata for", [](grpc_transport_stream_op_batch* batch) { return batch->recv_trailing_metadata && batch->payload->recv_trailing_metadata .recv_trailing_metadata_ready != nullptr; }); // If we generated the recv_trailing_metadata op internally via // start_internal_recv_trailing_metadata(), then there will be no // pending batch. if (pending == nullptr) { GRPC_ERROR_UNREF(error); return; } // Return metadata. subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); grpc_metadata_batch_move( &retry_state->recv_trailing_metadata, pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata); // Add closure. closures->Add(pending->batch->payload->recv_trailing_metadata .recv_trailing_metadata_ready, error, "recv_trailing_metadata_ready for pending batch"); // Update bookkeeping. pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = nullptr; maybe_clear_pending_batch(elem, pending); } // Adds any necessary closures for deferred recv_initial_metadata and // recv_message callbacks to closures. static void add_closures_for_deferred_recv_callbacks( subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state, grpc_core::CallCombinerClosureList* closures) { if (batch_data->batch.recv_trailing_metadata) { // Add closure for deferred recv_initial_metadata_ready. if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch != nullptr)) { GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready, invoke_recv_initial_metadata_callback, retry_state->recv_initial_metadata_ready_deferred_batch, grpc_schedule_on_exec_ctx); closures->Add(&retry_state->recv_initial_metadata_ready, retry_state->recv_initial_metadata_error, "resuming recv_initial_metadata_ready"); retry_state->recv_initial_metadata_ready_deferred_batch = nullptr; } // Add closure for deferred recv_message_ready. if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch != nullptr)) { GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, invoke_recv_message_callback, retry_state->recv_message_ready_deferred_batch, grpc_schedule_on_exec_ctx); closures->Add(&retry_state->recv_message_ready, retry_state->recv_message_error, "resuming recv_message_ready"); retry_state->recv_message_ready_deferred_batch = nullptr; } } } // Returns true if any op in the batch was not yet started. // Only looks at send ops, since recv ops are always started immediately. 
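// "Not yet started" is judged against retry_state for the current attempt:
// send_initial_metadata and send_trailing_metadata use the started_* flags,
// while send_message compares started_send_message_count against the number
// of send_message ops cached in call_data.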
static bool pending_batch_is_unstarted( pending_batch* pending, call_data* calld, subchannel_call_retry_state* retry_state) { if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { return false; } if (pending->batch->send_initial_metadata && !retry_state->started_send_initial_metadata) { return true; } if (pending->batch->send_message && retry_state->started_send_message_count < calld->send_messages.size()) { return true; } if (pending->batch->send_trailing_metadata && !retry_state->started_send_trailing_metadata) { return true; } return false; } // For any pending batch containing an op that has not yet been started, // adds the pending batch's completion closures to closures. static void add_closures_to_fail_unstarted_pending_batches( grpc_call_element* elem, subchannel_call_retry_state* retry_state, grpc_error* error, grpc_core::CallCombinerClosureList* closures) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; if (pending_batch_is_unstarted(pending, calld, retry_state)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: failing unstarted pending batch at index " "%" PRIuPTR, chand, calld, i); } closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error), "failing on_complete for pending batch"); pending->batch->on_complete = nullptr; maybe_clear_pending_batch(elem, pending); } } GRPC_ERROR_UNREF(error); } // Runs necessary closures upon completion of a call attempt. static void run_closures_for_completed_call(subchannel_batch_data* batch_data, grpc_error* error) { grpc_call_element* elem = batch_data->elem; call_data* calld = static_cast(elem->call_data); subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); // Construct list of closures to execute. grpc_core::CallCombinerClosureList closures; // First, add closure for recv_trailing_metadata_ready. add_closure_for_recv_trailing_metadata_ready( elem, batch_data, GRPC_ERROR_REF(error), &closures); // If there are deferred recv_initial_metadata_ready or recv_message_ready // callbacks, add them to closures. add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures); // Add closures to fail any pending batches that have not yet been started. add_closures_to_fail_unstarted_pending_batches( elem, retry_state, GRPC_ERROR_REF(error), &closures); // Don't need batch_data anymore. batch_data_unref(batch_data); // Schedule all of the closures identified above. // Note: This will release the call combiner. closures.RunClosures(calld->call_combiner); GRPC_ERROR_UNREF(error); } // Intercepts recv_trailing_metadata_ready callback for retries. // Commits the call and returns the trailing metadata up the stack. 
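// This is the point where the retry decision is actually made: the status
// and any grpc-retry-pushback-ms metadata are extracted from the result,
// maybe_retry() is consulted, and only if we are not retrying is the call
// committed and the accumulated closures run to return the result to the
// surface.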
static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { subchannel_batch_data* batch_data = static_cast(arg); grpc_call_element* elem = batch_data->elem; channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s", chand, calld, grpc_error_string(error)); } subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); retry_state->completed_recv_trailing_metadata = true; // Get the call's status and check for server pushback metadata. grpc_status_code status = GRPC_STATUS_OK; grpc_mdelem* server_pushback_md = nullptr; grpc_metadata_batch* md_batch = batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata; get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, &server_pushback_md); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand, calld, grpc_status_code_to_string(status)); } // Check if we should retry. if (maybe_retry(elem, batch_data, status, server_pushback_md)) { // Unref batch_data for deferred recv_initial_metadata_ready or // recv_message_ready callbacks, if any. if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) { batch_data_unref(batch_data); GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error); } if (retry_state->recv_message_ready_deferred_batch != nullptr) { batch_data_unref(batch_data); GRPC_ERROR_UNREF(retry_state->recv_message_error); } batch_data_unref(batch_data); return; } // Not retrying, so commit the call. retry_commit(elem, retry_state); // Run any necessary closures. run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error)); } // // on_complete callback handling // // Adds the on_complete closure for the pending batch completed in // batch_data to closures. static void add_closure_for_completed_pending_batch( grpc_call_element* elem, subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state, grpc_error* error, grpc_core::CallCombinerClosureList* closures) { pending_batch* pending = pending_batch_find( elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) { // Match the pending batch with the same set of send ops as the // subchannel batch we've just completed. return batch->on_complete != nullptr && batch_data->batch.send_initial_metadata == batch->send_initial_metadata && batch_data->batch.send_message == batch->send_message && batch_data->batch.send_trailing_metadata == batch->send_trailing_metadata; }); // If batch_data is a replay batch, then there will be no pending // batch to complete. if (pending == nullptr) { GRPC_ERROR_UNREF(error); return; } // Add closure. closures->Add(pending->batch->on_complete, error, "on_complete for pending batch"); pending->batch->on_complete = nullptr; maybe_clear_pending_batch(elem, pending); } // If there are any cached ops to replay or pending ops to start on the // subchannel call, adds a closure to closures to invoke // start_retriable_subchannel_batches(). 
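// For example (informal): if the previous attempt completed
// send_initial_metadata and one send_message before failing, the next
// attempt must replay both from the cached copies before any newly pending
// send ops can be started on the new subchannel call.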
static void add_closures_for_replay_or_pending_send_ops( grpc_call_element* elem, subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state, grpc_core::CallCombinerClosureList* closures) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); bool have_pending_send_message_ops = retry_state->started_send_message_count < calld->send_messages.size(); bool have_pending_send_trailing_metadata_op = calld->seen_send_trailing_metadata && !retry_state->started_send_trailing_metadata; if (!have_pending_send_message_ops && !have_pending_send_trailing_metadata_op) { for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch == nullptr || pending->send_ops_cached) continue; if (batch->send_message) have_pending_send_message_ops = true; if (batch->send_trailing_metadata) { have_pending_send_trailing_metadata_op = true; } } } if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: starting next batch for pending send op(s)", chand, calld); } GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure, start_retriable_subchannel_batches, elem, grpc_schedule_on_exec_ctx); closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE, "starting next batch for send_* op(s)"); } } // Callback used to intercept on_complete from subchannel calls. // Called only when retries are enabled. static void on_complete(void* arg, grpc_error* error) { subchannel_batch_data* batch_data = static_cast(arg); grpc_call_element* elem = batch_data->elem; channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch); gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s", chand, calld, grpc_error_string(error), batch_str); gpr_free(batch_str); } subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); // Update bookkeeping in retry_state. if (batch_data->batch.send_initial_metadata) { retry_state->completed_send_initial_metadata = true; } if (batch_data->batch.send_message) { ++retry_state->completed_send_message_count; } if (batch_data->batch.send_trailing_metadata) { retry_state->completed_send_trailing_metadata = true; } // If the call is committed, free cached data for send ops that we've just // completed. if (calld->retry_committed) { free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state); } // Construct list of closures to execute. grpc_core::CallCombinerClosureList closures; // If a retry was already dispatched, that means we saw // recv_trailing_metadata before this, so we do nothing here. // Otherwise, invoke the callback to return the result to the surface. if (!retry_state->retry_dispatched) { // Add closure for the completed pending batch, if any. add_closure_for_completed_pending_batch(elem, batch_data, retry_state, GRPC_ERROR_REF(error), &closures); // If needed, add a callback to start any replay or pending send ops on // the subchannel call. 
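    // (Once recv_trailing_metadata has completed, the call attempt is over,
    // so there is nothing further to replay or start on this subchannel
    // call.)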
    if (!retry_state->completed_recv_trailing_metadata) {
      add_closures_for_replay_or_pending_send_ops(elem, batch_data,
                                                  retry_state, &closures);
    }
  }
  // Track number of pending subchannel send batches and determine if this
  // was the last one.
  --calld->num_pending_retriable_subchannel_send_batches;
  const bool last_send_batch_complete =
      calld->num_pending_retriable_subchannel_send_batches == 0;
  // Don't need batch_data anymore.
  batch_data_unref(batch_data);
  // Schedule all of the closures identified above.
  // Note: This yields the call combiner.
  closures.RunClosures(calld->call_combiner);
  // If this was the last subchannel send batch, unref the call stack.
  if (last_send_batch_complete) {
    GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
  }
}

//
// subchannel batch construction
//

// Helper function used to start a subchannel batch in the call combiner.
static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  grpc_subchannel_call* subchannel_call =
      static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_subchannel_call_process_op(subchannel_call, batch);
}

// Adds a closure to closures that will execute batch in the call combiner.
static void add_closure_for_subchannel_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
    grpc_core::CallCombinerClosureList* closures) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  batch->handler_private.extra_arg = calld->subchannel_call;
  GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                    start_batch_in_call_combiner, batch,
                    grpc_schedule_on_exec_ctx);
  if (grpc_client_channel_trace.enabled()) {
    char* batch_str = grpc_transport_stream_op_batch_string(batch);
    gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s",
            chand, calld, batch_str);
    gpr_free(batch_str);
  }
  closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
                "start_subchannel_batch");
}

// Adds retriable send_initial_metadata op to batch_data.
static void add_retriable_send_initial_metadata_op(
    call_data* calld, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  // Maps the number of retries to the corresponding metadata value slice.
  static const grpc_slice* retry_count_strings[] = {
      &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  //
  // If we've already completed one or more attempts, add the
  // grpc-previous-rpc-attempts header.
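  // For example (informal): on the third attempt of a call,
  // num_attempts_completed is 2 when the attempt starts, so the header added
  // below is "grpc-previous-rpc-attempts: 2".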
retry_state->send_initial_metadata_storage = static_cast(gpr_arena_alloc( calld->arena, sizeof(grpc_linked_mdelem) * (calld->send_initial_metadata.list.count + (calld->num_attempts_completed > 0)))); grpc_metadata_batch_copy(&calld->send_initial_metadata, &retry_state->send_initial_metadata, retry_state->send_initial_metadata_storage); if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named .grpc_previous_rpc_attempts != nullptr)) { grpc_metadata_batch_remove(&retry_state->send_initial_metadata, retry_state->send_initial_metadata.idx.named .grpc_previous_rpc_attempts); } if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) { grpc_mdelem retry_md = grpc_mdelem_create( GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS, *retry_count_strings[calld->num_attempts_completed - 1], nullptr); grpc_error* error = grpc_metadata_batch_add_tail( &retry_state->send_initial_metadata, &retry_state->send_initial_metadata_storage[calld->send_initial_metadata .list.count], retry_md); if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { gpr_log(GPR_ERROR, "error adding retry metadata: %s", grpc_error_string(error)); GPR_ASSERT(false); } } retry_state->started_send_initial_metadata = true; batch_data->batch.send_initial_metadata = true; batch_data->batch.payload->send_initial_metadata.send_initial_metadata = &retry_state->send_initial_metadata; batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags = calld->send_initial_metadata_flags; batch_data->batch.payload->send_initial_metadata.peer_string = calld->peer_string; } // Adds retriable send_message op to batch_data. static void add_retriable_send_message_op( grpc_call_element* elem, subchannel_call_retry_state* retry_state, subchannel_batch_data* batch_data) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]", chand, calld, retry_state->started_send_message_count); } grpc_core::ByteStreamCache* cache = calld->send_messages[retry_state->started_send_message_count]; ++retry_state->started_send_message_count; retry_state->send_message.Init(cache); batch_data->batch.send_message = true; batch_data->batch.payload->send_message.send_message.reset( retry_state->send_message.get()); } // Adds retriable send_trailing_metadata op to batch_data. static void add_retriable_send_trailing_metadata_op( call_data* calld, subchannel_call_retry_state* retry_state, subchannel_batch_data* batch_data) { // We need to make a copy of the metadata batch for each attempt, since // the filters in the subchannel stack may modify this batch, and we don't // want those modifications to be passed forward to subsequent attempts. retry_state->send_trailing_metadata_storage = static_cast(gpr_arena_alloc( calld->arena, sizeof(grpc_linked_mdelem) * calld->send_trailing_metadata.list.count)); grpc_metadata_batch_copy(&calld->send_trailing_metadata, &retry_state->send_trailing_metadata, retry_state->send_trailing_metadata_storage); retry_state->started_send_trailing_metadata = true; batch_data->batch.send_trailing_metadata = true; batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata = &retry_state->send_trailing_metadata; } // Adds retriable recv_initial_metadata op to batch_data. 
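// Note that for each of the recv_* ops below, the "ready" closure handed to
// the subchannel batch points at this filter's own callback
// (recv_initial_metadata_ready, recv_message_ready, or
// recv_trailing_metadata_ready), so the filter can inspect the result (and,
// for a failed attempt, withhold it) before anything is returned to the
// surface.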
static void add_retriable_recv_initial_metadata_op( call_data* calld, subchannel_call_retry_state* retry_state, subchannel_batch_data* batch_data) { retry_state->started_recv_initial_metadata = true; batch_data->batch.recv_initial_metadata = true; grpc_metadata_batch_init(&retry_state->recv_initial_metadata); batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata = &retry_state->recv_initial_metadata; batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available = &retry_state->trailing_metadata_available; GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready, recv_initial_metadata_ready, batch_data, grpc_schedule_on_exec_ctx); batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready = &retry_state->recv_initial_metadata_ready; } // Adds retriable recv_message op to batch_data. static void add_retriable_recv_message_op( call_data* calld, subchannel_call_retry_state* retry_state, subchannel_batch_data* batch_data) { ++retry_state->started_recv_message_count; batch_data->batch.recv_message = true; batch_data->batch.payload->recv_message.recv_message = &retry_state->recv_message; GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready, batch_data, grpc_schedule_on_exec_ctx); batch_data->batch.payload->recv_message.recv_message_ready = &retry_state->recv_message_ready; } // Adds retriable recv_trailing_metadata op to batch_data. static void add_retriable_recv_trailing_metadata_op( call_data* calld, subchannel_call_retry_state* retry_state, subchannel_batch_data* batch_data) { retry_state->started_recv_trailing_metadata = true; batch_data->batch.recv_trailing_metadata = true; grpc_metadata_batch_init(&retry_state->recv_trailing_metadata); batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata = &retry_state->recv_trailing_metadata; batch_data->batch.payload->recv_trailing_metadata.collect_stats = &retry_state->collect_stats; GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready, recv_trailing_metadata_ready, batch_data, grpc_schedule_on_exec_ctx); batch_data->batch.payload->recv_trailing_metadata .recv_trailing_metadata_ready = &retry_state->recv_trailing_metadata_ready; } // Helper function used to start a recv_trailing_metadata batch. This // is used in the case where a recv_initial_metadata or recv_message // op fails in a way that we know the call is over but when the application // has not yet started its own recv_trailing_metadata op. static void start_internal_recv_trailing_metadata(grpc_call_element* elem) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: call failed but recv_trailing_metadata not " "started; starting it internally", chand, calld); } subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); // Create batch_data with 2 refs, since this batch will be unreffed twice: // once for the recv_trailing_metadata_ready callback when the subchannel // batch returns, and again when we actually get a recv_trailing_metadata // op from the surface. subchannel_batch_data* batch_data = batch_data_create(elem, 2, false /* set_on_complete */); add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); retry_state->recv_trailing_metadata_internal_batch = batch_data; // Note: This will release the call combiner. 
grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch); } // If there are any cached send ops that need to be replayed on the // current subchannel call, creates and returns a new subchannel batch // to replay those ops. Otherwise, returns nullptr. static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( grpc_call_element* elem, subchannel_call_retry_state* retry_state) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); subchannel_batch_data* replay_batch_data = nullptr; // send_initial_metadata. if (calld->seen_send_initial_metadata && !retry_state->started_send_initial_metadata && !calld->pending_send_initial_metadata) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: replaying previously completed " "send_initial_metadata op", chand, calld); } replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */); add_retriable_send_initial_metadata_op(calld, retry_state, replay_batch_data); } // send_message. // Note that we can only have one send_message op in flight at a time. if (retry_state->started_send_message_count < calld->send_messages.size() && retry_state->started_send_message_count == retry_state->completed_send_message_count && !calld->pending_send_message) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: replaying previously completed " "send_message op", chand, calld); } if (replay_batch_data == nullptr) { replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */); } add_retriable_send_message_op(elem, retry_state, replay_batch_data); } // send_trailing_metadata. // Note that we only add this op if we have no more send_message ops // to start, since we can't send down any more send_message ops after // send_trailing_metadata. if (calld->seen_send_trailing_metadata && retry_state->started_send_message_count == calld->send_messages.size() && !retry_state->started_send_trailing_metadata && !calld->pending_send_trailing_metadata) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: replaying previously completed " "send_trailing_metadata op", chand, calld); } if (replay_batch_data == nullptr) { replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */); } add_retriable_send_trailing_metadata_op(calld, retry_state, replay_batch_data); } return replay_batch_data; } // Adds subchannel batches for pending batches to batches, updating // *num_batches as needed. static void add_subchannel_batches_for_pending_batches( grpc_call_element* elem, subchannel_call_retry_state* retry_state, grpc_core::CallCombinerClosureList* closures) { call_data* calld = static_cast(elem->call_data); for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch == nullptr) continue; // Skip any batch that either (a) has already been started on this // subchannel call or (b) we can't start yet because we're still // replaying send ops that need to be completed first. // TODO(roth): Note that if any one op in the batch can't be sent // yet due to ops that we're replaying, we don't start any of the ops // in the batch. This is probably okay, but it could conceivably // lead to increased latency in some cases -- e.g., we could delay // starting a recv op due to it being in the same batch with a send // op. 
If/when we revamp the callback protocol in // transport_stream_op_batch, we may be able to fix this. if (batch->send_initial_metadata && retry_state->started_send_initial_metadata) { continue; } if (batch->send_message && retry_state->completed_send_message_count < retry_state->started_send_message_count) { continue; } // Note that we only start send_trailing_metadata if we have no more // send_message ops to start, since we can't send down any more // send_message ops after send_trailing_metadata. if (batch->send_trailing_metadata && (retry_state->started_send_message_count + batch->send_message < calld->send_messages.size() || retry_state->started_send_trailing_metadata)) { continue; } if (batch->recv_initial_metadata && retry_state->started_recv_initial_metadata) { continue; } if (batch->recv_message && retry_state->completed_recv_message_count < retry_state->started_recv_message_count) { continue; } if (batch->recv_trailing_metadata && retry_state->started_recv_trailing_metadata) { // If we previously completed a recv_trailing_metadata op // initiated by start_internal_recv_trailing_metadata(), use the // result of that instead of trying to re-start this op. if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch != nullptr))) { // If the batch completed, then trigger the completion callback // directly, so that we return the previously returned results to // the application. Otherwise, just unref the internally // started subchannel batch, since we'll propagate the // completion when it completes. if (retry_state->completed_recv_trailing_metadata) { // Batches containing recv_trailing_metadata always succeed. closures->Add( &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE, "re-executing recv_trailing_metadata_ready to propagate " "internally triggered result"); } else { batch_data_unref(retry_state->recv_trailing_metadata_internal_batch); } retry_state->recv_trailing_metadata_internal_batch = nullptr; } continue; } // If we're not retrying, just send the batch as-is. if (calld->method_params == nullptr || calld->method_params->retry_policy() == nullptr || calld->retry_committed) { add_closure_for_subchannel_batch(elem, batch, closures); pending_batch_clear(calld, pending); continue; } // Create batch with the right number of callbacks. const bool has_send_ops = batch->send_initial_metadata || batch->send_message || batch->send_trailing_metadata; const int num_callbacks = has_send_ops + batch->recv_initial_metadata + batch->recv_message + batch->recv_trailing_metadata; subchannel_batch_data* batch_data = batch_data_create( elem, num_callbacks, has_send_ops /* set_on_complete */); // Cache send ops if needed. maybe_cache_send_ops_for_batch(calld, pending); // send_initial_metadata. if (batch->send_initial_metadata) { add_retriable_send_initial_metadata_op(calld, retry_state, batch_data); } // send_message. if (batch->send_message) { add_retriable_send_message_op(elem, retry_state, batch_data); } // send_trailing_metadata. if (batch->send_trailing_metadata) { add_retriable_send_trailing_metadata_op(calld, retry_state, batch_data); } // recv_initial_metadata. if (batch->recv_initial_metadata) { // recv_flags is only used on the server side. GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr); add_retriable_recv_initial_metadata_op(calld, retry_state, batch_data); } // recv_message. if (batch->recv_message) { add_retriable_recv_message_op(calld, retry_state, batch_data); } // recv_trailing_metadata. 
if (batch->recv_trailing_metadata) { add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); } add_closure_for_subchannel_batch(elem, &batch_data->batch, closures); // Track number of pending subchannel send batches. // If this is the first one, take a ref to the call stack. if (batch->send_initial_metadata || batch->send_message || batch->send_trailing_metadata) { if (calld->num_pending_retriable_subchannel_send_batches == 0) { GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches"); } ++calld->num_pending_retriable_subchannel_send_batches; } } } // Constructs and starts whatever subchannel batches are needed on the // subchannel call. static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { grpc_call_element* elem = static_cast(arg); channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches", chand, calld); } subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); // Construct list of closures to execute, one for each pending batch. grpc_core::CallCombinerClosureList closures; // Replay previously-returned send_* ops if needed. subchannel_batch_data* replay_batch_data = maybe_create_subchannel_batch_for_replay(elem, retry_state); if (replay_batch_data != nullptr) { add_closure_for_subchannel_batch(elem, &replay_batch_data->batch, &closures); // Track number of pending subchannel send batches. // If this is the first one, take a ref to the call stack. if (calld->num_pending_retriable_subchannel_send_batches == 0) { GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches"); } ++calld->num_pending_retriable_subchannel_send_batches; } // Now add pending batches. add_subchannel_batches_for_pending_batches(elem, retry_state, &closures); // Start batches on subchannel call. if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: starting %" PRIuPTR " retriable batches on subchannel_call=%p", chand, calld, closures.size(), calld->subchannel_call); } // Note: This will yield the call combiner. closures.RunClosures(calld->call_combiner); } // // LB pick // static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); const size_t parent_data_size = calld->enable_retries ? 
sizeof(subchannel_call_retry_state) : 0; const grpc_core::ConnectedSubchannel::CallArgs call_args = { calld->pollent, // pollent calld->path, // path calld->call_start_time, // start_time calld->deadline, // deadline calld->arena, // arena calld->request->pick()->subchannel_call_context, // context calld->call_combiner, // call_combiner parent_data_size // parent_data_size }; grpc_error* new_error = calld->request->pick()->connected_subchannel->CreateCall( call_args, &calld->subchannel_call); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s", chand, calld, calld->subchannel_call, grpc_error_string(new_error)); } if (GPR_UNLIKELY(new_error != GRPC_ERROR_NONE)) { new_error = grpc_error_add_child(new_error, error); pending_batches_fail(elem, new_error, true /* yield_call_combiner */); } else { if (parent_data_size > 0) { new (grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)) subchannel_call_retry_state( calld->request->pick()->subchannel_call_context); } pending_batches_resume(elem); } GRPC_ERROR_UNREF(error); } // Invoked when a pick is completed, on both success or failure. static void pick_done(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (GPR_UNLIKELY(calld->request->pick()->connected_subchannel == nullptr)) { // Failed to create subchannel. // If there was no error, this is an LB policy drop, in which case // we return an error; otherwise, we may retry. grpc_status_code status = GRPC_STATUS_OK; grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr, nullptr); if (error == GRPC_ERROR_NONE || !calld->enable_retries || !maybe_retry(elem, nullptr /* batch_data */, status, nullptr /* server_pushback_md */)) { grpc_error* new_error = error == GRPC_ERROR_NONE ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Call dropped by load balancing policy") : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to create subchannel", &error, 1); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: failed to create subchannel: error=%s", chand, calld, grpc_error_string(new_error)); } pending_batches_fail(elem, new_error, true /* yield_call_combiner */); } } else { /* Create call on subchannel. */ create_subchannel_call(elem, GRPC_ERROR_REF(error)); } } // If the channel is in TRANSIENT_FAILURE and the call is not // wait_for_ready=true, fails the call and returns true. static bool fail_call_if_in_transient_failure(grpc_call_element* elem) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); grpc_transport_stream_op_batch* batch = calld->pending_batches[0].batch; if (chand->request_router->GetConnectivityState() == GRPC_CHANNEL_TRANSIENT_FAILURE && (batch->payload->send_initial_metadata.send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) { pending_batches_fail( elem, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "channel is in state TRANSIENT_FAILURE"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), true /* yield_call_combiner */); return true; } return false; } // Applies service config to the call. Must be invoked once we know // that the resolver has returned results to the channel. 
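// For example (informal): if the application started the call with a 30
// second deadline but the service config specifies a 10 second timeout for
// this method, the effective deadline becomes call_start_time + 10s and the
// deadline timer is reset accordingly.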
static void apply_service_config_to_call_locked(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
            chand, calld);
  }
  if (chand->retry_throttle_data != nullptr) {
    calld->retry_throttle_data = chand->retry_throttle_data->Ref();
  }
  if (chand->method_params_table != nullptr) {
    calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup(
        *chand->method_params_table, calld->path);
    if (calld->method_params != nullptr) {
      // If the deadline from the service config is shorter than the one
      // from the client API, reset the deadline timer.
      if (chand->deadline_checking_enabled &&
          calld->method_params->timeout() != 0) {
        const grpc_millis per_method_deadline =
            grpc_timespec_to_millis_round_up(calld->call_start_time) +
            calld->method_params->timeout();
        if (per_method_deadline < calld->deadline) {
          calld->deadline = per_method_deadline;
          grpc_deadline_state_reset(elem, calld->deadline);
        }
      }
      // If the service config set wait_for_ready and the application
      // did not explicitly set it, use the value from the service config.
      uint32_t* send_initial_metadata_flags =
          &calld->pending_batches[0]
               .batch->payload->send_initial_metadata
               .send_initial_metadata_flags;
      if (GPR_UNLIKELY(
              calld->method_params->wait_for_ready() !=
                  ClientChannelMethodParams::WAIT_FOR_READY_UNSET &&
              !(*send_initial_metadata_flags &
                GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) {
        if (calld->method_params->wait_for_ready() ==
            ClientChannelMethodParams::WAIT_FOR_READY_TRUE) {
          *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
        } else {
          *send_initial_metadata_flags &=
              ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
        }
      }
    }
  }
  // If no retry policy, disable retries.
  // TODO(roth): Remove this when adding support for transparent retries.
  if (calld->method_params == nullptr ||
      calld->method_params->retry_policy() == nullptr) {
    calld->enable_retries = false;
  }
}

// Invoked once resolver results are available.
static bool maybe_apply_service_config_to_call_locked(void* arg) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  // Only get service config data on the first attempt.
  if (GPR_LIKELY(calld->num_attempts_completed == 0)) {
    apply_service_config_to_call_locked(elem);
    // Check this after applying service config, since it may have
    // affected the call's wait_for_ready value.
    if (fail_call_if_in_transient_failure(elem)) return false;
  }
  return true;
}

static void start_pick_locked(void* arg, grpc_error* ignored) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  GPR_ASSERT(!calld->have_request);
  GPR_ASSERT(calld->subchannel_call == nullptr);
  // Normally, we don't want to do this check until after we've processed
  // the service config, so that we can honor the wait_for_ready setting in
  // the service config. However, if the channel is in TRANSIENT_FAILURE
  // and we don't have an LB policy at this point, that means that the
  // resolver has returned a failure, so we're not going to get a service
  // config right away. In that case, we fail the call now based on the
  // wait_for_ready value passed in from the application.
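  // (Informally: a call without wait_for_ready that starts while the
  // resolver is failing is failed here with GRPC_STATUS_UNAVAILABLE right
  // away, rather than sitting in the pending-batch queue waiting for a pick
  // that cannot happen yet.)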
  if (chand->request_router->lb_policy() == nullptr &&
      fail_call_if_in_transient_failure(elem)) {
    return;
  }
  // If this is a retry, use the send_initial_metadata payload that
  // we've cached; otherwise, use the pending batch. The
  // send_initial_metadata batch will be the first pending batch in the
  // list, as set by get_batch_index() above.
  // TODO(roth): What if the LB policy needs to add something to the
  // call's initial metadata, and then there's a retry? We don't want
  // the new metadata to be added twice. We might need to somehow
  // allocate the subchannel batch earlier so that we can give the
  // subchannel's copy of the metadata batch (which is copied for each
  // attempt) to the LB policy instead of the one from the parent channel.
  grpc_metadata_batch* initial_metadata =
      calld->seen_send_initial_metadata
          ? &calld->send_initial_metadata
          : calld->pending_batches[0]
                .batch->payload->send_initial_metadata.send_initial_metadata;
  uint32_t* initial_metadata_flags =
      calld->seen_send_initial_metadata
          ? &calld->send_initial_metadata_flags
          : &calld->pending_batches[0]
               .batch->payload->send_initial_metadata
               .send_initial_metadata_flags;
  GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
                    grpc_schedule_on_exec_ctx);
  calld->request.Init(calld->owning_call, calld->call_combiner, calld->pollent,
                      initial_metadata, initial_metadata_flags,
                      maybe_apply_service_config_to_call_locked, elem,
                      &calld->pick_closure);
  calld->have_request = true;
  chand->request_router->RouteCallLocked(calld->request.get());
}

//
// filter call vtable functions
//

static void cc_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled)) {
    grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(calld->cancel_error != GRPC_ERROR_NONE)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(calld->cancel_error));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations. This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(calld->cancel_error);
    calld->cancel_error =
        GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_string(calld->cancel_error));
    }
    // If we do not have a subchannel call (i.e., a pick has not yet
    // been started), fail all pending batches. Otherwise, send the
    // cancellation down to the subchannel call.
    if (calld->subchannel_call == nullptr) {
      pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error),
                           false /* yield_call_combiner */);
      // Note: This will release the call combiner.
grpc_transport_stream_op_batch_finish_with_failure( batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner); } else { // Note: This will release the call combiner. grpc_subchannel_call_process_op(calld->subchannel_call, batch); } return; } // Add the batch to the pending list. pending_batches_add(elem, batch); // Check if we've already gotten a subchannel call. // Note that once we have completed the pick, we do not need to enter // the channel combiner, which is more efficient (especially for // streaming calls). if (calld->subchannel_call != nullptr) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on subchannel_call=%p", chand, calld, calld->subchannel_call); } pending_batches_resume(elem); return; } // We do not yet have a subchannel call. // For batches containing a send_initial_metadata op, enter the channel // combiner to start a pick. if (GPR_LIKELY(batch->send_initial_metadata)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner", chand, calld); } GRPC_CLOSURE_SCHED( GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked, elem, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } else { // For all other batches, release the call combiner. if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: saved batch, yielding call combiner", chand, calld); } GRPC_CALL_COMBINER_STOP(calld->call_combiner, "batch does not include send_initial_metadata"); } } /* Constructor for call_data */ static grpc_error* cc_init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { channel_data* chand = static_cast(elem->channel_data); new (elem->call_data) call_data(elem, *chand, *args); return GRPC_ERROR_NONE; } /* Destructor for call_data */ static void cc_destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* then_schedule_closure) { call_data* calld = static_cast(elem->call_data); if (GPR_LIKELY(calld->subchannel_call != nullptr)) { grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call, then_schedule_closure); then_schedule_closure = nullptr; } calld->~call_data(); GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); } static void cc_set_pollset_or_pollset_set(grpc_call_element* elem, grpc_polling_entity* pollent) { call_data* calld = static_cast(elem->call_data); calld->pollent = pollent; } /************************************************************************* * EXPORTED SYMBOLS */ const grpc_channel_filter grpc_client_channel_filter = { cc_start_transport_stream_op_batch, cc_start_transport_op, sizeof(call_data), cc_init_call_elem, cc_set_pollset_or_pollset_set, cc_destroy_call_elem, sizeof(channel_data), cc_init_channel_elem, cc_destroy_channel_elem, cc_get_channel_info, "client-channel", }; void grpc_client_channel_set_channelz_node( grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node) { channel_data* chand = static_cast(elem->channel_data); chand->request_router->set_channelz_node(node); } void grpc_client_channel_populate_child_refs( grpc_channel_element* elem, grpc_core::channelz::ChildRefsList* child_subchannels, grpc_core::channelz::ChildRefsList* child_channels) { channel_data* chand = static_cast(elem->channel_data); if (chand->request_router->lb_policy() != nullptr) { chand->request_router->lb_policy()->FillChildRefsForChannelz( child_subchannels, child_channels); } } static void try_to_connect_locked(void* arg, 
grpc_error* error_ignored) { channel_data* chand = static_cast(arg); chand->request_router->ExitIdleLocked(); GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect"); } grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_channel_element* elem, int try_to_connect) { channel_data* chand = static_cast(elem->channel_data); grpc_connectivity_state out = chand->request_router->GetConnectivityState(); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); GRPC_CLOSURE_SCHED( GRPC_CLOSURE_CREATE(try_to_connect_locked, chand, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } return out; } typedef struct external_connectivity_watcher { channel_data* chand; grpc_polling_entity pollent; grpc_closure* on_complete; grpc_closure* watcher_timer_init; grpc_connectivity_state* state; grpc_closure my_closure; struct external_connectivity_watcher* next; } external_connectivity_watcher; static external_connectivity_watcher* lookup_external_connectivity_watcher( channel_data* chand, grpc_closure* on_complete) { gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); external_connectivity_watcher* w = chand->external_connectivity_watcher_list_head; while (w != nullptr && w->on_complete != on_complete) { w = w->next; } gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); return w; } static void external_connectivity_watcher_list_append( channel_data* chand, external_connectivity_watcher* w) { GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete)); gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu); GPR_ASSERT(!w->next); w->next = chand->external_connectivity_watcher_list_head; chand->external_connectivity_watcher_list_head = w; gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu); } static void external_connectivity_watcher_list_remove( channel_data* chand, external_connectivity_watcher* to_remove) { GPR_ASSERT( lookup_external_connectivity_watcher(chand, to_remove->on_complete)); gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); if (to_remove == chand->external_connectivity_watcher_list_head) { chand->external_connectivity_watcher_list_head = to_remove->next; gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); return; } external_connectivity_watcher* w = chand->external_connectivity_watcher_list_head; while (w != nullptr) { if (w->next == to_remove) { w->next = w->next->next; gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); return; } w = w->next; } GPR_UNREACHABLE_CODE(return ); } int grpc_client_channel_num_external_connectivity_watchers( grpc_channel_element* elem) { channel_data* chand = static_cast(elem->channel_data); int count = 0; gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); external_connectivity_watcher* w = chand->external_connectivity_watcher_list_head; while (w != nullptr) { count++; w = w->next; } gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); return count; } static void on_external_watch_complete_locked(void* arg, grpc_error* error) { external_connectivity_watcher* w = static_cast(arg); grpc_closure* follow_up = w->on_complete; grpc_polling_entity_del_from_pollset_set(&w->pollent, w->chand->interested_parties); GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "external_connectivity_watcher"); external_connectivity_watcher_list_remove(w->chand, w); gpr_free(w); GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error)); } static void watch_connectivity_state_locked(void* arg, grpc_error* 
error_ignored) { external_connectivity_watcher* w = static_cast(arg); external_connectivity_watcher* found = nullptr; if (w->state != nullptr) { external_connectivity_watcher_list_append(w->chand, w); // An assumption is being made that the closure is scheduled on the exec ctx // scheduler and that GRPC_CLOSURE_RUN would run the closure immediately. GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE); GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w, grpc_combiner_scheduler(w->chand->combiner)); w->chand->request_router->NotifyOnConnectivityStateChange(w->state, &w->my_closure); } else { GPR_ASSERT(w->watcher_timer_init == nullptr); found = lookup_external_connectivity_watcher(w->chand, w->on_complete); if (found) { GPR_ASSERT(found->on_complete == w->on_complete); found->chand->request_router->NotifyOnConnectivityStateChange( nullptr, &found->my_closure); } grpc_polling_entity_del_from_pollset_set(&w->pollent, w->chand->interested_parties); GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "external_connectivity_watcher"); gpr_free(w); } } void grpc_client_channel_watch_connectivity_state( grpc_channel_element* elem, grpc_polling_entity pollent, grpc_connectivity_state* state, grpc_closure* closure, grpc_closure* watcher_timer_init) { channel_data* chand = static_cast(elem->channel_data); external_connectivity_watcher* w = static_cast(gpr_zalloc(sizeof(*w))); w->chand = chand; w->pollent = pollent; w->on_complete = closure; w->state = state; w->watcher_timer_init = watcher_timer_init; grpc_polling_entity_add_to_pollset_set(&w->pollent, chand->interested_parties); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); GRPC_CLOSURE_SCHED( GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } grpc_subchannel_call* grpc_client_channel_get_subchannel_call( grpc_call_element* elem) { call_data* calld = static_cast(elem->call_data); return calld->subchannel_call; }
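// (Informal summary of the external connectivity watcher lifecycle above:
// grpc_client_channel_watch_connectivity_state() allocates a watcher, adds
// its pollent to the channel's interested_parties, takes a ref on the
// channel stack, and hops into the combiner; when the state change fires,
// on_external_watch_complete_locked() drops the pollset and the
// channel-stack ref, removes the watcher from the list, frees it, and then
// schedules the caller's closure. Passing a null state pointer instead
// cancels an existing watch registered with the same on_complete closure.)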