/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>

#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/method_params.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"

using grpc_core::internal::ClientChannelMethodParams;
using grpc_core::internal::ServerRetryThrottleData;

/* Client channel implementation */

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)

// This value was picked arbitrarily.  It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2

grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel");

/*************************************************************************
 * CHANNEL-WIDE FUNCTIONS
 */

struct external_connectivity_watcher;

typedef grpc_core::SliceHashTable<
    grpc_core::RefCountedPtr<ClientChannelMethodParams>>
    MethodParamsTable;

typedef struct client_channel_channel_data {
  grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
  bool started_resolving;
  bool deadline_checking_enabled;
  grpc_client_channel_factory* client_channel_factory;
  bool enable_retries;
  size_t per_rpc_retry_buffer_size;

  /** combiner protecting all variables below in this data structure */
  grpc_combiner* combiner;
  /** currently active load balancer */
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy;
  /** retry throttle data */
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  /** maps method names to method_parameters structs */
  grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
  /** incoming resolver result - set by resolver.next() */
  grpc_channel_args* resolver_result;
  /** a list of closures that are all waiting for resolver result to come in */
  grpc_closure_list waiting_for_resolver_result_closures;
  /** resolver callback */
  grpc_closure on_resolver_result_changed;
  /** connectivity state being tracked */
  grpc_connectivity_state_tracker state_tracker;
  /** when an lb_policy arrives, should we try to exit idle */
  bool exit_idle_when_lb_policy_arrives;
  /** owning stack */
  grpc_channel_stack* owning_stack;
  /** interested parties (owned) */
  grpc_pollset_set* interested_parties;

  /* external_connectivity_watcher_list head is guarded by its own mutex, since
   * counts need to be grabbed immediately without polling on a cq */
  gpr_mu external_connectivity_watcher_list_mu;
  struct external_connectivity_watcher* external_connectivity_watcher_list_head;

  /* the following properties are guarded by a mutex since APIs require them
     to be instantaneously available */
  gpr_mu info_mu;
  grpc_core::UniquePtr<char> info_lb_policy_name;
  /** service config in JSON form */
  grpc_core::UniquePtr<char> info_service_config_json;
} channel_data;

typedef struct {
  channel_data* chand;
  /** used as an identifier, don't dereference it because the LB policy may be
   * non-existing when the callback is run */
  grpc_core::LoadBalancingPolicy* lb_policy;
  grpc_closure closure;
} reresolution_request_args;

/** We create one watcher for each new lb_policy that is returned from a
    resolver, to watch for state changes from the lb_policy. When a state
    change is seen, we update the channel, and create a new watcher. */
typedef struct {
  channel_data* chand;
  grpc_closure on_changed;
  grpc_connectivity_state state;
  grpc_core::LoadBalancingPolicy* lb_policy;
} lb_policy_connectivity_watcher;

static void watch_lb_policy_locked(channel_data* chand,
                                   grpc_core::LoadBalancingPolicy* lb_policy,
                                   grpc_connectivity_state current_state);

static void set_channel_connectivity_state_locked(channel_data* chand,
                                                  grpc_connectivity_state state,
                                                  grpc_error* error,
                                                  const char* reason) {
  /* TODO: Improve failure handling:
   * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
   * - Hand over pending picks from old policies during the switch that happens
   *   when resolver provides an update.
   */
  if (chand->lb_policy != nullptr) {
    if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
      /* cancel picks with wait_for_ready=false */
      chand->lb_policy->CancelMatchingPicksLocked(
          /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
          /* check= */ 0, GRPC_ERROR_REF(error));
    } else if (state == GRPC_CHANNEL_SHUTDOWN) {
      /* cancel all picks */
      chand->lb_policy->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0,
                                                  GRPC_ERROR_REF(error));
    }
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: setting connectivity state to %s", chand,
            grpc_connectivity_state_name(state));
  }
  grpc_connectivity_state_set(&chand->state_tracker, state, error, reason);
}

static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) {
  lb_policy_connectivity_watcher* w =
      static_cast<lb_policy_connectivity_watcher*>(arg);
  /* check if the notification is for the latest policy */
  if (w->lb_policy == w->chand->lb_policy.get()) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: lb_policy=%p state changed to %s", w->chand,
              w->lb_policy, grpc_connectivity_state_name(w->state));
    }
    set_channel_connectivity_state_locked(w->chand, w->state,
                                          GRPC_ERROR_REF(error), "lb_changed");
    if (w->state != GRPC_CHANNEL_SHUTDOWN) {
      watch_lb_policy_locked(w->chand, w->lb_policy, w->state);
    }
  }
  GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "watch_lb_policy");
  gpr_free(w);
}

static void watch_lb_policy_locked(channel_data* chand,
                                   grpc_core::LoadBalancingPolicy* lb_policy,
                                   grpc_connectivity_state current_state) {
  lb_policy_connectivity_watcher* w =
      static_cast<lb_policy_connectivity_watcher*>(gpr_malloc(sizeof(*w)));
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
  w->chand = chand;
  GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
                    grpc_combiner_scheduler(chand->combiner));
  w->state = current_state;
  w->lb_policy = lb_policy;
  lb_policy->NotifyOnStateChangeLocked(&w->state, &w->on_changed);
}

static void start_resolving_locked(channel_data* chand) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: starting name resolution", chand);
  }
  GPR_ASSERT(!chand->started_resolving);
  chand->started_resolving = true;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
  chand->resolver->NextLocked(&chand->resolver_result,
                              &chand->on_resolver_result_changed);
}

typedef struct {
  char* server_name;
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
} service_config_parsing_state;

static void parse_retry_throttle_params(
    const grpc_json* field, service_config_parsing_state* parsing_state) {
  if (strcmp(field->key, "retryThrottling") == 0) {
    if (parsing_state->retry_throttle_data != nullptr) return;  // Duplicate.
    if (field->type != GRPC_JSON_OBJECT) return;
    int max_milli_tokens = 0;
    int milli_token_ratio = 0;
    for (grpc_json* sub_field = field->child; sub_field != nullptr;
         sub_field = sub_field->next) {
      if (sub_field->key == nullptr) return;
      if (strcmp(sub_field->key, "maxTokens") == 0) {
        if (max_milli_tokens != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
        if (max_milli_tokens == -1) return;
        max_milli_tokens *= 1000;
      } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
        if (milli_token_ratio != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        // We support up to 3 decimal digits.
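        // Illustrative worked example (values are hypothetical, not from any
        // real service config): a "tokenRatio" of "0.316" is parsed below as
        // whole_value = 0 and decimal digits "316", giving
        // milli_token_ratio = 316; a value of "2.5" is parsed as
        // whole_value = 2 and decimal digit "5" scaled to 500, giving
        // milli_token_ratio = 2 * 1000 + 500 = 2500.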
        size_t whole_len = strlen(sub_field->value);
        uint32_t multiplier = 1;
        uint32_t decimal_value = 0;
        const char* decimal_point = strchr(sub_field->value, '.');
        if (decimal_point != nullptr) {
          whole_len = static_cast<size_t>(decimal_point - sub_field->value);
          multiplier = 1000;
          size_t decimal_len = strlen(decimal_point + 1);
          if (decimal_len > 3) decimal_len = 3;
          if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
                                         &decimal_value)) {
            return;
          }
          uint32_t decimal_multiplier = 1;
          for (size_t i = 0; i < (3 - decimal_len); ++i) {
            decimal_multiplier *= 10;
          }
          decimal_value *= decimal_multiplier;
        }
        uint32_t whole_value;
        if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
                                       &whole_value)) {
          return;
        }
        milli_token_ratio =
            static_cast<int>((whole_value * multiplier) + decimal_value);
        if (milli_token_ratio <= 0) return;
      }
    }
    parsing_state->retry_throttle_data =
        grpc_core::internal::ServerRetryThrottleMap::GetDataForServer(
            parsing_state->server_name, max_milli_tokens, milli_token_ratio);
  }
}

// Invoked from the resolver NextLocked() callback when the resolver
// is shutting down.
static void on_resolver_shutdown_locked(channel_data* chand,
                                        grpc_error* error) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: shutting down", chand);
  }
  if (chand->lb_policy != nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand,
              chand->lb_policy.get());
    }
    grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                     chand->interested_parties);
    chand->lb_policy.reset();
  }
  if (chand->resolver != nullptr) {
    // This should never happen; it can only be triggered by a resolver
    // implementation spontaneously deciding to report shutdown without
    // being orphaned.  This code is included just to be defensive.
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: spontaneous shutdown from resolver %p",
              chand, chand->resolver.get());
    }
    chand->resolver.reset();
    set_channel_connectivity_state_locked(
        chand, GRPC_CHANNEL_SHUTDOWN,
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "Resolver spontaneous shutdown", &error, 1),
        "resolver_spontaneous_shutdown");
  }
  grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                             GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                 "Channel disconnected", &error, 1));
  GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver");
  grpc_channel_args_destroy(chand->resolver_result);
  chand->resolver_result = nullptr;
  GRPC_ERROR_UNREF(error);
}

// Returns the LB policy name from the resolver result.
static grpc_core::UniquePtr<char>
get_lb_policy_name_from_resolver_result_locked(channel_data* chand) {
  // Find LB policy name in channel args.
  const grpc_arg* channel_arg =
      grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
  const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg);
  // Special case: If at least one balancer address is present, we use
  // the grpclb policy, regardless of what the resolver actually specified.
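  // For example (hypothetical resolver output): if the resolver asks for
  // "round_robin" but the returned address list contains a balancer address,
  // the code below logs a warning and forces the LB policy name to "grpclb".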
channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) { grpc_lb_addresses* addresses = static_cast(channel_arg->value.pointer.p); if (grpc_lb_addresses_contains_balancer_address(*addresses)) { if (lb_policy_name != nullptr && gpr_stricmp(lb_policy_name, "grpclb") != 0) { gpr_log(GPR_INFO, "resolver requested LB policy %s but provided at least one " "balancer address -- forcing use of grpclb LB policy", lb_policy_name); } lb_policy_name = "grpclb"; } } // Use pick_first if nothing was specified and we didn't select grpclb // above. if (lb_policy_name == nullptr) lb_policy_name = "pick_first"; return grpc_core::UniquePtr(gpr_strdup(lb_policy_name)); } static void request_reresolution_locked(void* arg, grpc_error* error) { reresolution_request_args* args = static_cast(arg); channel_data* chand = args->chand; // If this invocation is for a stale LB policy, treat it as an LB shutdown // signal. if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE || chand->resolver == nullptr) { GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution"); gpr_free(args); return; } if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand); } chand->resolver->RequestReresolutionLocked(); // Give back the closure to the LB policy. chand->lb_policy->SetReresolutionClosureLocked(&args->closure); } // Creates a new LB policy, replacing any previous one. // If the new policy is created successfully, sets *connectivity_state and // *connectivity_error to its initial connectivity state; otherwise, // leaves them unchanged. static void create_new_lb_policy_locked( channel_data* chand, char* lb_policy_name, grpc_connectivity_state* connectivity_state, grpc_error** connectivity_error) { grpc_core::LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.combiner = chand->combiner; lb_policy_args.client_channel_factory = chand->client_channel_factory; lb_policy_args.args = chand->resolver_result; grpc_core::OrphanablePtr new_lb_policy = grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( lb_policy_name, lb_policy_args); if (GPR_UNLIKELY(new_lb_policy == nullptr)) { gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); } else { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p: created new LB policy \"%s\" (%p)", chand, lb_policy_name, new_lb_policy.get()); } // Swap out the LB policy and update the fds in // chand->interested_parties. if (chand->lb_policy != nullptr) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand, chand->lb_policy.get()); } grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), chand->interested_parties); chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get()); } chand->lb_policy = std::move(new_lb_policy); grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(), chand->interested_parties); // Set up re-resolution callback. reresolution_request_args* args = static_cast(gpr_zalloc(sizeof(*args))); args->chand = chand; args->lb_policy = chand->lb_policy.get(); GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args, grpc_combiner_scheduler(chand->combiner)); GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution"); chand->lb_policy->SetReresolutionClosureLocked(&args->closure); // Get the new LB policy's initial connectivity state and start a // connectivity watch. 
GRPC_ERROR_UNREF(*connectivity_error); *connectivity_state = chand->lb_policy->CheckConnectivityLocked(connectivity_error); if (chand->exit_idle_when_lb_policy_arrives) { chand->lb_policy->ExitIdleLocked(); chand->exit_idle_when_lb_policy_arrives = false; } watch_lb_policy_locked(chand, chand->lb_policy.get(), *connectivity_state); } } // Returns the service config (as a JSON string) from the resolver result. // Also updates state in chand. static grpc_core::UniquePtr get_service_config_from_resolver_result_locked(channel_data* chand) { const grpc_arg* channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG); const char* service_config_json = grpc_channel_arg_get_string(channel_arg); if (service_config_json != nullptr) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"", chand, service_config_json); } grpc_core::UniquePtr service_config = grpc_core::ServiceConfig::Create(service_config_json); if (service_config != nullptr) { if (chand->enable_retries) { channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); const char* server_uri = grpc_channel_arg_get_string(channel_arg); GPR_ASSERT(server_uri != nullptr); grpc_uri* uri = grpc_uri_parse(server_uri, true); GPR_ASSERT(uri->path[0] != '\0'); service_config_parsing_state parsing_state; memset(&parsing_state, 0, sizeof(parsing_state)); parsing_state.server_name = uri->path[0] == '/' ? uri->path + 1 : uri->path; service_config->ParseGlobalParams(parse_retry_throttle_params, &parsing_state); grpc_uri_destroy(uri); chand->retry_throttle_data = std::move(parsing_state.retry_throttle_data); } chand->method_params_table = service_config->CreateMethodConfigTable( ClientChannelMethodParams::CreateFromJson); } } return grpc_core::UniquePtr(gpr_strdup(service_config_json)); } // Callback invoked when a resolver result is available. static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { channel_data* chand = static_cast(arg); if (grpc_client_channel_trace.enabled()) { const char* disposition = chand->resolver_result != nullptr ? "" : (error == GRPC_ERROR_NONE ? " (transient error)" : " (resolver shutdown)"); gpr_log(GPR_INFO, "chand=%p: got resolver result: resolver_result=%p error=%s%s", chand, chand->resolver_result, grpc_error_string(error), disposition); } // Handle shutdown. if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) { on_resolver_shutdown_locked(chand, GRPC_ERROR_REF(error)); return; } // Data used to set the channel's connectivity state. bool set_connectivity_state = true; grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE; grpc_error* connectivity_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); // chand->resolver_result will be null in the case of a transient // resolution error. In that case, we don't have any new result to // process, which means that we keep using the previous result (if any). if (chand->resolver_result == nullptr) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand); } } else { grpc_core::UniquePtr lb_policy_name = get_lb_policy_name_from_resolver_result_locked(chand); // Check to see if we're already using the right LB policy. // Note: It's safe to use chand->info_lb_policy_name here without // taking a lock on chand->info_mu, because this function is the // only thing that modifies its value, and it can only be invoked // once at any given time. 
bool lb_policy_name_changed = chand->info_lb_policy_name == nullptr || gpr_stricmp(chand->info_lb_policy_name.get(), lb_policy_name.get()) != 0; if (chand->lb_policy != nullptr && !lb_policy_name_changed) { // Continue using the same LB policy. Update with new addresses. if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)", chand, lb_policy_name.get(), chand->lb_policy.get()); } chand->lb_policy->UpdateLocked(*chand->resolver_result); // No need to set the channel's connectivity state; the existing // watch on the LB policy will take care of that. set_connectivity_state = false; } else { // Instantiate new LB policy. create_new_lb_policy_locked(chand, lb_policy_name.get(), &connectivity_state, &connectivity_error); } // Find service config. grpc_core::UniquePtr service_config_json = get_service_config_from_resolver_result_locked(chand); // Swap out the data used by cc_get_channel_info(). gpr_mu_lock(&chand->info_mu); chand->info_lb_policy_name = std::move(lb_policy_name); chand->info_service_config_json = std::move(service_config_json); gpr_mu_unlock(&chand->info_mu); // Clean up. grpc_channel_args_destroy(chand->resolver_result); chand->resolver_result = nullptr; } // Set the channel's connectivity state if needed. if (set_connectivity_state) { set_channel_connectivity_state_locked( chand, connectivity_state, connectivity_error, "resolver_result"); } else { GRPC_ERROR_UNREF(connectivity_error); } // Invoke closures that were waiting for results and renew the watch. GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); chand->resolver->NextLocked(&chand->resolver_result, &chand->on_resolver_result_changed); } static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { grpc_transport_op* op = static_cast(arg); grpc_channel_element* elem = static_cast(op->handler_private.extra_arg); channel_data* chand = static_cast(elem->channel_data); if (op->on_connectivity_state_change != nullptr) { grpc_connectivity_state_notify_on_state_change( &chand->state_tracker, op->connectivity_state, op->on_connectivity_state_change); op->on_connectivity_state_change = nullptr; op->connectivity_state = nullptr; } if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { if (chand->lb_policy == nullptr) { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"); GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error)); GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error); } else { grpc_error* error = GRPC_ERROR_NONE; grpc_core::LoadBalancingPolicy::PickState pick_state; pick_state.initial_metadata = nullptr; pick_state.initial_metadata_flags = 0; pick_state.on_complete = nullptr; memset(&pick_state.subchannel_call_context, 0, sizeof(pick_state.subchannel_call_context)); pick_state.user_data = nullptr; // Pick must return synchronously, because pick_state.on_complete is null. 
GPR_ASSERT(chand->lb_policy->PickLocked(&pick_state, &error)); if (pick_state.connected_subchannel != nullptr) { pick_state.connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack); } else { if (error == GRPC_ERROR_NONE) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "LB policy dropped call on ping"); } GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error)); GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error); } op->bind_pollset = nullptr; } op->send_ping.on_initiate = nullptr; op->send_ping.on_ack = nullptr; } if (op->disconnect_with_error != GRPC_ERROR_NONE) { if (chand->resolver != nullptr) { set_channel_connectivity_state_locked( chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(op->disconnect_with_error), "disconnect"); chand->resolver.reset(); if (!chand->started_resolving) { grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures, GRPC_ERROR_REF(op->disconnect_with_error)); GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); } if (chand->lb_policy != nullptr) { grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), chand->interested_parties); chand->lb_policy.reset(); } } GRPC_ERROR_UNREF(op->disconnect_with_error); } if (op->reset_connect_backoff) { if (chand->resolver != nullptr) { chand->resolver->ResetBackoffLocked(); chand->resolver->RequestReresolutionLocked(); } if (chand->lb_policy != nullptr) { chand->lb_policy->ResetBackoffLocked(); } } GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op"); GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); } static void cc_start_transport_op(grpc_channel_element* elem, grpc_transport_op* op) { channel_data* chand = static_cast(elem->channel_data); GPR_ASSERT(op->set_accept_stream == false); if (op->bind_pollset != nullptr) { grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset); } op->handler_private.extra_arg = elem; GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op"); GRPC_CLOSURE_SCHED( GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked, op, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } static void cc_get_channel_info(grpc_channel_element* elem, const grpc_channel_info* info) { channel_data* chand = static_cast(elem->channel_data); gpr_mu_lock(&chand->info_mu); if (info->lb_policy_name != nullptr) { *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name.get()); } if (info->service_config_json != nullptr) { *info->service_config_json = gpr_strdup(chand->info_service_config_json.get()); } gpr_mu_unlock(&chand->info_mu); } /* Constructor for channel_data */ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, grpc_channel_element_args* args) { channel_data* chand = static_cast(elem->channel_data); GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); // Initialize data members. 
chand->combiner = grpc_combiner_create(); gpr_mu_init(&chand->info_mu); gpr_mu_init(&chand->external_connectivity_watcher_list_mu); gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); chand->external_connectivity_watcher_list_head = nullptr; gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); chand->owning_stack = args->channel_stack; GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed, on_resolver_result_changed_locked, chand, grpc_combiner_scheduler(chand->combiner)); chand->interested_parties = grpc_pollset_set_create(); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); grpc_client_channel_start_backup_polling(chand->interested_parties); // Record max per-RPC retry buffer size. const grpc_arg* arg = grpc_channel_args_find( args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE); chand->per_rpc_retry_buffer_size = (size_t)grpc_channel_arg_get_integer( arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}); // Record enable_retries. arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES); chand->enable_retries = grpc_channel_arg_get_bool(arg, true); // Record client channel factory. arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_CLIENT_CHANNEL_FACTORY); if (arg == nullptr) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Missing client channel factory in args for client channel filter"); } if (arg->type != GRPC_ARG_POINTER) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "client channel factory arg must be a pointer"); } grpc_client_channel_factory_ref( static_cast(arg->value.pointer.p)); chand->client_channel_factory = static_cast(arg->value.pointer.p); // Get server name to resolve, using proxy mapper if needed. arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI); if (arg == nullptr) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Missing server uri in args for client channel filter"); } if (arg->type != GRPC_ARG_STRING) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "server uri arg must be a string"); } char* proxy_name = nullptr; grpc_channel_args* new_args = nullptr; grpc_proxy_mappers_map_name(arg->value.string, args->channel_args, &proxy_name, &new_args); // Instantiate resolver. chand->resolver = grpc_core::ResolverRegistry::CreateResolver( proxy_name != nullptr ? proxy_name : arg->value.string, new_args != nullptr ? new_args : args->channel_args, chand->interested_parties, chand->combiner); if (proxy_name != nullptr) gpr_free(proxy_name); if (new_args != nullptr) grpc_channel_args_destroy(new_args); if (chand->resolver == nullptr) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed"); } chand->deadline_checking_enabled = grpc_deadline_checking_enabled(args->channel_args); return GRPC_ERROR_NONE; } /* Destructor for channel_data */ static void cc_destroy_channel_elem(grpc_channel_element* elem) { channel_data* chand = static_cast(elem->channel_data); if (chand->resolver != nullptr) { // The only way we can get here is if we never started resolving, // because we take a ref to the channel stack when we start // resolving and do not release it until the resolver callback is // invoked after the resolver shuts down. 
chand->resolver.reset(); } if (chand->client_channel_factory != nullptr) { grpc_client_channel_factory_unref(chand->client_channel_factory); } if (chand->lb_policy != nullptr) { grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), chand->interested_parties); chand->lb_policy.reset(); } // TODO(roth): Once we convert the filter API to C++, there will no // longer be any need to explicitly reset these smart pointer data members. chand->info_lb_policy_name.reset(); chand->info_service_config_json.reset(); chand->retry_throttle_data.reset(); chand->method_params_table.reset(); grpc_client_channel_stop_backup_polling(chand->interested_parties); grpc_connectivity_state_destroy(&chand->state_tracker); grpc_pollset_set_destroy(chand->interested_parties); GRPC_COMBINER_UNREF(chand->combiner, "client_channel"); gpr_mu_destroy(&chand->info_mu); gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu); } /************************************************************************* * PER-CALL FUNCTIONS */ // Max number of batches that can be pending on a call at any given // time. This includes one batch for each of the following ops: // recv_initial_metadata // send_initial_metadata // recv_message // send_message // recv_trailing_metadata // send_trailing_metadata #define MAX_PENDING_BATCHES 6 // Retry support: // // In order to support retries, we act as a proxy for stream op batches. // When we get a batch from the surface, we add it to our list of pending // batches, and we then use those batches to construct separate "child" // batches to be started on the subchannel call. When the child batches // return, we then decide which pending batches have been completed and // schedule their callbacks accordingly. If a subchannel call fails and // we want to retry it, we do a new pick and start again, constructing // new "child" batches for the new subchannel call. // // Note that retries are committed when receiving data from the server // (except for Trailers-Only responses). However, there may be many // send ops started before receiving any data, so we may have already // completed some number of send ops (and returned the completions up to // the surface) by the time we realize that we need to retry. To deal // with this, we cache data for send ops, so that we can replay them on a // different subchannel call even after we have completed the original // batches. // // There are two sets of data to maintain: // - In call_data (in the parent channel), we maintain a list of pending // ops and cached data for send ops. // - In the subchannel call, we maintain state to indicate what ops have // already been sent down to that call. // // When constructing the "child" batches, we compare those two sets of // data to see which batches need to be sent to the subchannel call. // TODO(roth): In subsequent PRs: // - add support for transparent retries (including initial metadata) // - figure out how to record stats in census for retries // (census filter is on top of this one) // - add census stats for retries // State used for starting a retryable batch on a subchannel call. // This provides its own grpc_transport_stream_op_batch and other data // structures needed to populate the ops in the batch. // We allocate one struct on the arena for each attempt at starting a // batch on a given subchannel call. typedef struct { gpr_refcount refs; grpc_call_element* elem; grpc_subchannel_call* subchannel_call; // Holds a ref. // The batch to use in the subchannel call. 
// Its payload field points to subchannel_call_retry_state.batch_payload. grpc_transport_stream_op_batch batch; // For intercepting on_complete. grpc_closure on_complete; } subchannel_batch_data; // Retry state associated with a subchannel call. // Stored in the parent_data of the subchannel call object. typedef struct { // subchannel_batch_data.batch.payload points to this. grpc_transport_stream_op_batch_payload batch_payload; // For send_initial_metadata. // Note that we need to make a copy of the initial metadata for each // subchannel call instead of just referring to the copy in call_data, // because filters in the subchannel stack will probably add entries, // so we need to start in a pristine state for each attempt of the call. grpc_linked_mdelem* send_initial_metadata_storage; grpc_metadata_batch send_initial_metadata; // For send_message. grpc_core::ManualConstructor send_message; // For send_trailing_metadata. grpc_linked_mdelem* send_trailing_metadata_storage; grpc_metadata_batch send_trailing_metadata; // For intercepting recv_initial_metadata. grpc_metadata_batch recv_initial_metadata; grpc_closure recv_initial_metadata_ready; bool trailing_metadata_available; // For intercepting recv_message. grpc_closure recv_message_ready; grpc_core::OrphanablePtr recv_message; // For intercepting recv_trailing_metadata. grpc_metadata_batch recv_trailing_metadata; grpc_transport_stream_stats collect_stats; grpc_closure recv_trailing_metadata_ready; // These fields indicate which ops have been started and completed on // this subchannel call. size_t started_send_message_count; size_t completed_send_message_count; size_t started_recv_message_count; size_t completed_recv_message_count; bool started_send_initial_metadata : 1; bool completed_send_initial_metadata : 1; bool started_send_trailing_metadata : 1; bool completed_send_trailing_metadata : 1; bool started_recv_initial_metadata : 1; bool completed_recv_initial_metadata : 1; bool started_recv_trailing_metadata : 1; bool completed_recv_trailing_metadata : 1; // State for callback processing. bool retry_dispatched : 1; subchannel_batch_data* recv_initial_metadata_ready_deferred_batch; grpc_error* recv_initial_metadata_error; subchannel_batch_data* recv_message_ready_deferred_batch; grpc_error* recv_message_error; subchannel_batch_data* recv_trailing_metadata_internal_batch; } subchannel_call_retry_state; // Pending batches stored in call data. typedef struct { // The pending batch. If nullptr, this slot is empty. grpc_transport_stream_op_batch* batch; // Indicates whether payload for send ops has been cached in call data. bool send_ops_cached; } pending_batch; /** Call data. Holds a pointer to grpc_subchannel_call and the associated machinery to create such a pointer. Handles queueing of stream ops until a call object is ready, waiting for initial metadata before trying to create a call object, and handling cancellation gracefully. */ typedef struct client_channel_call_data { // State for handling deadlines. // The code in deadline_filter.c requires this to be the first field. // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state // and this struct both independently store pointers to the call stack // and call combiner. If/when we have time, find a way to avoid this // without breaking the grpc_deadline_state abstraction. grpc_deadline_state deadline_state; grpc_slice path; // Request path. 
  gpr_timespec call_start_time;
  grpc_millis deadline;
  gpr_arena* arena;
  grpc_call_stack* owning_call;
  grpc_call_combiner* call_combiner;

  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params;

  grpc_subchannel_call* subchannel_call;

  // Set when we get a cancel_stream op.
  grpc_error* cancel_error;

  grpc_core::LoadBalancingPolicy::PickState pick;
  grpc_closure pick_closure;
  grpc_closure pick_cancel_closure;

  grpc_closure recv_trailing_metadata_ready_channelz;
  grpc_closure* original_recv_trailing_metadata;

  grpc_polling_entity* pollent;
  bool pollent_added_to_interested_parties;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the subchannel call and are not
  // intercepting any of its callbacks).
  pending_batch pending_batches[MAX_PENDING_BATCHES];
  bool pending_send_initial_metadata : 1;
  bool pending_send_message : 1;
  bool pending_send_trailing_metadata : 1;

  // Retry state.
  bool enable_retries : 1;
  bool retry_committed : 1;
  bool last_attempt_got_server_pushback : 1;
  int num_attempts_completed;
  size_t bytes_buffered_for_retry;
  grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff;
  grpc_timer retry_timer;

  // The number of pending retriable subchannel batches containing send ops.
  // We hold a ref to the call stack while this is non-zero, since replay
  // batches may not complete until after all callbacks have been returned
  // to the surface, and we need to make sure that the call is not destroyed
  // until all of these batches have completed.
  // Note that we actually only need to track replay batches, but it's
  // easier to track all batches with send ops.
  int num_pending_retriable_subchannel_send_batches;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata;
  grpc_linked_mdelem* send_initial_metadata_storage;
  grpc_metadata_batch send_initial_metadata;
  uint32_t send_initial_metadata_flags;
  gpr_atm* peer_string;
  // send_message
  // When we get a send_message op, we replace the original byte stream
  // with a CachingByteStream that caches the slices to a local buffer for
  // use in retries.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that.  This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  grpc_core::ManualConstructor<
      grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3>>
      send_messages;
  // send_trailing_metadata
  bool seen_send_trailing_metadata;
  grpc_linked_mdelem* send_trailing_metadata_storage;
  grpc_metadata_batch send_trailing_metadata;
} call_data;

// Forward declarations.
static void retry_commit(grpc_call_element* elem,
                         subchannel_call_retry_state* retry_state);
static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
static void start_pick_locked(void* arg, grpc_error* ignored);
template <typename Predicate>
static pending_batch* pending_batch_find(grpc_call_element* elem,
                                         const char* log_message,
                                         Predicate predicate);
static void get_call_status(grpc_call_element* elem,
                            grpc_metadata_batch* md_batch, grpc_error* error,
                            grpc_status_code* status,
                            grpc_mdelem** server_pushback_md);

//
// send op data caching
//

// Caches data for send ops so that it can be retried later, if not
// already cached.
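// Illustrative sketch of the assumed call pattern (not a verbatim trace):
// when a batch arrives carrying send_initial_metadata and send_message,
// maybe_cache_send_ops_for_batch() below copies the metadata into
// calld->send_initial_metadata and wraps the outgoing payload in a
// ByteStreamCache appended to calld->send_messages; a later retry attempt
// can then replay both from the cache on a new subchannel call via a
// CachingByteStream, even though the original batch has already completed.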
static void maybe_cache_send_ops_for_batch(call_data* calld, pending_batch* pending) { if (pending->send_ops_cached) return; pending->send_ops_cached = true; grpc_transport_stream_op_batch* batch = pending->batch; // Save a copy of metadata for send_initial_metadata ops. if (batch->send_initial_metadata) { calld->seen_send_initial_metadata = true; GPR_ASSERT(calld->send_initial_metadata_storage == nullptr); grpc_metadata_batch* send_initial_metadata = batch->payload->send_initial_metadata.send_initial_metadata; calld->send_initial_metadata_storage = (grpc_linked_mdelem*)gpr_arena_alloc( calld->arena, sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count); grpc_metadata_batch_copy(send_initial_metadata, &calld->send_initial_metadata, calld->send_initial_metadata_storage); calld->send_initial_metadata_flags = batch->payload->send_initial_metadata.send_initial_metadata_flags; calld->peer_string = batch->payload->send_initial_metadata.peer_string; } // Set up cache for send_message ops. if (batch->send_message) { grpc_core::ByteStreamCache* cache = static_cast( gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache))); new (cache) grpc_core::ByteStreamCache( std::move(batch->payload->send_message.send_message)); calld->send_messages->push_back(cache); } // Save metadata batch for send_trailing_metadata ops. if (batch->send_trailing_metadata) { calld->seen_send_trailing_metadata = true; GPR_ASSERT(calld->send_trailing_metadata_storage == nullptr); grpc_metadata_batch* send_trailing_metadata = batch->payload->send_trailing_metadata.send_trailing_metadata; calld->send_trailing_metadata_storage = (grpc_linked_mdelem*)gpr_arena_alloc( calld->arena, sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count); grpc_metadata_batch_copy(send_trailing_metadata, &calld->send_trailing_metadata, calld->send_trailing_metadata_storage); } } // Frees cached send_initial_metadata. static void free_cached_send_initial_metadata(channel_data* chand, call_data* calld) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: destroying calld->send_initial_metadata", chand, calld); } grpc_metadata_batch_destroy(&calld->send_initial_metadata); } // Frees cached send_message at index idx. static void free_cached_send_message(channel_data* chand, call_data* calld, size_t idx) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]", chand, calld, idx); } (*calld->send_messages)[idx]->Destroy(); } // Frees cached send_trailing_metadata. static void free_cached_send_trailing_metadata(channel_data* chand, call_data* calld) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: destroying calld->send_trailing_metadata", chand, calld); } grpc_metadata_batch_destroy(&calld->send_trailing_metadata); } // Frees cached send ops that have already been completed after // committing the call. 
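// For example (hypothetical state): if the retrying attempt had already
// completed send_initial_metadata and two send_message ops when the call
// was committed, the function below frees the cached initial metadata and
// cached messages 0 and 1, while cached data for send ops that have not yet
// completed is retained and freed later, as each remaining batch completes.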
static void free_cached_send_op_data_after_commit( grpc_call_element* elem, subchannel_call_retry_state* retry_state) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (retry_state->completed_send_initial_metadata) { free_cached_send_initial_metadata(chand, calld); } for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) { free_cached_send_message(chand, calld, i); } if (retry_state->completed_send_trailing_metadata) { free_cached_send_trailing_metadata(chand, calld); } } // Frees cached send ops that were completed by the completed batch in // batch_data. Used when batches are completed after the call is committed. static void free_cached_send_op_data_for_completed_batch( grpc_call_element* elem, subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (batch_data->batch.send_initial_metadata) { free_cached_send_initial_metadata(chand, calld); } if (batch_data->batch.send_message) { free_cached_send_message(chand, calld, retry_state->completed_send_message_count - 1); } if (batch_data->batch.send_trailing_metadata) { free_cached_send_trailing_metadata(chand, calld); } } // // pending_batches management // // Returns the index into calld->pending_batches to be used for batch. static size_t get_batch_index(grpc_transport_stream_op_batch* batch) { // Note: It is important the send_initial_metadata be the first entry // here, since the code in pick_subchannel_locked() assumes it will be. if (batch->send_initial_metadata) return 0; if (batch->send_message) return 1; if (batch->send_trailing_metadata) return 2; if (batch->recv_initial_metadata) return 3; if (batch->recv_message) return 4; if (batch->recv_trailing_metadata) return 5; GPR_UNREACHABLE_CODE(return (size_t)-1); } // This is called via the call combiner, so access to calld is synchronized. static void pending_batches_add(grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); const size_t idx = get_batch_index(batch); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand, calld, idx); } pending_batch* pending = &calld->pending_batches[idx]; GPR_ASSERT(pending->batch == nullptr); pending->batch = batch; pending->send_ops_cached = false; if (calld->enable_retries) { // Update state in calld about pending batches. // Also check if the batch takes us over the retry buffer limit. // Note: We don't check the size of trailing metadata here, because // gRPC clients do not send trailing metadata. if (batch->send_initial_metadata) { calld->pending_send_initial_metadata = true; calld->bytes_buffered_for_retry += grpc_metadata_batch_size( batch->payload->send_initial_metadata.send_initial_metadata); } if (batch->send_message) { calld->pending_send_message = true; calld->bytes_buffered_for_retry += batch->payload->send_message.send_message->length(); } if (batch->send_trailing_metadata) { calld->pending_send_trailing_metadata = true; } if (GPR_UNLIKELY(calld->bytes_buffered_for_retry > chand->per_rpc_retry_buffer_size)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded retry buffer size, committing", chand, calld); } subchannel_call_retry_state* retry_state = calld->subchannel_call == nullptr ? 
nullptr : static_cast( grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); retry_commit(elem, retry_state); // If we are not going to retry and have not yet started, pretend // retries are disabled so that we don't bother with retry overhead. if (calld->num_attempts_completed == 0) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: disabling retries before first attempt", chand, calld); } calld->enable_retries = false; } } } } static void pending_batch_clear(call_data* calld, pending_batch* pending) { if (calld->enable_retries) { if (pending->batch->send_initial_metadata) { calld->pending_send_initial_metadata = false; } if (pending->batch->send_message) { calld->pending_send_message = false; } if (pending->batch->send_trailing_metadata) { calld->pending_send_trailing_metadata = false; } } pending->batch = nullptr; } // This is called via the call combiner, so access to calld is synchronized. static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) { grpc_transport_stream_op_batch* batch = static_cast(arg); call_data* calld = static_cast(batch->handler_private.extra_arg); // Note: This will release the call combiner. grpc_transport_stream_op_batch_finish_with_failure( batch, GRPC_ERROR_REF(error), calld->call_combiner); } // This is called via the call combiner, so access to calld is synchronized. // If yield_call_combiner is true, assumes responsibility for yielding // the call combiner. static void pending_batches_fail(grpc_call_element* elem, grpc_error* error, bool yield_call_combiner) { GPR_ASSERT(error != GRPC_ERROR_NONE); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { size_t num_batches = 0; for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { if (calld->pending_batches[i].batch != nullptr) ++num_batches; } gpr_log(GPR_INFO, "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s", elem->channel_data, calld, num_batches, grpc_error_string(error)); } grpc_core::CallCombinerClosureList closures; for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { batch->handler_private.extra_arg = calld; GRPC_CLOSURE_INIT(&batch->handler_private.closure, fail_pending_batch_in_call_combiner, batch, grpc_schedule_on_exec_ctx); closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error), "pending_batches_fail"); pending_batch_clear(calld, pending); } } if (yield_call_combiner) { closures.RunClosures(calld->call_combiner); } else { closures.RunClosuresWithoutYielding(calld->call_combiner); } GRPC_ERROR_UNREF(error); } // This is called via the call combiner, so access to calld is synchronized. static void resume_pending_batch_in_call_combiner(void* arg, grpc_error* ignored) { grpc_transport_stream_op_batch* batch = static_cast(arg); grpc_subchannel_call* subchannel_call = static_cast(batch->handler_private.extra_arg); // Note: This will release the call combiner. 
  grpc_subchannel_call_process_op(subchannel_call, batch);
}

static void recv_trailing_metadata_ready_channelz(void* arg,
                                                   grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, "
            "error=%s",
            chand, calld, grpc_error_string(error));
  }
  // Find the right pending batch.
  pending_batch* pending = pending_batch_find(
      elem, "invoking recv_trailing_metadata_channelz for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_trailing_metadata &&
               batch->payload->recv_trailing_metadata
                       .recv_trailing_metadata_ready != nullptr;
      });
  grpc_status_code status = GRPC_STATUS_OK;
  grpc_metadata_batch* md_batch =
      pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata;
  get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr);
  grpc_core::channelz::SubchannelNode* channelz_subchannel =
      calld->pick.connected_subchannel->channelz_subchannel();
  GPR_ASSERT(channelz_subchannel != nullptr);
  if (status == GRPC_STATUS_OK) {
    channelz_subchannel->RecordCallSucceeded();
  } else {
    channelz_subchannel->RecordCallFailed();
  }
  pending->batch = nullptr;
  GRPC_CLOSURE_SCHED(calld->original_recv_trailing_metadata, error);
}

// If channelz is enabled, intercept recv_trailing so that we may check the
// status and associate it to a subchannel.
// Returns true if callback was intercepted, false otherwise.
static bool maybe_intercept_recv_trailing_for_channelz(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  // Only add the interceptor if channelz is enabled.
  if (calld->pick.connected_subchannel->channelz_subchannel() != nullptr) {
    GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz,
                      recv_trailing_metadata_ready_channelz, elem,
                      grpc_schedule_on_exec_ctx);
    calld->original_recv_trailing_metadata =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
        &calld->recv_trailing_metadata_ready_channelz;
    return true;
  } else {
    return false;
  }
}

// This is called via the call combiner, so access to calld is synchronized.
static void pending_batches_resume(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (calld->enable_retries) {
    start_retriable_subchannel_batches(elem, GRPC_ERROR_NONE);
    return;
  }
  // Retries not enabled; send down batches as-is.
if (grpc_client_channel_trace.enabled()) { size_t num_batches = 0; for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { if (calld->pending_batches[i].batch != nullptr) ++num_batches; } gpr_log(GPR_INFO, "chand=%p calld=%p: starting %" PRIuPTR " pending batches on subchannel_call=%p", chand, calld, num_batches, calld->subchannel_call); } grpc_core::CallCombinerClosureList closures; for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { bool intercepted = maybe_intercept_recv_trailing_for_channelz(elem, batch); batch->handler_private.extra_arg = calld->subchannel_call; GRPC_CLOSURE_INIT(&batch->handler_private.closure, resume_pending_batch_in_call_combiner, batch, grpc_schedule_on_exec_ctx); closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE, "pending_batches_resume"); // Only clear if we haven't intercepted anything. if (!intercepted) { pending_batch_clear(calld, pending); } } } // Note: This will release the call combiner. closures.RunClosures(calld->call_combiner); } static void maybe_clear_pending_batch(grpc_call_element* elem, pending_batch* pending) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); grpc_transport_stream_op_batch* batch = pending->batch; // We clear the pending batch if all of its callbacks have been // scheduled and reset to nullptr. if (batch->on_complete == nullptr && (!batch->recv_initial_metadata || batch->payload->recv_initial_metadata.recv_initial_metadata_ready == nullptr) && (!batch->recv_message || batch->payload->recv_message.recv_message_ready == nullptr) && (!batch->recv_trailing_metadata || batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready == nullptr)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand, calld); } pending_batch_clear(calld, pending); } } // Returns a pointer to the first pending batch for which predicate(batch) // returns true, or null if not found. template static pending_batch* pending_batch_find(grpc_call_element* elem, const char* log_message, Predicate predicate) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr && predicate(batch)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand, calld, log_message, i); } return pending; } } return nullptr; } // // retry code // // Commits the call so that no further retry attempts will be performed. static void retry_commit(grpc_call_element* elem, subchannel_call_retry_state* retry_state) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (calld->retry_committed) return; calld->retry_committed = true; if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, calld); } if (retry_state != nullptr) { free_cached_send_op_data_after_commit(elem, retry_state); } } // Starts a retry after appropriate back-off. 
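// Illustrative example (retry policy values are hypothetical): with
// initialBackoff = 1s, backoffMultiplier = 2, and maxBackoff = 10s, the
// BackOff instance used below yields delays of roughly 1s, 2s, 4s, 8s, 10s,
// ... with up to RETRY_BACKOFF_JITTER (20%) of jitter applied per attempt,
// unless the server sent pushback, in which case the pushback delay is used
// directly.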
static void do_retry(grpc_call_element* elem, subchannel_call_retry_state* retry_state, grpc_millis server_pushback_ms) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); GPR_ASSERT(calld->method_params != nullptr); const ClientChannelMethodParams::RetryPolicy* retry_policy = calld->method_params->retry_policy(); GPR_ASSERT(retry_policy != nullptr); // Reset subchannel call and connected subchannel. if (calld->subchannel_call != nullptr) { GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call, "client_channel_call_retry"); calld->subchannel_call = nullptr; } if (calld->pick.connected_subchannel != nullptr) { calld->pick.connected_subchannel.reset(); } // Compute backoff delay. grpc_millis next_attempt_time; if (server_pushback_ms >= 0) { next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms; calld->last_attempt_got_server_pushback = true; } else { if (calld->num_attempts_completed == 1 || calld->last_attempt_got_server_pushback) { calld->retry_backoff.Init( grpc_core::BackOff::Options() .set_initial_backoff(retry_policy->initial_backoff) .set_multiplier(retry_policy->backoff_multiplier) .set_jitter(RETRY_BACKOFF_JITTER) .set_max_backoff(retry_policy->max_backoff)); calld->last_attempt_got_server_pushback = false; } next_attempt_time = calld->retry_backoff->NextAttemptTime(); } if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand, calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now()); } // Schedule retry after computed delay. GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem, grpc_combiner_scheduler(chand->combiner)); grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure); // Update bookkeeping. if (retry_state != nullptr) retry_state->retry_dispatched = true; } // Returns true if the call is being retried. static bool maybe_retry(grpc_call_element* elem, subchannel_batch_data* batch_data, grpc_status_code status, grpc_mdelem* server_pushback_md) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); // Get retry policy. if (calld->method_params == nullptr) return false; const ClientChannelMethodParams::RetryPolicy* retry_policy = calld->method_params->retry_policy(); if (retry_policy == nullptr) return false; // If we've already dispatched a retry from this call, return true. // This catches the case where the batch has multiple callbacks // (i.e., it includes either recv_message or recv_initial_metadata). subchannel_call_retry_state* retry_state = nullptr; if (batch_data != nullptr) { retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); if (retry_state->retry_dispatched) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand, calld); } return true; } } // Check status. if (GPR_LIKELY(status == GRPC_STATUS_OK)) { if (calld->retry_throttle_data != nullptr) { calld->retry_throttle_data->RecordSuccess(); } if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, calld); } return false; } // Status is not OK. Check whether the status is retryable. 
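  // For example (hypothetical policy): with retryableStatusCodes containing
  // only UNAVAILABLE, a call failing with DEADLINE_EXCEEDED returns false
  // here and is not retried, while an UNAVAILABLE failure continues on to
  // the throttling, commit, and max-attempts checks below.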
if (!retry_policy->retryable_status_codes.Contains(status)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: status %s not configured as retryable", chand, calld, grpc_status_code_to_string(status)); } return false; } // Record the failure and check whether retries are throttled. // Note that it's important for this check to come after the status // code check above, since we should only record failures whose statuses // match the configured retryable status codes, so that we don't count // things like failures due to malformed requests (INVALID_ARGUMENT). // Conversely, it's important for this to come before the remaining // checks, so that we don't fail to record failures due to other factors. if (calld->retry_throttle_data != nullptr && !calld->retry_throttle_data->RecordFailure()) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, calld); } return false; } // Check whether the call is committed. if (calld->retry_committed) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand, calld); } return false; } // Check whether we have retries remaining. ++calld->num_attempts_completed; if (calld->num_attempts_completed >= retry_policy->max_attempts) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand, calld, retry_policy->max_attempts); } return false; } // If the call was cancelled from the surface, don't retry. if (calld->cancel_error != GRPC_ERROR_NONE) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: call cancelled from surface, not retrying", chand, calld); } return false; } // Check server push-back. grpc_millis server_pushback_ms = -1; if (server_pushback_md != nullptr) { // If the value is "-1" or any other unparseable string, we do not retry. uint32_t ms; if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: not retrying due to server push-back", chand, calld); } return false; } else { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms", chand, calld, ms); } server_pushback_ms = (grpc_millis)ms; } } do_retry(elem, retry_state, server_pushback_ms); return true; } // // subchannel_batch_data // // Creates a subchannel_batch_data object on the call's arena with the // specified refcount. If set_on_complete is true, the batch's // on_complete callback will be set to point to on_complete(); // otherwise, the batch's on_complete callback will be null. 
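// The refcount passed in corresponds to the number of callbacks that will
// eventually fire for the batch (on_complete plus each recv_* callback);
// each callback drops one ref via batch_data_unref(), and the final unref
// destroys the metadata batches this batch used in the retry state and
// releases the refs held on the subchannel call and the call stack.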
static subchannel_batch_data* batch_data_create(grpc_call_element* elem, int refcount, bool set_on_complete) { call_data* calld = static_cast(elem->call_data); subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); subchannel_batch_data* batch_data = static_cast( gpr_arena_alloc(calld->arena, sizeof(*batch_data))); batch_data->elem = elem; batch_data->subchannel_call = GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create"); batch_data->batch.payload = &retry_state->batch_payload; gpr_ref_init(&batch_data->refs, refcount); if (set_on_complete) { GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data, grpc_schedule_on_exec_ctx); batch_data->batch.on_complete = &batch_data->on_complete; } GRPC_CALL_STACK_REF(calld->owning_call, "batch_data"); return batch_data; } static void batch_data_unref(subchannel_batch_data* batch_data) { if (gpr_unref(&batch_data->refs)) { subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); if (batch_data->batch.send_initial_metadata) { grpc_metadata_batch_destroy(&retry_state->send_initial_metadata); } if (batch_data->batch.send_trailing_metadata) { grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata); } if (batch_data->batch.recv_initial_metadata) { grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata); } if (batch_data->batch.recv_trailing_metadata) { grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata); } GRPC_SUBCHANNEL_CALL_UNREF(batch_data->subchannel_call, "batch_data_unref"); call_data* calld = static_cast(batch_data->elem->call_data); GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data"); } } // // recv_initial_metadata callback handling // // Invokes recv_initial_metadata_ready for a subchannel batch. static void invoke_recv_initial_metadata_callback(void* arg, grpc_error* error) { subchannel_batch_data* batch_data = static_cast(arg); // Find pending batch. pending_batch* pending = pending_batch_find( batch_data->elem, "invoking recv_initial_metadata_ready for", [](grpc_transport_stream_op_batch* batch) { return batch->recv_initial_metadata && batch->payload->recv_initial_metadata .recv_initial_metadata_ready != nullptr; }); GPR_ASSERT(pending != nullptr); // Return metadata. subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); grpc_metadata_batch_move( &retry_state->recv_initial_metadata, pending->batch->payload->recv_initial_metadata.recv_initial_metadata); // Update bookkeeping. // Note: Need to do this before invoking the callback, since invoking // the callback will result in yielding the call combiner. grpc_closure* recv_initial_metadata_ready = pending->batch->payload->recv_initial_metadata .recv_initial_metadata_ready; pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready = nullptr; maybe_clear_pending_batch(batch_data->elem, pending); batch_data_unref(batch_data); // Invoke callback. GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error)); } // Intercepts recv_initial_metadata_ready callback for retries. // Commits the call and returns the initial metadata up the stack. 
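// The exception is a Trailers-Only response or an error seen before
// recv_trailing_metadata has completed: in that case the callback is
// deferred (and recv_trailing_metadata is started internally if the
// application has not started it yet) so that maybe_retry() can decide,
// once the status is known, whether to retry instead of surfacing the
// result.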
static void recv_initial_metadata_ready(void* arg, grpc_error* error) { subchannel_batch_data* batch_data = static_cast(arg); grpc_call_element* elem = batch_data->elem; channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s", chand, calld, grpc_error_string(error)); } subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); retry_state->completed_recv_initial_metadata = true; // If a retry was already dispatched, then we're not going to use the // result of this recv_initial_metadata op, so do nothing. if (retry_state->retry_dispatched) { GRPC_CALL_COMBINER_STOP( calld->call_combiner, "recv_initial_metadata_ready after retry dispatched"); return; } // If we got an error or a Trailers-Only response and have not yet gotten // the recv_trailing_metadata_ready callback, then defer propagating this // callback back to the surface. We can evaluate whether to retry when // recv_trailing_metadata comes back. if (GPR_UNLIKELY((retry_state->trailing_metadata_available || error != GRPC_ERROR_NONE) && !retry_state->completed_recv_trailing_metadata)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: deferring recv_initial_metadata_ready " "(Trailers-Only)", chand, calld); } retry_state->recv_initial_metadata_ready_deferred_batch = batch_data; retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error); if (!retry_state->started_recv_trailing_metadata) { // recv_trailing_metadata not yet started by application; start it // ourselves to get status. start_internal_recv_trailing_metadata(elem); } else { GRPC_CALL_COMBINER_STOP( calld->call_combiner, "recv_initial_metadata_ready trailers-only or error"); } return; } // Received valid initial metadata, so commit the call. retry_commit(elem, retry_state); // Invoke the callback to return the result to the surface. // Manually invoking a callback function; it does not take ownership of error. invoke_recv_initial_metadata_callback(batch_data, error); } // // recv_message callback handling // // Invokes recv_message_ready for a subchannel batch. static void invoke_recv_message_callback(void* arg, grpc_error* error) { subchannel_batch_data* batch_data = static_cast(arg); // Find pending op. pending_batch* pending = pending_batch_find( batch_data->elem, "invoking recv_message_ready for", [](grpc_transport_stream_op_batch* batch) { return batch->recv_message && batch->payload->recv_message.recv_message_ready != nullptr; }); GPR_ASSERT(pending != nullptr); // Return payload. subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); *pending->batch->payload->recv_message.recv_message = std::move(retry_state->recv_message); // Update bookkeeping. // Note: Need to do this before invoking the callback, since invoking // the callback will result in yielding the call combiner. grpc_closure* recv_message_ready = pending->batch->payload->recv_message.recv_message_ready; pending->batch->payload->recv_message.recv_message_ready = nullptr; maybe_clear_pending_batch(batch_data->elem, pending); batch_data_unref(batch_data); // Invoke callback. GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error)); } // Intercepts recv_message_ready callback for retries. // Commits the call and returns the message up the stack. 
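// As with recv_initial_metadata_ready above, a null message or an error
// seen before recv_trailing_metadata completes causes this callback to be
// deferred until the call's status is known.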
static void recv_message_ready(void* arg, grpc_error* error) { subchannel_batch_data* batch_data = static_cast(arg); grpc_call_element* elem = batch_data->elem; channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s", chand, calld, grpc_error_string(error)); } subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); ++retry_state->completed_recv_message_count; // If a retry was already dispatched, then we're not going to use the // result of this recv_message op, so do nothing. if (retry_state->retry_dispatched) { GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready after retry dispatched"); return; } // If we got an error or the payload was nullptr and we have not yet gotten // the recv_trailing_metadata_ready callback, then defer propagating this // callback back to the surface. We can evaluate whether to retry when // recv_trailing_metadata comes back. if (GPR_UNLIKELY( (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) && !retry_state->completed_recv_trailing_metadata)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: deferring recv_message_ready (nullptr " "message and recv_trailing_metadata pending)", chand, calld); } retry_state->recv_message_ready_deferred_batch = batch_data; retry_state->recv_message_error = GRPC_ERROR_REF(error); if (!retry_state->started_recv_trailing_metadata) { // recv_trailing_metadata not yet started by application; start it // ourselves to get status. start_internal_recv_trailing_metadata(elem); } else { GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready null"); } return; } // Received a valid message, so commit the call. retry_commit(elem, retry_state); // Invoke the callback to return the result to the surface. // Manually invoking a callback function; it does not take ownership of error. invoke_recv_message_callback(batch_data, error); } // // recv_trailing_metadata handling // // Sets *status and *server_pushback_md based on md_batch and error. // Only sets *server_pushback_md if server_pushback_md != nullptr. static void get_call_status(grpc_call_element* elem, grpc_metadata_batch* md_batch, grpc_error* error, grpc_status_code* status, grpc_mdelem** server_pushback_md) { call_data* calld = static_cast(elem->call_data); if (error != GRPC_ERROR_NONE) { grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr, nullptr); } else { GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); *status = grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md); if (server_pushback_md != nullptr && md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; } } GRPC_ERROR_UNREF(error); } // Adds recv_trailing_metadata_ready closure to closures. static void add_closure_for_recv_trailing_metadata_ready( grpc_call_element* elem, subchannel_batch_data* batch_data, grpc_error* error, grpc_core::CallCombinerClosureList* closures) { // Find pending batch. 
pending_batch* pending = pending_batch_find( elem, "invoking recv_trailing_metadata for", [](grpc_transport_stream_op_batch* batch) { return batch->recv_trailing_metadata && batch->payload->recv_trailing_metadata .recv_trailing_metadata_ready != nullptr; }); // If we generated the recv_trailing_metadata op internally via // start_internal_recv_trailing_metadata(), then there will be no // pending batch. if (pending == nullptr) { GRPC_ERROR_UNREF(error); return; } // Return metadata. subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); grpc_metadata_batch_move( &retry_state->recv_trailing_metadata, pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata); // Add closure. closures->Add(pending->batch->payload->recv_trailing_metadata .recv_trailing_metadata_ready, error, "recv_trailing_metadata_ready for pending batch"); // Update bookkeeping. pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = nullptr; maybe_clear_pending_batch(elem, pending); } // Adds any necessary closures for deferred recv_initial_metadata and // recv_message callbacks to closures. static void add_closures_for_deferred_recv_callbacks( subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state, grpc_core::CallCombinerClosureList* closures) { if (batch_data->batch.recv_trailing_metadata) { // Add closure for deferred recv_initial_metadata_ready. if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch != nullptr)) { GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready, invoke_recv_initial_metadata_callback, retry_state->recv_initial_metadata_ready_deferred_batch, grpc_schedule_on_exec_ctx); closures->Add(&retry_state->recv_initial_metadata_ready, retry_state->recv_initial_metadata_error, "resuming recv_initial_metadata_ready"); retry_state->recv_initial_metadata_ready_deferred_batch = nullptr; } // Add closure for deferred recv_message_ready. if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch != nullptr)) { GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, invoke_recv_message_callback, retry_state->recv_message_ready_deferred_batch, grpc_schedule_on_exec_ctx); closures->Add(&retry_state->recv_message_ready, retry_state->recv_message_error, "resuming recv_message_ready"); retry_state->recv_message_ready_deferred_batch = nullptr; } } } // Returns true if any op in the batch was not yet started. // Only looks at send ops, since recv ops are always started immediately. static bool pending_batch_is_unstarted( pending_batch* pending, call_data* calld, subchannel_call_retry_state* retry_state) { if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { return false; } if (pending->batch->send_initial_metadata && !retry_state->started_send_initial_metadata) { return true; } if (pending->batch->send_message && retry_state->started_send_message_count < calld->send_messages->size()) { return true; } if (pending->batch->send_trailing_metadata && !retry_state->started_send_trailing_metadata) { return true; } return false; } // For any pending batch containing an op that has not yet been started, // adds the pending batch's completion closures to closures. 
static void add_closures_to_fail_unstarted_pending_batches( grpc_call_element* elem, subchannel_call_retry_state* retry_state, grpc_error* error, grpc_core::CallCombinerClosureList* closures) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; if (pending_batch_is_unstarted(pending, calld, retry_state)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: failing unstarted pending batch at index " "%" PRIuPTR, chand, calld, i); } closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error), "failing on_complete for pending batch"); pending->batch->on_complete = nullptr; maybe_clear_pending_batch(elem, pending); } } GRPC_ERROR_UNREF(error); } // Runs necessary closures upon completion of a call attempt. static void run_closures_for_completed_call(subchannel_batch_data* batch_data, grpc_error* error) { grpc_call_element* elem = batch_data->elem; call_data* calld = static_cast(elem->call_data); subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); // Construct list of closures to execute. grpc_core::CallCombinerClosureList closures; // First, add closure for recv_trailing_metadata_ready. add_closure_for_recv_trailing_metadata_ready( elem, batch_data, GRPC_ERROR_REF(error), &closures); // If there are deferred recv_initial_metadata_ready or recv_message_ready // callbacks, add them to closures. add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures); // Add closures to fail any pending batches that have not yet been started. add_closures_to_fail_unstarted_pending_batches( elem, retry_state, GRPC_ERROR_REF(error), &closures); // Don't need batch_data anymore. batch_data_unref(batch_data); // Schedule all of the closures identified above. // Note: This will release the call combiner. closures.RunClosures(calld->call_combiner); GRPC_ERROR_UNREF(error); } // Intercepts recv_trailing_metadata_ready callback for retries. // Commits the call and returns the trailing metadata up the stack. static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { subchannel_batch_data* batch_data = static_cast(arg); grpc_call_element* elem = batch_data->elem; channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s", chand, calld, grpc_error_string(error)); } subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); retry_state->completed_recv_trailing_metadata = true; // Get the call's status and check for server pushback metadata. 
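// The status comes either from the transport error (if any) or from the
// grpc-status element of the trailing metadata; the optional
// grpc-retry-pushback-ms element is captured so that maybe_retry() can
// honor a server-specified retry delay.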
grpc_status_code status = GRPC_STATUS_OK; grpc_mdelem* server_pushback_md = nullptr; grpc_metadata_batch* md_batch = batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata; get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, &server_pushback_md); grpc_core::channelz::SubchannelNode* channelz_subchannel = calld->pick.connected_subchannel->channelz_subchannel(); if (channelz_subchannel != nullptr) { if (status == GRPC_STATUS_OK) { channelz_subchannel->RecordCallSucceeded(); } else { channelz_subchannel->RecordCallFailed(); } } if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand, calld, grpc_status_code_to_string(status)); } // Check if we should retry. if (maybe_retry(elem, batch_data, status, server_pushback_md)) { // Unref batch_data for deferred recv_initial_metadata_ready or // recv_message_ready callbacks, if any. if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) { batch_data_unref(batch_data); GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error); } if (retry_state->recv_message_ready_deferred_batch != nullptr) { batch_data_unref(batch_data); GRPC_ERROR_UNREF(retry_state->recv_message_error); } batch_data_unref(batch_data); return; } // Not retrying, so commit the call. retry_commit(elem, retry_state); // Run any necessary closures. run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error)); } // // on_complete callback handling // // Adds the on_complete closure for the pending batch completed in // batch_data to closures. static void add_closure_for_completed_pending_batch( grpc_call_element* elem, subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state, grpc_error* error, grpc_core::CallCombinerClosureList* closures) { pending_batch* pending = pending_batch_find( elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) { // Match the pending batch with the same set of send ops as the // subchannel batch we've just completed. return batch->on_complete != nullptr && batch_data->batch.send_initial_metadata == batch->send_initial_metadata && batch_data->batch.send_message == batch->send_message && batch_data->batch.send_trailing_metadata == batch->send_trailing_metadata; }); // If batch_data is a replay batch, then there will be no pending // batch to complete. if (pending == nullptr) { GRPC_ERROR_UNREF(error); return; } // Add closure. closures->Add(pending->batch->on_complete, error, "on_complete for pending batch"); pending->batch->on_complete = nullptr; maybe_clear_pending_batch(elem, pending); } // If there are any cached ops to replay or pending ops to start on the // subchannel call, adds a closure to closures to invoke // start_retriable_subchannel_batches(). 
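// This is how send ops cached for retries (and send ops still sitting in
// pending batches) get started on the current attempt: when an earlier
// batch's on_complete runs, a closure is queued here that invokes
// start_retriable_subchannel_batches() to build and start the next batch.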
static void add_closures_for_replay_or_pending_send_ops( grpc_call_element* elem, subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state, grpc_core::CallCombinerClosureList* closures) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); bool have_pending_send_message_ops = retry_state->started_send_message_count < calld->send_messages->size(); bool have_pending_send_trailing_metadata_op = calld->seen_send_trailing_metadata && !retry_state->started_send_trailing_metadata; if (!have_pending_send_message_ops && !have_pending_send_trailing_metadata_op) { for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch == nullptr || pending->send_ops_cached) continue; if (batch->send_message) have_pending_send_message_ops = true; if (batch->send_trailing_metadata) { have_pending_send_trailing_metadata_op = true; } } } if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: starting next batch for pending send op(s)", chand, calld); } GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure, start_retriable_subchannel_batches, elem, grpc_schedule_on_exec_ctx); closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE, "starting next batch for send_* op(s)"); } } // Callback used to intercept on_complete from subchannel calls. // Called only when retries are enabled. static void on_complete(void* arg, grpc_error* error) { subchannel_batch_data* batch_data = static_cast(arg); grpc_call_element* elem = batch_data->elem; channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch); gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s", chand, calld, grpc_error_string(error), batch_str); gpr_free(batch_str); } subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( batch_data->subchannel_call)); // Update bookkeeping in retry_state. if (batch_data->batch.send_initial_metadata) { retry_state->completed_send_initial_metadata = true; } if (batch_data->batch.send_message) { ++retry_state->completed_send_message_count; } if (batch_data->batch.send_trailing_metadata) { retry_state->completed_send_trailing_metadata = true; } // If the call is committed, free cached data for send ops that we've just // completed. if (calld->retry_committed) { free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state); } // Construct list of closures to execute. grpc_core::CallCombinerClosureList closures; // If a retry was already dispatched, that means we saw // recv_trailing_metadata before this, so we do nothing here. // Otherwise, invoke the callback to return the result to the surface. if (!retry_state->retry_dispatched) { // Add closure for the completed pending batch, if any. add_closure_for_completed_pending_batch(elem, batch_data, retry_state, GRPC_ERROR_REF(error), &closures); // If needed, add a callback to start any replay or pending send ops on // the subchannel call. 
    if (!retry_state->completed_recv_trailing_metadata) {
      add_closures_for_replay_or_pending_send_ops(elem, batch_data,
                                                  retry_state, &closures);
    }
  }
  // Track number of pending subchannel send batches and determine if this
  // was the last one.
  --calld->num_pending_retriable_subchannel_send_batches;
  const bool last_send_batch_complete =
      calld->num_pending_retriable_subchannel_send_batches == 0;
  // Don't need batch_data anymore.
  batch_data_unref(batch_data);
  // Schedule all of the closures identified above.
  // Note: This yields the call combiner.
  closures.RunClosures(calld->call_combiner);
  // If this was the last subchannel send batch, unref the call stack.
  if (last_send_batch_complete) {
    GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
  }
}

//
// subchannel batch construction
//

// Helper function used to start a subchannel batch in the call combiner.
static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  grpc_subchannel_call* subchannel_call =
      static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_subchannel_call_process_op(subchannel_call, batch);
}

// Adds a closure to closures that will execute batch in the call combiner.
static void add_closure_for_subchannel_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
    grpc_core::CallCombinerClosureList* closures) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  batch->handler_private.extra_arg = calld->subchannel_call;
  GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                    start_batch_in_call_combiner, batch,
                    grpc_schedule_on_exec_ctx);
  if (grpc_client_channel_trace.enabled()) {
    char* batch_str = grpc_transport_stream_op_batch_string(batch);
    gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s",
            chand, calld, batch_str);
    gpr_free(batch_str);
  }
  closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
                "start_subchannel_batch");
}

// Adds retriable send_initial_metadata op to batch_data.
static void add_retriable_send_initial_metadata_op(
    call_data* calld, subchannel_call_retry_state* retry_state,
    subchannel_batch_data* batch_data) {
  // Maps the number of retries to the corresponding metadata value slice.
  static const grpc_slice* retry_count_strings[] = {
      &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  //
  // If we've already completed one or more attempts, add the
  // grpc-retry-attempts header.
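  // The header value is taken from retry_count_strings, indexed by
  // num_attempts_completed - 1 (i.e., the number of previously completed
  // attempts).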
retry_state->send_initial_metadata_storage = static_cast(gpr_arena_alloc( calld->arena, sizeof(grpc_linked_mdelem) * (calld->send_initial_metadata.list.count + (calld->num_attempts_completed > 0)))); grpc_metadata_batch_copy(&calld->send_initial_metadata, &retry_state->send_initial_metadata, retry_state->send_initial_metadata_storage); if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named .grpc_previous_rpc_attempts != nullptr)) { grpc_metadata_batch_remove(&retry_state->send_initial_metadata, retry_state->send_initial_metadata.idx.named .grpc_previous_rpc_attempts); } if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) { grpc_mdelem retry_md = grpc_mdelem_from_slices( GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS, *retry_count_strings[calld->num_attempts_completed - 1]); grpc_error* error = grpc_metadata_batch_add_tail( &retry_state->send_initial_metadata, &retry_state->send_initial_metadata_storage[calld->send_initial_metadata .list.count], retry_md); if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { gpr_log(GPR_ERROR, "error adding retry metadata: %s", grpc_error_string(error)); GPR_ASSERT(false); } } retry_state->started_send_initial_metadata = true; batch_data->batch.send_initial_metadata = true; batch_data->batch.payload->send_initial_metadata.send_initial_metadata = &retry_state->send_initial_metadata; batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags = calld->send_initial_metadata_flags; batch_data->batch.payload->send_initial_metadata.peer_string = calld->peer_string; } // Adds retriable send_message op to batch_data. static void add_retriable_send_message_op( grpc_call_element* elem, subchannel_call_retry_state* retry_state, subchannel_batch_data* batch_data) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]", chand, calld, retry_state->started_send_message_count); } grpc_core::ByteStreamCache* cache = (*calld->send_messages)[retry_state->started_send_message_count]; ++retry_state->started_send_message_count; retry_state->send_message.Init(cache); batch_data->batch.send_message = true; batch_data->batch.payload->send_message.send_message.reset( retry_state->send_message.get()); } // Adds retriable send_trailing_metadata op to batch_data. static void add_retriable_send_trailing_metadata_op( call_data* calld, subchannel_call_retry_state* retry_state, subchannel_batch_data* batch_data) { // We need to make a copy of the metadata batch for each attempt, since // the filters in the subchannel stack may modify this batch, and we don't // want those modifications to be passed forward to subsequent attempts. retry_state->send_trailing_metadata_storage = static_cast(gpr_arena_alloc( calld->arena, sizeof(grpc_linked_mdelem) * calld->send_trailing_metadata.list.count)); grpc_metadata_batch_copy(&calld->send_trailing_metadata, &retry_state->send_trailing_metadata, retry_state->send_trailing_metadata_storage); retry_state->started_send_trailing_metadata = true; batch_data->batch.send_trailing_metadata = true; batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata = &retry_state->send_trailing_metadata; } // Adds retriable recv_initial_metadata op to batch_data. 
static void add_retriable_recv_initial_metadata_op( call_data* calld, subchannel_call_retry_state* retry_state, subchannel_batch_data* batch_data) { retry_state->started_recv_initial_metadata = true; batch_data->batch.recv_initial_metadata = true; grpc_metadata_batch_init(&retry_state->recv_initial_metadata); batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata = &retry_state->recv_initial_metadata; batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available = &retry_state->trailing_metadata_available; GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready, recv_initial_metadata_ready, batch_data, grpc_schedule_on_exec_ctx); batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready = &retry_state->recv_initial_metadata_ready; } // Adds retriable recv_message op to batch_data. static void add_retriable_recv_message_op( call_data* calld, subchannel_call_retry_state* retry_state, subchannel_batch_data* batch_data) { ++retry_state->started_recv_message_count; batch_data->batch.recv_message = true; batch_data->batch.payload->recv_message.recv_message = &retry_state->recv_message; GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready, batch_data, grpc_schedule_on_exec_ctx); batch_data->batch.payload->recv_message.recv_message_ready = &retry_state->recv_message_ready; } // Adds retriable recv_trailing_metadata op to batch_data. static void add_retriable_recv_trailing_metadata_op( call_data* calld, subchannel_call_retry_state* retry_state, subchannel_batch_data* batch_data) { retry_state->started_recv_trailing_metadata = true; batch_data->batch.recv_trailing_metadata = true; grpc_metadata_batch_init(&retry_state->recv_trailing_metadata); batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata = &retry_state->recv_trailing_metadata; batch_data->batch.payload->recv_trailing_metadata.collect_stats = &retry_state->collect_stats; GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready, recv_trailing_metadata_ready, batch_data, grpc_schedule_on_exec_ctx); batch_data->batch.payload->recv_trailing_metadata .recv_trailing_metadata_ready = &retry_state->recv_trailing_metadata_ready; } // Helper function used to start a recv_trailing_metadata batch. This // is used in the case where a recv_initial_metadata or recv_message // op fails in a way that we know the call is over but when the application // has not yet started its own recv_trailing_metadata op. static void start_internal_recv_trailing_metadata(grpc_call_element* elem) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: call failed but recv_trailing_metadata not " "started; starting it internally", chand, calld); } subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); // Create batch_data with 2 refs, since this batch will be unreffed twice: // once for the recv_trailing_metadata_ready callback when the subchannel // batch returns, and again when we actually get a recv_trailing_metadata // op from the surface. subchannel_batch_data* batch_data = batch_data_create(elem, 2, false /* set_on_complete */); add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); retry_state->recv_trailing_metadata_internal_batch = batch_data; // Note: This will release the call combiner. 
grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch); } // If there are any cached send ops that need to be replayed on the // current subchannel call, creates and returns a new subchannel batch // to replay those ops. Otherwise, returns nullptr. static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( grpc_call_element* elem, subchannel_call_retry_state* retry_state) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); subchannel_batch_data* replay_batch_data = nullptr; // send_initial_metadata. if (calld->seen_send_initial_metadata && !retry_state->started_send_initial_metadata && !calld->pending_send_initial_metadata) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: replaying previously completed " "send_initial_metadata op", chand, calld); } replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */); add_retriable_send_initial_metadata_op(calld, retry_state, replay_batch_data); } // send_message. // Note that we can only have one send_message op in flight at a time. if (retry_state->started_send_message_count < calld->send_messages->size() && retry_state->started_send_message_count == retry_state->completed_send_message_count && !calld->pending_send_message) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: replaying previously completed " "send_message op", chand, calld); } if (replay_batch_data == nullptr) { replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */); } add_retriable_send_message_op(elem, retry_state, replay_batch_data); } // send_trailing_metadata. // Note that we only add this op if we have no more send_message ops // to start, since we can't send down any more send_message ops after // send_trailing_metadata. if (calld->seen_send_trailing_metadata && retry_state->started_send_message_count == calld->send_messages->size() && !retry_state->started_send_trailing_metadata && !calld->pending_send_trailing_metadata) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: replaying previously completed " "send_trailing_metadata op", chand, calld); } if (replay_batch_data == nullptr) { replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */); } add_retriable_send_trailing_metadata_op(calld, retry_state, replay_batch_data); } return replay_batch_data; } // Adds subchannel batches for pending batches to batches, updating // *num_batches as needed. static void add_subchannel_batches_for_pending_batches( grpc_call_element* elem, subchannel_call_retry_state* retry_state, grpc_core::CallCombinerClosureList* closures) { call_data* calld = static_cast(elem->call_data); for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch == nullptr) continue; // Skip any batch that either (a) has already been started on this // subchannel call or (b) we can't start yet because we're still // replaying send ops that need to be completed first. // TODO(roth): Note that if any one op in the batch can't be sent // yet due to ops that we're replaying, we don't start any of the ops // in the batch. This is probably okay, but it could conceivably // lead to increased latency in some cases -- e.g., we could delay // starting a recv op due to it being in the same batch with a send // op. 
If/when we revamp the callback protocol in // transport_stream_op_batch, we may be able to fix this. if (batch->send_initial_metadata && retry_state->started_send_initial_metadata) { continue; } if (batch->send_message && retry_state->completed_send_message_count < retry_state->started_send_message_count) { continue; } // Note that we only start send_trailing_metadata if we have no more // send_message ops to start, since we can't send down any more // send_message ops after send_trailing_metadata. if (batch->send_trailing_metadata && (retry_state->started_send_message_count + batch->send_message < calld->send_messages->size() || retry_state->started_send_trailing_metadata)) { continue; } if (batch->recv_initial_metadata && retry_state->started_recv_initial_metadata) { continue; } if (batch->recv_message && retry_state->completed_recv_message_count < retry_state->started_recv_message_count) { continue; } if (batch->recv_trailing_metadata && retry_state->started_recv_trailing_metadata) { // If we previously completed a recv_trailing_metadata op // initiated by start_internal_recv_trailing_metadata(), use the // result of that instead of trying to re-start this op. if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch != nullptr))) { // If the batch completed, then trigger the completion callback // directly, so that we return the previously returned results to // the application. Otherwise, just unref the internally // started subchannel batch, since we'll propagate the // completion when it completes. if (retry_state->completed_recv_trailing_metadata) { // Batches containing recv_trailing_metadata always succeed. closures->Add( &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE, "re-executing recv_trailing_metadata_ready to propagate " "internally triggered result"); } else { batch_data_unref(retry_state->recv_trailing_metadata_internal_batch); } retry_state->recv_trailing_metadata_internal_batch = nullptr; } continue; } // If we're not retrying, just send the batch as-is. if (calld->method_params == nullptr || calld->method_params->retry_policy() == nullptr || calld->retry_committed) { add_closure_for_subchannel_batch(elem, batch, closures); pending_batch_clear(calld, pending); continue; } // Create batch with the right number of callbacks. const bool has_send_ops = batch->send_initial_metadata || batch->send_message || batch->send_trailing_metadata; const int num_callbacks = has_send_ops + batch->recv_initial_metadata + batch->recv_message + batch->recv_trailing_metadata; subchannel_batch_data* batch_data = batch_data_create( elem, num_callbacks, has_send_ops /* set_on_complete */); // Cache send ops if needed. maybe_cache_send_ops_for_batch(calld, pending); // send_initial_metadata. if (batch->send_initial_metadata) { add_retriable_send_initial_metadata_op(calld, retry_state, batch_data); } // send_message. if (batch->send_message) { add_retriable_send_message_op(elem, retry_state, batch_data); } // send_trailing_metadata. if (batch->send_trailing_metadata) { add_retriable_send_trailing_metadata_op(calld, retry_state, batch_data); } // recv_initial_metadata. if (batch->recv_initial_metadata) { // recv_flags is only used on the server side. GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr); add_retriable_recv_initial_metadata_op(calld, retry_state, batch_data); } // recv_message. if (batch->recv_message) { add_retriable_recv_message_op(calld, retry_state, batch_data); } // recv_trailing_metadata. 
if (batch->recv_trailing_metadata) { add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); } add_closure_for_subchannel_batch(elem, &batch_data->batch, closures); // Track number of pending subchannel send batches. // If this is the first one, take a ref to the call stack. if (batch->send_initial_metadata || batch->send_message || batch->send_trailing_metadata) { if (calld->num_pending_retriable_subchannel_send_batches == 0) { GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches"); } ++calld->num_pending_retriable_subchannel_send_batches; } } } // Constructs and starts whatever subchannel batches are needed on the // subchannel call. static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { grpc_call_element* elem = static_cast(arg); channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches", chand, calld); } subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); // Construct list of closures to execute, one for each pending batch. grpc_core::CallCombinerClosureList closures; // Replay previously-returned send_* ops if needed. subchannel_batch_data* replay_batch_data = maybe_create_subchannel_batch_for_replay(elem, retry_state); if (replay_batch_data != nullptr) { add_closure_for_subchannel_batch(elem, &replay_batch_data->batch, &closures); // Track number of pending subchannel send batches. // If this is the first one, take a ref to the call stack. if (calld->num_pending_retriable_subchannel_send_batches == 0) { GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches"); } ++calld->num_pending_retriable_subchannel_send_batches; } // Now add pending batches. add_subchannel_batches_for_pending_batches(elem, retry_state, &closures); // Start batches on subchannel call. if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: starting %" PRIuPTR " retriable batches on subchannel_call=%p", chand, calld, closures.size(), calld->subchannel_call); } // Note: This will yield the call combiner. closures.RunClosures(calld->call_combiner); } // // LB pick // static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); const size_t parent_data_size = calld->enable_retries ? 
sizeof(subchannel_call_retry_state) : 0; const grpc_core::ConnectedSubchannel::CallArgs call_args = { calld->pollent, // pollent calld->path, // path calld->call_start_time, // start_time calld->deadline, // deadline calld->arena, // arena calld->pick.subchannel_call_context, // context calld->call_combiner, // call_combiner parent_data_size // parent_data_size }; grpc_error* new_error = calld->pick.connected_subchannel->CreateCall( call_args, &calld->subchannel_call); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s", chand, calld, calld->subchannel_call, grpc_error_string(new_error)); } if (GPR_UNLIKELY(new_error != GRPC_ERROR_NONE)) { new_error = grpc_error_add_child(new_error, error); pending_batches_fail(elem, new_error, true /* yield_call_combiner */); } else { grpc_core::channelz::SubchannelNode* channelz_subchannel = calld->pick.connected_subchannel->channelz_subchannel(); if (channelz_subchannel != nullptr) { channelz_subchannel->RecordCallStarted(); } if (parent_data_size > 0) { subchannel_call_retry_state* retry_state = static_cast( grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); retry_state->batch_payload.context = calld->pick.subchannel_call_context; } pending_batches_resume(elem); } GRPC_ERROR_UNREF(error); } // Invoked when a pick is completed, on both success or failure. static void pick_done(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (GPR_UNLIKELY(calld->pick.connected_subchannel == nullptr)) { // Failed to create subchannel. // If there was no error, this is an LB policy drop, in which case // we return an error; otherwise, we may retry. grpc_status_code status = GRPC_STATUS_OK; grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr, nullptr); if (error == GRPC_ERROR_NONE || !calld->enable_retries || !maybe_retry(elem, nullptr /* batch_data */, status, nullptr /* server_pushback_md */)) { grpc_error* new_error = error == GRPC_ERROR_NONE ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Call dropped by load balancing policy") : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to create subchannel", &error, 1); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: failed to create subchannel: error=%s", chand, calld, grpc_error_string(new_error)); } pending_batches_fail(elem, new_error, true /* yield_call_combiner */); } } else { /* Create call on subchannel. */ create_subchannel_call(elem, GRPC_ERROR_REF(error)); } } static void maybe_add_call_to_channel_interested_parties_locked( grpc_call_element* elem) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (!calld->pollent_added_to_interested_parties) { calld->pollent_added_to_interested_parties = true; grpc_polling_entity_add_to_pollset_set(calld->pollent, chand->interested_parties); } } static void maybe_del_call_from_channel_interested_parties_locked( grpc_call_element* elem) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (calld->pollent_added_to_interested_parties) { calld->pollent_added_to_interested_parties = false; grpc_polling_entity_del_from_pollset_set(calld->pollent, chand->interested_parties); } } // Invoked when a pick is completed to leave the client_channel combiner // and continue processing in the call combiner. 
// If needed, removes the call's polling entity from chand->interested_parties. static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { call_data* calld = static_cast(elem->call_data); maybe_del_call_from_channel_interested_parties_locked(elem); GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_SCHED(&calld->pick_closure, error); } namespace grpc_core { // Performs subchannel pick via LB policy. class LbPicker { public: // Starts a pick on chand->lb_policy. static void StartLocked(grpc_call_element* elem) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p", chand, calld, chand->lb_policy.get()); } // If this is a retry, use the send_initial_metadata payload that // we've cached; otherwise, use the pending batch. The // send_initial_metadata batch will be the first pending batch in the // list, as set by get_batch_index() above. calld->pick.initial_metadata = calld->seen_send_initial_metadata ? &calld->send_initial_metadata : calld->pending_batches[0] .batch->payload->send_initial_metadata.send_initial_metadata; calld->pick.initial_metadata_flags = calld->seen_send_initial_metadata ? calld->send_initial_metadata_flags : calld->pending_batches[0] .batch->payload->send_initial_metadata .send_initial_metadata_flags; GRPC_CLOSURE_INIT(&calld->pick_closure, &LbPicker::DoneLocked, elem, grpc_combiner_scheduler(chand->combiner)); calld->pick.on_complete = &calld->pick_closure; GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback"); grpc_error* error = GRPC_ERROR_NONE; const bool pick_done = chand->lb_policy->PickLocked(&calld->pick, &error); if (GPR_LIKELY(pick_done)) { // Pick completed synchronously. if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously", chand, calld); } pick_done_locked(elem, error); GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); } else { // Pick will be returned asynchronously. // Add the polling entity from call_data to the channel_data's // interested_parties, so that the I/O of the LB policy can be done // under it. It will be removed in pick_done_locked(). maybe_add_call_to_channel_interested_parties_locked(elem); // Request notification on call cancellation. GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); grpc_call_combiner_set_notify_on_cancel( calld->call_combiner, GRPC_CLOSURE_INIT(&calld->pick_cancel_closure, &LbPicker::CancelLocked, elem, grpc_combiner_scheduler(chand->combiner))); } } private: // Callback invoked by LoadBalancingPolicy::PickLocked() for async picks. // Unrefs the LB policy and invokes pick_done_locked(). static void DoneLocked(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously", chand, calld); } pick_done_locked(elem, GRPC_ERROR_REF(error)); GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); } // Note: This runs under the client_channel combiner, but will NOT be // holding the call combiner. 
static void CancelLocked(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); // Note: chand->lb_policy may have changed since we started our pick, // in which case we will be cancelling the pick on a policy other than // the one we started it on. However, this will just be a no-op. if (GPR_UNLIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: cancelling pick from LB policy %p", chand, calld, chand->lb_policy.get()); } chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error)); } GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel"); } }; } // namespace grpc_core // Applies service config to the call. Must be invoked once we know // that the resolver has returned results to the channel. static void apply_service_config_to_call_locked(grpc_call_element* elem) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call", chand, calld); } if (chand->retry_throttle_data != nullptr) { calld->retry_throttle_data = chand->retry_throttle_data->Ref(); } if (chand->method_params_table != nullptr) { calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup( *chand->method_params_table, calld->path); if (calld->method_params != nullptr) { // If the deadline from the service config is shorter than the one // from the client API, reset the deadline timer. if (chand->deadline_checking_enabled && calld->method_params->timeout() != 0) { const grpc_millis per_method_deadline = grpc_timespec_to_millis_round_up(calld->call_start_time) + calld->method_params->timeout(); if (per_method_deadline < calld->deadline) { calld->deadline = per_method_deadline; grpc_deadline_state_reset(elem, calld->deadline); } } // If the service config set wait_for_ready and the application // did not explicitly set it, use the value from the service config. uint32_t* send_initial_metadata_flags = &calld->pending_batches[0] .batch->payload->send_initial_metadata .send_initial_metadata_flags; if (GPR_UNLIKELY( calld->method_params->wait_for_ready() != ClientChannelMethodParams::WAIT_FOR_READY_UNSET && !(*send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) { if (calld->method_params->wait_for_ready() == ClientChannelMethodParams::WAIT_FOR_READY_TRUE) { *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; } else { *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; } } } } // If no retry policy, disable retries. // TODO(roth): Remove this when adding support for transparent retries. if (calld->method_params == nullptr || calld->method_params->retry_policy() == nullptr) { calld->enable_retries = false; } } // Invoked once resolver results are available. static void process_service_config_and_start_lb_pick_locked( grpc_call_element* elem) { call_data* calld = static_cast(elem->call_data); // Only get service config data on the first attempt. if (GPR_LIKELY(calld->num_attempts_completed == 0)) { apply_service_config_to_call_locked(elem); } // Start LB pick. grpc_core::LbPicker::StartLocked(elem); } namespace grpc_core { // Handles waiting for a resolver result. // Used only for the first call on an idle channel. 
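// Lifetime note: DoneLocked() is queued on
// chand->waiting_for_resolver_result_closures, and CancelLocked() is
// registered as the call's cancellation notification. Whichever of the two
// runs second (tracked via finished_) deletes the waiter.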
class ResolverResultWaiter { public: explicit ResolverResultWaiter(grpc_call_element* elem) : elem_(elem) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: deferring pick pending resolver result", chand, calld); } // Add closure to be run when a resolver result is available. GRPC_CLOSURE_INIT(&done_closure_, &ResolverResultWaiter::DoneLocked, this, grpc_combiner_scheduler(chand->combiner)); AddToWaitingList(); // Set cancellation closure, so that we abort if the call is cancelled. GRPC_CLOSURE_INIT(&cancel_closure_, &ResolverResultWaiter::CancelLocked, this, grpc_combiner_scheduler(chand->combiner)); grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, &cancel_closure_); } private: // Adds closure_ to chand->waiting_for_resolver_result_closures. void AddToWaitingList() { channel_data* chand = static_cast(elem_->channel_data); grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, &done_closure_, GRPC_ERROR_NONE); } // Invoked when a resolver result is available. static void DoneLocked(void* arg, grpc_error* error) { ResolverResultWaiter* self = static_cast(arg); // If CancelLocked() has already run, delete ourselves without doing // anything. Note that the call stack may have already been destroyed, // so it's not safe to access anything in elem_. if (GPR_UNLIKELY(self->finished_)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "call cancelled before resolver result"); } Delete(self); return; } // Otherwise, process the resolver result. grpc_call_element* elem = self->elem_; channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data", chand, calld); } pick_done_locked(elem, GRPC_ERROR_REF(error)); } else if (GPR_UNLIKELY(chand->resolver == nullptr)) { // Shutting down. if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand, calld); } pick_done_locked(elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) { // Transient resolver failure. // If call has wait_for_ready=true, try again; otherwise, fail. uint32_t send_initial_metadata_flags = calld->seen_send_initial_metadata ? calld->send_initial_metadata_flags : calld->pending_batches[0] .batch->payload->send_initial_metadata .send_initial_metadata_flags; if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned but no LB policy; " "wait_for_ready=true; trying again", chand, calld); } // Re-add ourselves to the waiting list. self->AddToWaitingList(); // Return early so that we don't set finished_ to true below. 
        return;
      } else {
        if (grpc_client_channel_trace.enabled()) {
          gpr_log(GPR_INFO,
                  "chand=%p calld=%p: resolver returned but no LB policy; "
                  "wait_for_ready=false; failing",
                  chand, calld);
        }
        pick_done_locked(
            elem,
            grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                   "Name resolution failure"),
                               GRPC_ERROR_INT_GRPC_STATUS,
                               GRPC_STATUS_UNAVAILABLE));
      }
    } else {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing LB pick",
                chand, calld);
      }
      process_service_config_and_start_lb_pick_locked(elem);
    }
    self->finished_ = true;
  }

  // Invoked when the call is cancelled.
  // Note: This runs under the client_channel combiner, but will NOT be
  // holding the call combiner.
  static void CancelLocked(void* arg, grpc_error* error) {
    ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
    // If DoneLocked() has already run, delete ourselves without doing
    // anything.
    if (GPR_LIKELY(self->finished_)) {
      Delete(self);
      return;
    }
    // If we are being cancelled, immediately invoke pick_done_locked()
    // to propagate the error back to the caller.
    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
      grpc_call_element* elem = self->elem_;
      channel_data* chand = static_cast<channel_data*>(elem->channel_data);
      call_data* calld = static_cast<call_data*>(elem->call_data);
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: cancelling call waiting for name "
                "resolution",
                chand, calld);
      }
      // Note: Although we are not in the call combiner here, we are
      // basically stealing the call combiner from the pending pick, so
      // it's safe to call pick_done_locked() here -- we are essentially
      // calling it here instead of calling it in DoneLocked().
      pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                 "Pick cancelled", &error, 1));
    }
    self->finished_ = true;
  }

  grpc_call_element* elem_;
  grpc_closure done_closure_;
  grpc_closure cancel_closure_;
  bool finished_ = false;
};

}  // namespace grpc_core
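// Lifecycle note (descriptive, not normative): a ResolverResultWaiter is
// heap-allocated and owns no refs; DoneLocked() and CancelLocked() both run
// under the channel combiner. Broadly, whichever of the two runs second
// observes finished_ == true and deletes the object, while whichever runs
// first does the real work and then sets finished_ (except for the
// wait_for_ready re-queue path, which leaves finished_ unset). A minimal
// sketch of that "second closure cleans up" pattern, with hypothetical names:
#if 0
struct illustrative_waiter {
  bool finished = false;  // Only touched under the combiner.
};
static void illustrative_done(illustrative_waiter* w) {
  if (w->finished) {  // Cancel already ran; just clean up.
    grpc_core::Delete(w);
    return;
  }
  // ... handle the result ...
  w->finished = true;  // Tell the cancel closure we ran first.
}
static void illustrative_cancel(illustrative_waiter* w) {
  if (w->finished) {  // Done already ran; just clean up.
    grpc_core::Delete(w);
    return;
  }
  // ... propagate cancellation ...
  w->finished = true;
}
#endif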
static void start_pick_locked(void* arg, grpc_error* ignored) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  GPR_ASSERT(calld->pick.connected_subchannel == nullptr);
  GPR_ASSERT(calld->subchannel_call == nullptr);
  if (GPR_LIKELY(chand->lb_policy != nullptr)) {
    // We already have resolver results, so process the service config
    // and start an LB pick.
    process_service_config_and_start_lb_pick_locked(elem);
  } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
    pick_done_locked(elem,
                     GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
  } else {
    // We do not yet have an LB policy, so wait for a resolver result.
    if (GPR_UNLIKELY(!chand->started_resolving)) {
      start_resolving_locked(chand);
    }
    // Create a new waiter, which will delete itself when done.
    grpc_core::New<grpc_core::ResolverResultWaiter>(elem);
    // Add the polling entity from call_data to the channel_data's
    // interested_parties, so that the I/O of the resolver can be done
    // under it. It will be removed in pick_done_locked().
    maybe_add_call_to_channel_interested_parties_locked(elem);
  }
}

//
// filter call vtable functions
//

static void cc_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled)) {
    grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(calld->cancel_error != GRPC_ERROR_NONE)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(calld->cancel_error));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations. This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(calld->cancel_error);
    calld->cancel_error =
        GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_string(calld->cancel_error));
    }
    // If we do not have a subchannel call (i.e., a pick has not yet
    // been started), fail all pending batches. Otherwise, send the
    // cancellation down to the subchannel call.
    if (calld->subchannel_call == nullptr) {
      pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error),
                           false /* yield_call_combiner */);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    } else {
      // Note: This will release the call combiner.
      grpc_subchannel_call_process_op(calld->subchannel_call, batch);
    }
    return;
  }
  // Add the batch to the pending list.
  pending_batches_add(elem, batch);
  // Check if we've already gotten a subchannel call.
  // Note that once we have completed the pick, we do not need to enter
  // the channel combiner, which is more efficient (especially for
  // streaming calls).
  if (calld->subchannel_call != nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
              calld, calld->subchannel_call);
    }
    pending_batches_resume(elem);
    return;
  }
  // We do not yet have a subchannel call.
  // For batches containing a send_initial_metadata op, enter the channel
  // combiner to start a pick.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
              chand, calld);
    }
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
                          elem, grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: saved batch, yielding call combiner",
              chand, calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "batch does not include send_initial_metadata");
  }
}

/* Constructor for call_data */
static grpc_error* cc_init_call_elem(grpc_call_element* elem,
                                     const grpc_call_element_args* args) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  // Initialize data members.
  calld->path = grpc_slice_ref_internal(args->path);
  calld->call_start_time = args->start_time;
  calld->deadline = args->deadline;
  calld->arena = args->arena;
  calld->owning_call = args->call_stack;
  calld->call_combiner = args->call_combiner;
  if (GPR_LIKELY(chand->deadline_checking_enabled)) {
    grpc_deadline_state_init(elem, args->call_stack, args->call_combiner,
                             calld->deadline);
  }
  calld->enable_retries = chand->enable_retries;
  calld->send_messages.Init();
  return GRPC_ERROR_NONE;
}

/* Destructor for call_data */
static void cc_destroy_call_elem(grpc_call_element* elem,
                                 const grpc_call_final_info* final_info,
                                 grpc_closure* then_schedule_closure) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled)) {
    grpc_deadline_state_destroy(elem);
  }
  grpc_slice_unref_internal(calld->path);
  calld->retry_throttle_data.reset();
  calld->method_params.reset();
  GRPC_ERROR_UNREF(calld->cancel_error);
  if (GPR_LIKELY(calld->subchannel_call != nullptr)) {
    grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call,
                                             then_schedule_closure);
    then_schedule_closure = nullptr;
    GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
                               "client_channel_destroy_call");
  }
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    GPR_ASSERT(calld->pending_batches[i].batch == nullptr);
  }
  if (GPR_LIKELY(calld->pick.connected_subchannel != nullptr)) {
    calld->pick.connected_subchannel.reset();
  }
  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
    if (calld->pick.subchannel_call_context[i].value != nullptr) {
      calld->pick.subchannel_call_context[i].destroy(
          calld->pick.subchannel_call_context[i].value);
    }
  }
  calld->send_messages.Destroy();
  GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}

static void cc_set_pollset_or_pollset_set(grpc_call_element* elem,
                                          grpc_polling_entity* pollent) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  calld->pollent = pollent;
}

/*************************************************************************
 * EXPORTED SYMBOLS
 */

const grpc_channel_filter grpc_client_channel_filter = {
    cc_start_transport_stream_op_batch,
    cc_start_transport_op,
    sizeof(call_data),
    cc_init_call_elem,
    cc_set_pollset_or_pollset_set,
    cc_destroy_call_elem,
    sizeof(channel_data),
    cc_init_channel_elem,
    cc_destroy_channel_elem,
    cc_get_channel_info,
    "client-channel",
};

static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
  channel_data* chand = static_cast<channel_data*>(arg);
  if (chand->lb_policy != nullptr) {
    chand->lb_policy->ExitIdleLocked();
  } else {
    chand->exit_idle_when_lb_policy_arrives = true;
    if (!chand->started_resolving && chand->resolver != nullptr) {
      start_resolving_locked(chand);
    }
  }
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
}
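// Note on the ref-counting pattern above: try_to_connect_locked() always
// releases a "try_to_connect" ref on the owning channel stack; that ref is
// taken by grpc_client_channel_check_connectivity_state() below before the
// closure is scheduled on the combiner. A minimal sketch of that
// "ref before scheduling, unref inside the combiner closure" idiom, with a
// hypothetical closure name:
#if 0
static void illustrative_locked_cb(void* arg, grpc_error* error_ignored) {
  channel_data* chand = static_cast<channel_data*>(arg);
  // ... do work under the combiner ...
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "illustrative_cb");
}
static void illustrative_schedule(channel_data* chand) {
  // The ref keeps the channel stack alive until the closure runs.
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "illustrative_cb");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_CREATE(illustrative_locked_cb, chand,
                          grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}
#endif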
void grpc_client_channel_populate_child_refs(
    grpc_channel_element* elem, grpc_core::ChildRefsList* child_subchannels,
    grpc_core::ChildRefsList* child_channels) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->lb_policy != nullptr) {
    chand->lb_policy->FillChildRefsForChannelz(child_subchannels,
                                               child_channels);
  }
}

grpc_connectivity_state grpc_client_channel_check_connectivity_state(
    grpc_channel_element* elem, int try_to_connect) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  grpc_connectivity_state out =
      grpc_connectivity_state_check(&chand->state_tracker);
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
                            grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  }
  return out;
}

typedef struct external_connectivity_watcher {
  channel_data* chand;
  grpc_polling_entity pollent;
  grpc_closure* on_complete;
  grpc_closure* watcher_timer_init;
  grpc_connectivity_state* state;
  grpc_closure my_closure;
  struct external_connectivity_watcher* next;
} external_connectivity_watcher;

static external_connectivity_watcher* lookup_external_connectivity_watcher(
    channel_data* chand, grpc_closure* on_complete) {
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr && w->on_complete != on_complete) {
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
  return w;
}

static void external_connectivity_watcher_list_append(
    channel_data* chand, external_connectivity_watcher* w) {
  GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));
  gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
  GPR_ASSERT(!w->next);
  w->next = chand->external_connectivity_watcher_list_head;
  chand->external_connectivity_watcher_list_head = w;
  gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
}

static void external_connectivity_watcher_list_remove(
    channel_data* chand, external_connectivity_watcher* to_remove) {
  GPR_ASSERT(
      lookup_external_connectivity_watcher(chand, to_remove->on_complete));
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  if (to_remove == chand->external_connectivity_watcher_list_head) {
    chand->external_connectivity_watcher_list_head = to_remove->next;
    gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
    return;
  }
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    if (w->next == to_remove) {
      w->next = w->next->next;
      gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
      return;
    }
    w = w->next;
  }
  GPR_UNREACHABLE_CODE(return);
}

int grpc_client_channel_num_external_connectivity_watchers(
    grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  int count = 0;
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    count++;
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
  return count;
}

static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  grpc_closure* follow_up = w->on_complete;
  grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                           w->chand->interested_parties);
  GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                           "external_connectivity_watcher");
  external_connectivity_watcher_list_remove(w->chand, w);
  gpr_free(w);
  GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
}
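// Note (descriptive): a pending external watch is identified by its
// on_complete closure pointer (see lookup_external_connectivity_watcher()
// above). As the state != nullptr / state == nullptr branches of
// watch_connectivity_state_locked() below show, re-issuing the watch with a
// null state pointer and the same on_complete closure cancels the pending
// notification rather than registering a new one.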
static void watch_connectivity_state_locked(void* arg,
                                            grpc_error* error_ignored) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  external_connectivity_watcher* found = nullptr;
  if (w->state != nullptr) {
    external_connectivity_watcher_list_append(w->chand, w);
    // It is assumed that the closure is scheduled on the ExecCtx scheduler
    // and that GRPC_CLOSURE_RUN therefore runs the closure immediately.
    GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
    GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
                      grpc_combiner_scheduler(w->chand->combiner));
    grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker,
                                                   w->state, &w->my_closure);
  } else {
    GPR_ASSERT(w->watcher_timer_init == nullptr);
    found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
    if (found) {
      GPR_ASSERT(found->on_complete == w->on_complete);
      grpc_connectivity_state_notify_on_state_change(
          &found->chand->state_tracker, nullptr, &found->my_closure);
    }
    grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                             w->chand->interested_parties);
    GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                             "external_connectivity_watcher");
    gpr_free(w);
  }
}

void grpc_client_channel_watch_connectivity_state(
    grpc_channel_element* elem, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* closure,
    grpc_closure* watcher_timer_init) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(gpr_zalloc(sizeof(*w)));
  w->chand = chand;
  w->pollent = pollent;
  w->on_complete = closure;
  w->state = state;
  w->watcher_timer_init = watcher_timer_init;
  grpc_polling_entity_add_to_pollset_set(&w->pollent,
                                         chand->interested_parties);
  GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
                         "external_connectivity_watcher");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
                        grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

grpc_subchannel_call* grpc_client_channel_get_subchannel_call(
    grpc_call_element* elem) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  return calld->subchannel_call;
}
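// Illustrative sketch (not compiled): how an application-level caller might
// exercise the connectivity-state machinery that ultimately reaches
// grpc_client_channel_watch_connectivity_state() above. This uses only the
// public surface API from <grpc/grpc.h>; the exact plumbing from the surface
// channel down into this filter is assumed rather than shown here, and the
// function name is hypothetical.
#if 0
static void illustrative_wait_for_ready(grpc_channel* channel,
                                        grpc_completion_queue* cq) {
  // Ask for the current state and kick off a connection attempt if idle.
  grpc_connectivity_state state =
      grpc_channel_check_connectivity_state(channel, 1 /* try_to_connect */);
  while (state != GRPC_CHANNEL_READY) {
    // Register a watch that fires when the state changes or after 5s.
    grpc_channel_watch_connectivity_state(
        channel, state,
        gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                     gpr_time_from_seconds(5, GPR_TIMESPAN)),
        cq, /*tag=*/nullptr);
    grpc_event ev = grpc_completion_queue_next(
        cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr /* reserved */);
    if (ev.type != GRPC_OP_COMPLETE) break;
    state = grpc_channel_check_connectivity_state(channel, 0);
  }
}
#endif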