author Yash Tibrewal <yashkt@google.com> 2018-12-20 16:16:39 -0800
committer Yash Tibrewal <yashkt@google.com> 2018-12-20 16:16:39 -0800
commit 0c7eb8daf8414a8efcb742d9af59d8e38c1b1b22 (patch)
tree 0517062d2fab7a01be0a53f1999403e87e80d8a4 /src
parent b09ed93d02197235471e6e65df2df2cbeb506f50 (diff)
parent 102d5d88a626e29f1dccc4ffdb977d1bcd7a5937 (diff)
Merge branch 'master' into fathom
Diffstat (limited to 'src')
-rw-r--r-- src/core/ext/filters/client_channel/client_channel.cc 1035
-rw-r--r-- src/core/ext/filters/client_channel/lb_policy.h 13
-rw-r--r-- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc 83
-rw-r--r-- src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc 8
-rw-r--r-- src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc 8
-rw-r--r-- src/core/ext/filters/client_channel/lb_policy/xds/xds.cc 8
-rw-r--r-- src/core/ext/filters/client_channel/request_routing.cc 936
-rw-r--r-- src/core/ext/filters/client_channel/request_routing.h 177
-rw-r--r-- src/core/ext/filters/client_channel/resolver_result_parsing.cc 14
-rw-r--r-- src/core/ext/filters/client_channel/resolver_result_parsing.h 30
-rw-r--r-- src/core/lib/iomgr/tcp_posix.cc 29
-rw-r--r-- src/core/lib/security/credentials/composite/composite_credentials.cc 61
-rw-r--r-- src/core/lib/security/credentials/composite/composite_credentials.h 43
-rw-r--r-- src/core/lib/security/credentials/jwt/jwt_verifier.cc 2
-rw-r--r-- src/core/tsi/ssl_transport_security.cc 12
-rw-r--r-- src/cpp/ext/filters/census/context.cc 6
-rw-r--r-- src/csharp/Grpc.Tools/build/_grpc/_Grpc.Tools.targets 3
-rw-r--r-- src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets 3
-rw-r--r-- src/objective-c/README.md 9
-rw-r--r-- src/python/grpcio/grpc/_auth.py 2
-rw-r--r-- src/python/grpcio/grpc/_channel.py 14
-rw-r--r-- src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi 2
-rw-r--r-- src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi 11
-rw-r--r-- src/python/grpcio/grpc/_server.py 106
-rw-r--r-- src/python/grpcio/grpc/_utilities.py 3
-rw-r--r-- src/python/grpcio/grpc_core_dependencies.py 1
-rw-r--r-- src/python/grpcio_status/grpc_status/rpc_status.py 2
-rw-r--r-- src/python/grpcio_testing/grpc_testing/_server/_handler.py 2
-rw-r--r-- src/python/grpcio_tests/commands.py 1
-rw-r--r-- src/python/grpcio_tests/setup.py 12
-rw-r--r-- src/python/grpcio_tests/tests/_runner.py 2
-rw-r--r-- src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py 3
-rw-r--r-- src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py 8
-rw-r--r-- src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py 2
-rw-r--r-- src/python/grpcio_tests/tests/qps/benchmark_client.py 2
-rw-r--r-- src/python/grpcio_tests/tests/qps/client_runner.py 2
-rw-r--r-- src/python/grpcio_tests/tests/qps/worker_server.py 2
-rw-r--r-- src/python/grpcio_tests/tests/stress/client.py 4
-rw-r--r-- src/python/grpcio_tests/tests/testing/_client_application.py 4
-rw-r--r-- src/python/grpcio_tests/tests/tests.json 1
-rw-r--r-- src/python/grpcio_tests/tests/unit/BUILD.bazel 7
-rw-r--r-- src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py 2
-rw-r--r-- src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py 97
-rw-r--r-- src/python/grpcio_tests/tests/unit/_server_shutdown_test.py 90
44 files changed, 1736 insertions, 1126 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 70aac47231..dd741f1e2d 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -35,10 +35,10 @@
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
+#include "src/core/ext/filters/client_channel/request_routing.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
-#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
@@ -63,7 +63,6 @@
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"
-using grpc_core::ServerAddressList;
using grpc_core::internal::ClientChannelMethodParams;
using grpc_core::internal::ClientChannelMethodParamsTable;
using grpc_core::internal::ProcessedResolverResult;
@@ -88,31 +87,18 @@ grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel");
struct external_connectivity_watcher;
typedef struct client_channel_channel_data {
- grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
- bool started_resolving;
+ grpc_core::ManualConstructor<grpc_core::RequestRouter> request_router;
+
bool deadline_checking_enabled;
- grpc_client_channel_factory* client_channel_factory;
bool enable_retries;
size_t per_rpc_retry_buffer_size;
/** combiner protecting all variables below in this data structure */
grpc_combiner* combiner;
- /** currently active load balancer */
- grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy;
/** retry throttle data */
grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
/** maps method names to method_parameters structs */
grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table;
- /** incoming resolver result - set by resolver.next() */
- grpc_channel_args* resolver_result;
- /** a list of closures that are all waiting for resolver result to come in */
- grpc_closure_list waiting_for_resolver_result_closures;
- /** resolver callback */
- grpc_closure on_resolver_result_changed;
- /** connectivity state being tracked */
- grpc_connectivity_state_tracker state_tracker;
- /** when an lb_policy arrives, should we try to exit idle */
- bool exit_idle_when_lb_policy_arrives;
/** owning stack */
grpc_channel_stack* owning_stack;
/** interested parties (owned) */
@@ -129,418 +115,40 @@ typedef struct client_channel_channel_data {
grpc_core::UniquePtr<char> info_lb_policy_name;
/** service config in JSON form */
grpc_core::UniquePtr<char> info_service_config_json;
- /* backpointer to grpc_channel's channelz node */
- grpc_core::channelz::ClientChannelNode* channelz_channel;
- /* caches if the last resolution event contained addresses */
- bool previous_resolution_contained_addresses;
} channel_data;
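The new channel_data embeds the router in a grpc_core::ManualConstructor, reserving aligned storage inline while deferring construction to the request_router.Init() call in cc_init_channel_elem() and destruction to request_router.Destroy() in cc_destroy_channel_elem(), both visible later in this diff. A minimal sketch of that idiom follows; it is a simplified stand-in, not the real grpc_core::ManualConstructor (which lives under src/core/lib/gprpp/).

    #include <new>
    #include <utility>

    // Simplified model of the ManualConstructor idiom: storage lives inline
    // in the enclosing object, but construction and destruction are explicit,
    // so the member can pass through "not yet constructed" and "already
    // destroyed" phases without a heap allocation.
    template <typename T>
    class ManualConstructor {
     public:
      template <typename... Args>
      void Init(Args&&... args) {
        new (&storage_) T(std::forward<Args>(args)...);  // placement new
      }
      void Destroy() { get()->~T(); }  // must pair with a prior Init()
      T* get() { return reinterpret_cast<T*>(&storage_); }
      T* operator->() { return get(); }

     private:
      alignas(T) unsigned char storage_[sizeof(T)];
    };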
-typedef struct {
- channel_data* chand;
- /** used as an identifier, don't dereference it because the LB policy may be
- * non-existing when the callback is run */
- grpc_core::LoadBalancingPolicy* lb_policy;
- grpc_closure closure;
-} reresolution_request_args;
-
-/** We create one watcher for each new lb_policy that is returned from a
- resolver, to watch for state changes from the lb_policy. When a state
- change is seen, we update the channel, and create a new watcher. */
-typedef struct {
- channel_data* chand;
- grpc_closure on_changed;
- grpc_connectivity_state state;
- grpc_core::LoadBalancingPolicy* lb_policy;
-} lb_policy_connectivity_watcher;
-
-static void watch_lb_policy_locked(channel_data* chand,
- grpc_core::LoadBalancingPolicy* lb_policy,
- grpc_connectivity_state current_state);
-
-static const char* channel_connectivity_state_change_string(
- grpc_connectivity_state state) {
- switch (state) {
- case GRPC_CHANNEL_IDLE:
- return "Channel state change to IDLE";
- case GRPC_CHANNEL_CONNECTING:
- return "Channel state change to CONNECTING";
- case GRPC_CHANNEL_READY:
- return "Channel state change to READY";
- case GRPC_CHANNEL_TRANSIENT_FAILURE:
- return "Channel state change to TRANSIENT_FAILURE";
- case GRPC_CHANNEL_SHUTDOWN:
- return "Channel state change to SHUTDOWN";
- }
- GPR_UNREACHABLE_CODE(return "UNKNOWN");
-}
-
-static void set_channel_connectivity_state_locked(channel_data* chand,
- grpc_connectivity_state state,
- grpc_error* error,
- const char* reason) {
- /* TODO: Improve failure handling:
- * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
- * - Hand over pending picks from old policies during the switch that happens
- * when resolver provides an update. */
- if (chand->lb_policy != nullptr) {
- if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- /* cancel picks with wait_for_ready=false */
- chand->lb_policy->CancelMatchingPicksLocked(
- /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
- /* check= */ 0, GRPC_ERROR_REF(error));
- } else if (state == GRPC_CHANNEL_SHUTDOWN) {
- /* cancel all picks */
- chand->lb_policy->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0,
- GRPC_ERROR_REF(error));
- }
- }
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: setting connectivity state to %s", chand,
- grpc_connectivity_state_name(state));
- }
- if (chand->channelz_channel != nullptr) {
- chand->channelz_channel->AddTraceEvent(
- grpc_core::channelz::ChannelTrace::Severity::Info,
- grpc_slice_from_static_string(
- channel_connectivity_state_change_string(state)));
- }
- grpc_connectivity_state_set(&chand->state_tracker, state, error, reason);
-}
-
-static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) {
- lb_policy_connectivity_watcher* w =
- static_cast<lb_policy_connectivity_watcher*>(arg);
- /* check if the notification is for the latest policy */
- if (w->lb_policy == w->chand->lb_policy.get()) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: lb_policy=%p state changed to %s", w->chand,
- w->lb_policy, grpc_connectivity_state_name(w->state));
- }
- set_channel_connectivity_state_locked(w->chand, w->state,
- GRPC_ERROR_REF(error), "lb_changed");
- if (w->state != GRPC_CHANNEL_SHUTDOWN) {
- watch_lb_policy_locked(w->chand, w->lb_policy, w->state);
- }
- }
- GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "watch_lb_policy");
- gpr_free(w);
-}
-
-static void watch_lb_policy_locked(channel_data* chand,
- grpc_core::LoadBalancingPolicy* lb_policy,
- grpc_connectivity_state current_state) {
- lb_policy_connectivity_watcher* w =
- static_cast<lb_policy_connectivity_watcher*>(gpr_malloc(sizeof(*w)));
- GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
- w->chand = chand;
- GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
- grpc_combiner_scheduler(chand->combiner));
- w->state = current_state;
- w->lb_policy = lb_policy;
- lb_policy->NotifyOnStateChangeLocked(&w->state, &w->on_changed);
-}
-
-static void start_resolving_locked(channel_data* chand) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: starting name resolution", chand);
- }
- GPR_ASSERT(!chand->started_resolving);
- chand->started_resolving = true;
- GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- chand->resolver->NextLocked(&chand->resolver_result,
- &chand->on_resolver_result_changed);
-}
-
-// Invoked from the resolver NextLocked() callback when the resolver
-// is shutting down.
-static void on_resolver_shutdown_locked(channel_data* chand,
- grpc_error* error) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: shutting down", chand);
- }
- if (chand->lb_policy != nullptr) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand,
- chand->lb_policy.get());
- }
- grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
- chand->interested_parties);
- chand->lb_policy.reset();
- }
- if (chand->resolver != nullptr) {
- // This should never happen; it can only be triggered by a resolver
- // implementation spontaneously deciding to report shutdown without
- // being orphaned. This code is included just to be defensive.
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: spontaneous shutdown from resolver %p",
- chand, chand->resolver.get());
- }
- chand->resolver.reset();
- set_channel_connectivity_state_locked(
- chand, GRPC_CHANNEL_SHUTDOWN,
- GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Resolver spontaneous shutdown", &error, 1),
- "resolver_spontaneous_shutdown");
- }
- grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
- GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Channel disconnected", &error, 1));
- GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
- GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver");
- grpc_channel_args_destroy(chand->resolver_result);
- chand->resolver_result = nullptr;
- GRPC_ERROR_UNREF(error);
-}
-
-static void request_reresolution_locked(void* arg, grpc_error* error) {
- reresolution_request_args* args =
- static_cast<reresolution_request_args*>(arg);
- channel_data* chand = args->chand;
- // If this invocation is for a stale LB policy, treat it as an LB shutdown
- // signal.
- if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE ||
- chand->resolver == nullptr) {
- GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution");
- gpr_free(args);
- return;
- }
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand);
- }
- chand->resolver->RequestReresolutionLocked();
- // Give back the closure to the LB policy.
- chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
-}
-
-using TraceStringVector = grpc_core::InlinedVector<char*, 3>;
-
-// Creates a new LB policy, replacing any previous one.
-// If the new policy is created successfully, sets *connectivity_state and
-// *connectivity_error to its initial connectivity state; otherwise,
-// leaves them unchanged.
-static void create_new_lb_policy_locked(
- channel_data* chand, char* lb_policy_name, grpc_json* lb_config,
- grpc_connectivity_state* connectivity_state,
- grpc_error** connectivity_error, TraceStringVector* trace_strings) {
- grpc_core::LoadBalancingPolicy::Args lb_policy_args;
- lb_policy_args.combiner = chand->combiner;
- lb_policy_args.client_channel_factory = chand->client_channel_factory;
- lb_policy_args.args = chand->resolver_result;
- lb_policy_args.lb_config = lb_config;
- grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy =
- grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
- lb_policy_name, lb_policy_args);
- if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
- gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
- if (chand->channelz_channel != nullptr) {
- char* str;
- gpr_asprintf(&str, "Could not create LB policy \'%s\'", lb_policy_name);
- trace_strings->push_back(str);
- }
- } else {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: created new LB policy \"%s\" (%p)", chand,
- lb_policy_name, new_lb_policy.get());
- }
- if (chand->channelz_channel != nullptr) {
- char* str;
- gpr_asprintf(&str, "Created new LB policy \'%s\'", lb_policy_name);
- trace_strings->push_back(str);
- }
- // Swap out the LB policy and update the fds in
- // chand->interested_parties.
- if (chand->lb_policy != nullptr) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand,
- chand->lb_policy.get());
- }
- grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
- chand->interested_parties);
- chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get());
- }
- chand->lb_policy = std::move(new_lb_policy);
- grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(),
- chand->interested_parties);
- // Set up re-resolution callback.
- reresolution_request_args* args =
- static_cast<reresolution_request_args*>(gpr_zalloc(sizeof(*args)));
- args->chand = chand;
- args->lb_policy = chand->lb_policy.get();
- GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
- grpc_combiner_scheduler(chand->combiner));
- GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
- chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
- // Get the new LB policy's initial connectivity state and start a
- // connectivity watch.
- GRPC_ERROR_UNREF(*connectivity_error);
- *connectivity_state =
- chand->lb_policy->CheckConnectivityLocked(connectivity_error);
- if (chand->exit_idle_when_lb_policy_arrives) {
- chand->lb_policy->ExitIdleLocked();
- chand->exit_idle_when_lb_policy_arrives = false;
- }
- watch_lb_policy_locked(chand, chand->lb_policy.get(), *connectivity_state);
- }
-}
-
-static void maybe_add_trace_message_for_address_changes_locked(
- channel_data* chand, TraceStringVector* trace_strings) {
- const ServerAddressList* addresses =
- grpc_core::FindServerAddressListChannelArg(chand->resolver_result);
- const bool resolution_contains_addresses =
- addresses != nullptr && addresses->size() > 0;
- if (!resolution_contains_addresses &&
- chand->previous_resolution_contained_addresses) {
- trace_strings->push_back(gpr_strdup("Address list became empty"));
- } else if (resolution_contains_addresses &&
- !chand->previous_resolution_contained_addresses) {
- trace_strings->push_back(gpr_strdup("Address list became non-empty"));
- }
- chand->previous_resolution_contained_addresses =
- resolution_contains_addresses;
-}
-
-static void concatenate_and_add_channel_trace_locked(
- channel_data* chand, TraceStringVector* trace_strings) {
- if (!trace_strings->empty()) {
- gpr_strvec v;
- gpr_strvec_init(&v);
- gpr_strvec_add(&v, gpr_strdup("Resolution event: "));
- bool is_first = 1;
- for (size_t i = 0; i < trace_strings->size(); ++i) {
- if (!is_first) gpr_strvec_add(&v, gpr_strdup(", "));
- is_first = false;
- gpr_strvec_add(&v, (*trace_strings)[i]);
- }
- char* flat;
- size_t flat_len = 0;
- flat = gpr_strvec_flatten(&v, &flat_len);
- chand->channelz_channel->AddTraceEvent(
- grpc_core::channelz::ChannelTrace::Severity::Info,
- grpc_slice_new(flat, flat_len, gpr_free));
- gpr_strvec_destroy(&v);
- }
-}
-
-// Callback invoked when a resolver result is available.
-static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
+// Synchronous callback from chand->request_router to process a resolver
+// result update.
+static bool process_resolver_result_locked(void* arg,
+ const grpc_channel_args& args,
+ const char** lb_policy_name,
+ grpc_json** lb_policy_config) {
channel_data* chand = static_cast<channel_data*>(arg);
+ ProcessedResolverResult resolver_result(args, chand->enable_retries);
+ grpc_core::UniquePtr<char> service_config_json =
+ resolver_result.service_config_json();
if (grpc_client_channel_trace.enabled()) {
- const char* disposition =
- chand->resolver_result != nullptr
- ? ""
- : (error == GRPC_ERROR_NONE ? " (transient error)"
- : " (resolver shutdown)");
- gpr_log(GPR_INFO,
- "chand=%p: got resolver result: resolver_result=%p error=%s%s",
- chand, chand->resolver_result, grpc_error_string(error),
- disposition);
+ gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
+ chand, service_config_json.get());
}
- // Handle shutdown.
- if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) {
- on_resolver_shutdown_locked(chand, GRPC_ERROR_REF(error));
- return;
- }
- // Data used to set the channel's connectivity state.
- bool set_connectivity_state = true;
- // We only want to trace the address resolution in the following cases:
- // (a) Address resolution resulted in service config change.
- // (b) Address resolution that causes number of backends to go from
- // zero to non-zero.
- // (c) Address resolution that causes number of backends to go from
- // non-zero to zero.
- // (d) Address resolution that causes a new LB policy to be created.
- //
- // we track a list of strings to eventually be concatenated and traced.
- TraceStringVector trace_strings;
- grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
- grpc_error* connectivity_error =
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
- // chand->resolver_result will be null in the case of a transient
- // resolution error. In that case, we don't have any new result to
- // process, which means that we keep using the previous result (if any).
- if (chand->resolver_result == nullptr) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand);
- }
- // Don't override connectivity state if we already have an LB policy.
- if (chand->lb_policy != nullptr) set_connectivity_state = false;
- } else {
- // Parse the resolver result.
- ProcessedResolverResult resolver_result(chand->resolver_result,
- chand->enable_retries);
- chand->retry_throttle_data = resolver_result.retry_throttle_data();
- chand->method_params_table = resolver_result.method_params_table();
- grpc_core::UniquePtr<char> service_config_json =
- resolver_result.service_config_json();
- if (service_config_json != nullptr && grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
- chand, service_config_json.get());
- }
- grpc_core::UniquePtr<char> lb_policy_name =
- resolver_result.lb_policy_name();
- grpc_json* lb_policy_config = resolver_result.lb_policy_config();
- // Check to see if we're already using the right LB policy.
- // Note: It's safe to use chand->info_lb_policy_name here without
- // taking a lock on chand->info_mu, because this function is the
- // only thing that modifies its value, and it can only be invoked
- // once at any given time.
- bool lb_policy_name_changed =
- chand->info_lb_policy_name == nullptr ||
- strcmp(chand->info_lb_policy_name.get(), lb_policy_name.get()) != 0;
- if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
- // Continue using the same LB policy. Update with new addresses.
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)",
- chand, lb_policy_name.get(), chand->lb_policy.get());
- }
- chand->lb_policy->UpdateLocked(*chand->resolver_result, lb_policy_config);
- // No need to set the channel's connectivity state; the existing
- // watch on the LB policy will take care of that.
- set_connectivity_state = false;
- } else {
- // Instantiate new LB policy.
- create_new_lb_policy_locked(chand, lb_policy_name.get(), lb_policy_config,
- &connectivity_state, &connectivity_error,
- &trace_strings);
- }
- // Note: It's safe to use chand->info_service_config_json here without
- // taking a lock on chand->info_mu, because this function is the
- // only thing that modifies its value, and it can only be invoked
- // once at any given time.
- if (chand->channelz_channel != nullptr) {
- if (((service_config_json == nullptr) !=
- (chand->info_service_config_json == nullptr)) ||
- (service_config_json != nullptr &&
- strcmp(service_config_json.get(),
- chand->info_service_config_json.get()) != 0)) {
- // TODO(ncteisen): might be worth somehow including a snippet of the
- // config in the trace, at the risk of bloating the trace logs.
- trace_strings.push_back(gpr_strdup("Service config changed"));
- }
- maybe_add_trace_message_for_address_changes_locked(chand, &trace_strings);
- concatenate_and_add_channel_trace_locked(chand, &trace_strings);
- }
- // Swap out the data used by cc_get_channel_info().
- gpr_mu_lock(&chand->info_mu);
- chand->info_lb_policy_name = std::move(lb_policy_name);
- chand->info_service_config_json = std::move(service_config_json);
- gpr_mu_unlock(&chand->info_mu);
- // Clean up.
- grpc_channel_args_destroy(chand->resolver_result);
- chand->resolver_result = nullptr;
- }
- // Set the channel's connectivity state if needed.
- if (set_connectivity_state) {
- set_channel_connectivity_state_locked(
- chand, connectivity_state, connectivity_error, "resolver_result");
- } else {
- GRPC_ERROR_UNREF(connectivity_error);
- }
- // Invoke closures that were waiting for results and renew the watch.
- GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
- chand->resolver->NextLocked(&chand->resolver_result,
- &chand->on_resolver_result_changed);
+ // Update channel state.
+ chand->retry_throttle_data = resolver_result.retry_throttle_data();
+ chand->method_params_table = resolver_result.method_params_table();
+ // Swap out the data used by cc_get_channel_info().
+ gpr_mu_lock(&chand->info_mu);
+ chand->info_lb_policy_name = resolver_result.lb_policy_name();
+ const bool service_config_changed =
+ ((service_config_json == nullptr) !=
+ (chand->info_service_config_json == nullptr)) ||
+ (service_config_json != nullptr &&
+ strcmp(service_config_json.get(),
+ chand->info_service_config_json.get()) != 0);
+ chand->info_service_config_json = std::move(service_config_json);
+ gpr_mu_unlock(&chand->info_mu);
+ // Return results.
+ *lb_policy_name = chand->info_lb_policy_name.get();
+ *lb_policy_config = resolver_result.lb_policy_config();
+ return service_config_changed;
}
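process_resolver_result_locked() is installed as a synchronous callback on the RequestRouter (see the request_router.Init() call later in this diff): the router now owns resolution and LB-policy management, and the channel merely digests each result, handing back the LB policy name and config and reporting whether the service config changed. A rough standalone model of that contract, with all type and function names hypothetical:

    // Hypothetical Router type; only the shape of the interaction matches
    // the real code.
    using ProcessResolverResultFn = bool (*)(void* user_data,
                                             const char** lb_policy_name);

    class Router {
     public:
      Router(ProcessResolverResultFn fn, void* user_data)
          : fn_(fn), user_data_(user_data) {}

      // Invoked (under the combiner in the real code) for each resolver
      // result. The callback runs synchronously and hands back the LB
      // policy choice; the return value reports a service-config change.
      void OnResolverResult() {
        const char* lb_policy_name = nullptr;
        bool service_config_changed = fn_(user_data_, &lb_policy_name);
        (void)service_config_changed;
        // ...create or update the LB policy named lb_policy_name...
      }

     private:
      ProcessResolverResultFn fn_;
      void* user_data_;
    };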
static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
@@ -550,15 +158,14 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
if (op->on_connectivity_state_change != nullptr) {
- grpc_connectivity_state_notify_on_state_change(
- &chand->state_tracker, op->connectivity_state,
- op->on_connectivity_state_change);
+ chand->request_router->NotifyOnConnectivityStateChange(
+ op->connectivity_state, op->on_connectivity_state_change);
op->on_connectivity_state_change = nullptr;
op->connectivity_state = nullptr;
}
if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
- if (chand->lb_policy == nullptr) {
+ if (chand->request_router->lb_policy() == nullptr) {
grpc_error* error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing");
GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
@@ -567,7 +174,8 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
grpc_error* error = GRPC_ERROR_NONE;
grpc_core::LoadBalancingPolicy::PickState pick_state;
// Pick must return synchronously, because pick_state.on_complete is null.
- GPR_ASSERT(chand->lb_policy->PickLocked(&pick_state, &error));
+ GPR_ASSERT(
+ chand->request_router->lb_policy()->PickLocked(&pick_state, &error));
if (pick_state.connected_subchannel != nullptr) {
pick_state.connected_subchannel->Ping(op->send_ping.on_initiate,
op->send_ping.on_ack);
@@ -586,37 +194,14 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
}
if (op->disconnect_with_error != GRPC_ERROR_NONE) {
- if (chand->resolver != nullptr) {
- set_channel_connectivity_state_locked(
- chand, GRPC_CHANNEL_SHUTDOWN,
- GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
- chand->resolver.reset();
- if (!chand->started_resolving) {
- grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
- GRPC_ERROR_REF(op->disconnect_with_error));
- GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
- }
- if (chand->lb_policy != nullptr) {
- grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
- chand->interested_parties);
- chand->lb_policy.reset();
- }
- }
- GRPC_ERROR_UNREF(op->disconnect_with_error);
+ chand->request_router->ShutdownLocked(op->disconnect_with_error);
}
if (op->reset_connect_backoff) {
- if (chand->resolver != nullptr) {
- chand->resolver->ResetBackoffLocked();
- chand->resolver->RequestReresolutionLocked();
- }
- if (chand->lb_policy != nullptr) {
- chand->lb_policy->ResetBackoffLocked();
- }
+ chand->request_router->ResetConnectionBackoffLocked();
}
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op");
-
GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}
@@ -667,12 +252,9 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
chand->owning_stack = args->channel_stack;
- GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed,
- on_resolver_result_changed_locked, chand,
- grpc_combiner_scheduler(chand->combiner));
+ chand->deadline_checking_enabled =
+ grpc_deadline_checking_enabled(args->channel_args);
chand->interested_parties = grpc_pollset_set_create();
- grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
- "client_channel");
grpc_client_channel_start_backup_polling(chand->interested_parties);
// Record max per-RPC retry buffer size.
const grpc_arg* arg = grpc_channel_args_find(
@@ -682,8 +264,6 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
// Record enable_retries.
arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES);
chand->enable_retries = grpc_channel_arg_get_bool(arg, true);
- chand->channelz_channel = nullptr;
- chand->previous_resolution_contained_addresses = false;
// Record client channel factory.
arg = grpc_channel_args_find(args->channel_args,
GRPC_ARG_CLIENT_CHANNEL_FACTORY);
@@ -695,9 +275,7 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"client channel factory arg must be a pointer");
}
- grpc_client_channel_factory_ref(
- static_cast<grpc_client_channel_factory*>(arg->value.pointer.p));
- chand->client_channel_factory =
+ grpc_client_channel_factory* client_channel_factory =
static_cast<grpc_client_channel_factory*>(arg->value.pointer.p);
// Get server name to resolve, using proxy mapper if needed.
arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
@@ -713,39 +291,24 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
grpc_channel_args* new_args = nullptr;
grpc_proxy_mappers_map_name(arg->value.string, args->channel_args,
&proxy_name, &new_args);
- // Instantiate resolver.
- chand->resolver = grpc_core::ResolverRegistry::CreateResolver(
- proxy_name != nullptr ? proxy_name : arg->value.string,
- new_args != nullptr ? new_args : args->channel_args,
- chand->interested_parties, chand->combiner);
- if (proxy_name != nullptr) gpr_free(proxy_name);
- if (new_args != nullptr) grpc_channel_args_destroy(new_args);
- if (chand->resolver == nullptr) {
- return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
- }
- chand->deadline_checking_enabled =
- grpc_deadline_checking_enabled(args->channel_args);
- return GRPC_ERROR_NONE;
+ // Instantiate request router.
+ grpc_client_channel_factory_ref(client_channel_factory);
+ grpc_error* error = GRPC_ERROR_NONE;
+ chand->request_router.Init(
+ chand->owning_stack, chand->combiner, client_channel_factory,
+ chand->interested_parties, &grpc_client_channel_trace,
+ process_resolver_result_locked, chand,
+ proxy_name != nullptr ? proxy_name : arg->value.string /* target_uri */,
+ new_args != nullptr ? new_args : args->channel_args, &error);
+ gpr_free(proxy_name);
+ grpc_channel_args_destroy(new_args);
+ return error;
}
/* Destructor for channel_data */
static void cc_destroy_channel_elem(grpc_channel_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- if (chand->resolver != nullptr) {
- // The only way we can get here is if we never started resolving,
- // because we take a ref to the channel stack when we start
- // resolving and do not release it until the resolver callback is
- // invoked after the resolver shuts down.
- chand->resolver.reset();
- }
- if (chand->client_channel_factory != nullptr) {
- grpc_client_channel_factory_unref(chand->client_channel_factory);
- }
- if (chand->lb_policy != nullptr) {
- grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
- chand->interested_parties);
- chand->lb_policy.reset();
- }
+ chand->request_router.Destroy();
// TODO(roth): Once we convert the filter API to C++, there will no
// longer be any need to explicitly reset these smart pointer data members.
chand->info_lb_policy_name.reset();
@@ -753,7 +316,6 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
chand->retry_throttle_data.reset();
chand->method_params_table.reset();
grpc_client_channel_stop_backup_polling(chand->interested_parties);
- grpc_connectivity_state_destroy(&chand->state_tracker);
grpc_pollset_set_destroy(chand->interested_parties);
GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
gpr_mu_destroy(&chand->info_mu);
@@ -810,6 +372,7 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
// - add census stats for retries
namespace {
+
struct call_data;
// State used for starting a retryable batch on a subchannel call.
@@ -894,12 +457,12 @@ struct subchannel_call_retry_state {
bool completed_recv_initial_metadata : 1;
bool started_recv_trailing_metadata : 1;
bool completed_recv_trailing_metadata : 1;
+ // State for callback processing.
subchannel_batch_data* recv_initial_metadata_ready_deferred_batch = nullptr;
grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
subchannel_batch_data* recv_message_ready_deferred_batch = nullptr;
grpc_error* recv_message_error = GRPC_ERROR_NONE;
subchannel_batch_data* recv_trailing_metadata_internal_batch = nullptr;
- // State for callback processing.
// NOTE: Do not move this next to the metadata bitfields above. That would
// save space but will also result in a data race because compiler will
// generate a 2 byte store which overwrites the meta-data fields upon
@@ -908,12 +471,12 @@ struct subchannel_call_retry_state {
};
// Pending batches stored in call data.
-typedef struct {
+struct pending_batch {
// The pending batch. If nullptr, this slot is empty.
grpc_transport_stream_op_batch* batch;
// Indicates whether payload for send ops has been cached in call data.
bool send_ops_cached;
-} pending_batch;
+};
/** Call data. Holds a pointer to grpc_subchannel_call and the
associated machinery to create such a pointer.
@@ -950,11 +513,8 @@ struct call_data {
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches); ++i) {
GPR_ASSERT(pending_batches[i].batch == nullptr);
}
- for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
- if (pick.subchannel_call_context[i].value != nullptr) {
- pick.subchannel_call_context[i].destroy(
- pick.subchannel_call_context[i].value);
- }
+ if (have_request) {
+ request.Destroy();
}
}
@@ -981,12 +541,11 @@ struct call_data {
// Set when we get a cancel_stream op.
grpc_error* cancel_error = GRPC_ERROR_NONE;
- grpc_core::LoadBalancingPolicy::PickState pick;
+ grpc_core::ManualConstructor<grpc_core::RequestRouter::Request> request;
+ bool have_request = false;
grpc_closure pick_closure;
- grpc_closure pick_cancel_closure;
grpc_polling_entity* pollent = nullptr;
- bool pollent_added_to_interested_parties = false;
// Batches are added to this list when received from above.
// They are removed when we are done handling the batch (i.e., when
@@ -1036,6 +595,7 @@ struct call_data {
grpc_linked_mdelem* send_trailing_metadata_storage = nullptr;
grpc_metadata_batch send_trailing_metadata;
};
+
} // namespace
// Forward declarations.
@@ -1438,8 +998,9 @@ static void do_retry(grpc_call_element* elem,
"client_channel_call_retry");
calld->subchannel_call = nullptr;
}
- if (calld->pick.connected_subchannel != nullptr) {
- calld->pick.connected_subchannel.reset();
+ if (calld->have_request) {
+ calld->have_request = false;
+ calld->request.Destroy();
}
// Compute backoff delay.
grpc_millis next_attempt_time;
@@ -1588,6 +1149,7 @@ static bool maybe_retry(grpc_call_element* elem,
//
namespace {
+
subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem,
call_data* calld, int refcount,
bool set_on_complete)
@@ -1628,6 +1190,7 @@ void subchannel_batch_data::destroy() {
call_data* calld = static_cast<call_data*>(elem->call_data);
GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data");
}
+
} // namespace
// Creates a subchannel_batch_data object on the call's arena with the
@@ -2644,17 +2207,18 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
const size_t parent_data_size =
calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0;
const grpc_core::ConnectedSubchannel::CallArgs call_args = {
- calld->pollent, // pollent
- calld->path, // path
- calld->call_start_time, // start_time
- calld->deadline, // deadline
- calld->arena, // arena
- calld->pick.subchannel_call_context, // context
- calld->call_combiner, // call_combiner
- parent_data_size // parent_data_size
+ calld->pollent, // pollent
+ calld->path, // path
+ calld->call_start_time, // start_time
+ calld->deadline, // deadline
+ calld->arena, // arena
+ calld->request->pick()->subchannel_call_context, // context
+ calld->call_combiner, // call_combiner
+ parent_data_size // parent_data_size
};
- grpc_error* new_error = calld->pick.connected_subchannel->CreateCall(
- call_args, &calld->subchannel_call);
+ grpc_error* new_error =
+ calld->request->pick()->connected_subchannel->CreateCall(
+ call_args, &calld->subchannel_call);
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
chand, calld, calld->subchannel_call, grpc_error_string(new_error));
@@ -2666,7 +2230,8 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
if (parent_data_size > 0) {
new (grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call))
- subchannel_call_retry_state(calld->pick.subchannel_call_context);
+ subchannel_call_retry_state(
+ calld->request->pick()->subchannel_call_context);
}
pending_batches_resume(elem);
}
@@ -2678,7 +2243,7 @@ static void pick_done(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
- if (GPR_UNLIKELY(calld->pick.connected_subchannel == nullptr)) {
+ if (GPR_UNLIKELY(calld->request->pick()->connected_subchannel == nullptr)) {
// Failed to create subchannel.
// If there was no error, this is an LB policy drop, in which case
// we return an error; otherwise, we may retry.
@@ -2707,135 +2272,27 @@ static void pick_done(void* arg, grpc_error* error) {
}
}
-static void maybe_add_call_to_channel_interested_parties_locked(
- grpc_call_element* elem) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (!calld->pollent_added_to_interested_parties) {
- calld->pollent_added_to_interested_parties = true;
- grpc_polling_entity_add_to_pollset_set(calld->pollent,
- chand->interested_parties);
- }
-}
-
-static void maybe_del_call_from_channel_interested_parties_locked(
- grpc_call_element* elem) {
+// If the channel is in TRANSIENT_FAILURE and the call is not
+// wait_for_ready=true, fails the call and returns true.
+static bool fail_call_if_in_transient_failure(grpc_call_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
- if (calld->pollent_added_to_interested_parties) {
- calld->pollent_added_to_interested_parties = false;
- grpc_polling_entity_del_from_pollset_set(calld->pollent,
- chand->interested_parties);
+ grpc_transport_stream_op_batch* batch = calld->pending_batches[0].batch;
+ if (chand->request_router->GetConnectivityState() ==
+ GRPC_CHANNEL_TRANSIENT_FAILURE &&
+ (batch->payload->send_initial_metadata.send_initial_metadata_flags &
+ GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
+ pending_batches_fail(
+ elem,
+ grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "channel is in state TRANSIENT_FAILURE"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
+ true /* yield_call_combiner */);
+ return true;
}
+ return false;
}
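The fail-fast decision above reduces to a single bitmask test on the call's initial-metadata flags. A self-contained illustration; the flag value is an assumption taken from grpc_types.h and should be checked against the header:

    #include <cstdint>

    // Assumed to match GRPC_INITIAL_METADATA_WAIT_FOR_READY in
    // include/grpc/impl/codegen/grpc_types.h.
    constexpr uint32_t kWaitForReady = 0x00000020u;

    // Fail fast only when the channel is in TRANSIENT_FAILURE and the call
    // did NOT ask to wait for the channel to become ready.
    bool ShouldFailFast(uint32_t initial_metadata_flags,
                        bool channel_in_transient_failure) {
      return channel_in_transient_failure &&
             (initial_metadata_flags & kWaitForReady) == 0;
    }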
-// Invoked when a pick is completed to leave the client_channel combiner
-// and continue processing in the call combiner.
-// If needed, removes the call's polling entity from chand->interested_parties.
-static void pick_done_locked(grpc_call_element* elem, grpc_error* error) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
- maybe_del_call_from_channel_interested_parties_locked(elem);
- GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_SCHED(&calld->pick_closure, error);
-}
-
-namespace grpc_core {
-
-// Performs subchannel pick via LB policy.
-class LbPicker {
- public:
- // Starts a pick on chand->lb_policy.
- static void StartLocked(grpc_call_element* elem) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p",
- chand, calld, chand->lb_policy.get());
- }
- // If this is a retry, use the send_initial_metadata payload that
- // we've cached; otherwise, use the pending batch. The
- // send_initial_metadata batch will be the first pending batch in the
- // list, as set by get_batch_index() above.
- calld->pick.initial_metadata =
- calld->seen_send_initial_metadata
- ? &calld->send_initial_metadata
- : calld->pending_batches[0]
- .batch->payload->send_initial_metadata.send_initial_metadata;
- calld->pick.initial_metadata_flags =
- calld->seen_send_initial_metadata
- ? calld->send_initial_metadata_flags
- : calld->pending_batches[0]
- .batch->payload->send_initial_metadata
- .send_initial_metadata_flags;
- GRPC_CLOSURE_INIT(&calld->pick_closure, &LbPicker::DoneLocked, elem,
- grpc_combiner_scheduler(chand->combiner));
- calld->pick.on_complete = &calld->pick_closure;
- GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback");
- grpc_error* error = GRPC_ERROR_NONE;
- const bool pick_done = chand->lb_policy->PickLocked(&calld->pick, &error);
- if (GPR_LIKELY(pick_done)) {
- // Pick completed synchronously.
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously",
- chand, calld);
- }
- pick_done_locked(elem, error);
- GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
- } else {
- // Pick will be returned asynchronously.
- // Add the polling entity from call_data to the channel_data's
- // interested_parties, so that the I/O of the LB policy can be done
- // under it. It will be removed in pick_done_locked().
- maybe_add_call_to_channel_interested_parties_locked(elem);
- // Request notification on call cancellation.
- GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
- grpc_call_combiner_set_notify_on_cancel(
- calld->call_combiner,
- GRPC_CLOSURE_INIT(&calld->pick_cancel_closure,
- &LbPicker::CancelLocked, elem,
- grpc_combiner_scheduler(chand->combiner)));
- }
- }
-
- private:
- // Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
- // Unrefs the LB policy and invokes pick_done_locked().
- static void DoneLocked(void* arg, grpc_error* error) {
- grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously",
- chand, calld);
- }
- pick_done_locked(elem, GRPC_ERROR_REF(error));
- GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
- }
-
- // Note: This runs under the client_channel combiner, but will NOT be
- // holding the call combiner.
- static void CancelLocked(void* arg, grpc_error* error) {
- grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- // Note: chand->lb_policy may have changed since we started our pick,
- // in which case we will be cancelling the pick on a policy other than
- // the one we started it on. However, this will just be a no-op.
- if (GPR_UNLIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: cancelling pick from LB policy %p", chand,
- calld, chand->lb_policy.get());
- }
- chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error));
- }
- GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
- }
-};
-
-} // namespace grpc_core
-
// Applies service config to the call. Must be invoked once we know
// that the resolver has returned results to the channel.
static void apply_service_config_to_call_locked(grpc_call_element* elem) {
@@ -2892,224 +2349,66 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) {
}
}
-// If the channel is in TRANSIENT_FAILURE and the call is not
-// wait_for_ready=true, fails the call and returns true.
-static bool fail_call_if_in_transient_failure(grpc_call_element* elem) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- grpc_transport_stream_op_batch* batch = calld->pending_batches[0].batch;
- if (grpc_connectivity_state_check(&chand->state_tracker) ==
- GRPC_CHANNEL_TRANSIENT_FAILURE &&
- (batch->payload->send_initial_metadata.send_initial_metadata_flags &
- GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
- pending_batches_fail(
- elem,
- grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "channel is in state TRANSIENT_FAILURE"),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
- true /* yield_call_combiner */);
- return true;
- }
- return false;
-}
-
// Invoked once resolver results are available.
-static void process_service_config_and_start_lb_pick_locked(
- grpc_call_element* elem) {
+static bool maybe_apply_service_config_to_call_locked(void* arg) {
+ grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
call_data* calld = static_cast<call_data*>(elem->call_data);
// Only get service config data on the first attempt.
if (GPR_LIKELY(calld->num_attempts_completed == 0)) {
apply_service_config_to_call_locked(elem);
// Check this after applying service config, since it may have
// affected the call's wait_for_ready value.
- if (fail_call_if_in_transient_failure(elem)) return;
+ if (fail_call_if_in_transient_failure(elem)) return false;
}
- // Start LB pick.
- grpc_core::LbPicker::StartLocked(elem);
+ return true;
}
-namespace grpc_core {
-
-// Handles waiting for a resolver result.
-// Used only for the first call on an idle channel.
-class ResolverResultWaiter {
- public:
- explicit ResolverResultWaiter(grpc_call_element* elem) : elem_(elem) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: deferring pick pending resolver result",
- chand, calld);
- }
- // Add closure to be run when a resolver result is available.
- GRPC_CLOSURE_INIT(&done_closure_, &ResolverResultWaiter::DoneLocked, this,
- grpc_combiner_scheduler(chand->combiner));
- AddToWaitingList();
- // Set cancellation closure, so that we abort if the call is cancelled.
- GRPC_CLOSURE_INIT(&cancel_closure_, &ResolverResultWaiter::CancelLocked,
- this, grpc_combiner_scheduler(chand->combiner));
- grpc_call_combiner_set_notify_on_cancel(calld->call_combiner,
- &cancel_closure_);
- }
-
- private:
- // Adds closure_ to chand->waiting_for_resolver_result_closures.
- void AddToWaitingList() {
- channel_data* chand = static_cast<channel_data*>(elem_->channel_data);
- grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
- &done_closure_, GRPC_ERROR_NONE);
- }
-
- // Invoked when a resolver result is available.
- static void DoneLocked(void* arg, grpc_error* error) {
- ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
- // If CancelLocked() has already run, delete ourselves without doing
- // anything. Note that the call stack may have already been destroyed,
- // so it's not safe to access anything in elem_.
- if (GPR_UNLIKELY(self->finished_)) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "call cancelled before resolver result");
- }
- Delete(self);
- return;
- }
- // Otherwise, process the resolver result.
- grpc_call_element* elem = self->elem_;
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data",
- chand, calld);
- }
- pick_done_locked(elem, GRPC_ERROR_REF(error));
- } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
- // Shutting down.
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand,
- calld);
- }
- pick_done_locked(elem,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
- } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) {
- // Transient resolver failure.
- // If call has wait_for_ready=true, try again; otherwise, fail.
- uint32_t send_initial_metadata_flags =
- calld->seen_send_initial_metadata
- ? calld->send_initial_metadata_flags
- : calld->pending_batches[0]
- .batch->payload->send_initial_metadata
- .send_initial_metadata_flags;
- if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: resolver returned but no LB policy; "
- "wait_for_ready=true; trying again",
- chand, calld);
- }
- // Re-add ourselves to the waiting list.
- self->AddToWaitingList();
- // Return early so that we don't set finished_ to true below.
- return;
- } else {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: resolver returned but no LB policy; "
- "wait_for_ready=false; failing",
- chand, calld);
- }
- pick_done_locked(
- elem,
- grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
- }
- } else {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing LB pick",
- chand, calld);
- }
- process_service_config_and_start_lb_pick_locked(elem);
- }
- self->finished_ = true;
- }
-
- // Invoked when the call is cancelled.
- // Note: This runs under the client_channel combiner, but will NOT be
- // holding the call combiner.
- static void CancelLocked(void* arg, grpc_error* error) {
- ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
- // If DoneLocked() has already run, delete ourselves without doing anything.
- if (GPR_LIKELY(self->finished_)) {
- Delete(self);
- return;
- }
- // If we are being cancelled, immediately invoke pick_done_locked()
- // to propagate the error back to the caller.
- if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
- grpc_call_element* elem = self->elem_;
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: cancelling call waiting for name "
- "resolution",
- chand, calld);
- }
- // Note: Although we are not in the call combiner here, we are
- // basically stealing the call combiner from the pending pick, so
- // it's safe to call pick_done_locked() here -- we are essentially
- // calling it here instead of calling it in DoneLocked().
- pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Pick cancelled", &error, 1));
- }
- self->finished_ = true;
- }
-
- grpc_call_element* elem_;
- grpc_closure done_closure_;
- grpc_closure cancel_closure_;
- bool finished_ = false;
-};
-
-} // namespace grpc_core
-
static void start_pick_locked(void* arg, grpc_error* ignored) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
call_data* calld = static_cast<call_data*>(elem->call_data);
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- GPR_ASSERT(calld->pick.connected_subchannel == nullptr);
+ GPR_ASSERT(!calld->have_request);
GPR_ASSERT(calld->subchannel_call == nullptr);
- if (GPR_LIKELY(chand->lb_policy != nullptr)) {
- // We already have resolver results, so process the service config
- // and start an LB pick.
- process_service_config_and_start_lb_pick_locked(elem);
- } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
- pick_done_locked(elem,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
- } else {
- // We do not yet have an LB policy, so wait for a resolver result.
- if (GPR_UNLIKELY(!chand->started_resolving)) {
- start_resolving_locked(chand);
- } else {
- // Normally, we want to do this check in
- // process_service_config_and_start_lb_pick_locked(), so that we
- // can honor the wait_for_ready setting in the service config.
- // However, if the channel is in TRANSIENT_FAILURE at this point, that
- // means that the resolver has returned a failure, so we're not going
- // to get a service config right away. In that case, we fail the
- // call now based on the wait_for_ready value passed in from the
- // application.
- if (fail_call_if_in_transient_failure(elem)) return;
- }
- // Create a new waiter, which will delete itself when done.
- grpc_core::New<grpc_core::ResolverResultWaiter>(elem);
- // Add the polling entity from call_data to the channel_data's
- // interested_parties, so that the I/O of the resolver can be done
- // under it. It will be removed in pick_done_locked().
- maybe_add_call_to_channel_interested_parties_locked(elem);
+ // Normally, we want to defer this check until after we've processed the
+ // service config, so that we can honor the wait_for_ready setting in
+ // the service config. However, if the channel is in TRANSIENT_FAILURE
+ // and we don't have an LB policy at this point, that means that the
+ // resolver has returned a failure, so we're not going to get a service
+ // config right away. In that case, we fail the call now based on the
+ // wait_for_ready value passed in from the application.
+ if (chand->request_router->lb_policy() == nullptr &&
+ fail_call_if_in_transient_failure(elem)) {
+ return;
}
+ // If this is a retry, use the send_initial_metadata payload that
+ // we've cached; otherwise, use the pending batch. The
+ // send_initial_metadata batch will be the first pending batch in the
+ // list, as set by get_batch_index() above.
+ // TODO(roth): What if the LB policy needs to add something to the
+ // call's initial metadata, and then there's a retry? We don't want
+ // the new metadata to be added twice. We might need to somehow
+ // allocate the subchannel batch earlier so that we can give the
+ // subchannel's copy of the metadata batch (which is copied for each
+ // attempt) to the LB policy instead of the one from the parent channel.
+ grpc_metadata_batch* initial_metadata =
+ calld->seen_send_initial_metadata
+ ? &calld->send_initial_metadata
+ : calld->pending_batches[0]
+ .batch->payload->send_initial_metadata.send_initial_metadata;
+ uint32_t* initial_metadata_flags =
+ calld->seen_send_initial_metadata
+ ? &calld->send_initial_metadata_flags
+ : &calld->pending_batches[0]
+ .batch->payload->send_initial_metadata
+ .send_initial_metadata_flags;
+ GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
+ grpc_schedule_on_exec_ctx);
+ calld->request.Init(calld->owning_call, calld->call_combiner, calld->pollent,
+ initial_metadata, initial_metadata_flags,
+ maybe_apply_service_config_to_call_locked, elem,
+ &calld->pick_closure);
+ calld->have_request = true;
+ chand->request_router->RouteCallLocked(calld->request.get());
}
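start_pick_locked() now delegates the whole pick to the router: it constructs the Request in place and passes maybe_apply_service_config_to_call_locked() as a pre-pick hook whose false return aborts routing. A toy model of that hook ordering, with all types hypothetical:

    // Toy Request/router pair showing the ordering set up above: the hook
    // runs once resolver results exist, and returning false aborts the pick
    // (the fail-fast path).
    using ApplyServiceConfigFn = bool (*)(void* arg);

    struct Request {
      ApplyServiceConfigFn apply_service_config;
      void* arg;
    };

    void RouteCallLocked(Request* request) {
      // In the real code this runs under the channel combiner, after the
      // resolver has produced a result.
      if (!request->apply_service_config(request->arg)) {
        return;  // hook failed the call; skip the LB pick entirely
      }
      // ...start the LB pick; schedule the pick closure when it completes...
    }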
//
@@ -3249,23 +2548,10 @@ const grpc_channel_filter grpc_client_channel_filter = {
"client-channel",
};
-static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
- channel_data* chand = static_cast<channel_data*>(arg);
- if (chand->lb_policy != nullptr) {
- chand->lb_policy->ExitIdleLocked();
- } else {
- chand->exit_idle_when_lb_policy_arrives = true;
- if (!chand->started_resolving && chand->resolver != nullptr) {
- start_resolving_locked(chand);
- }
- }
- GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
-}
-
void grpc_client_channel_set_channelz_node(
grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- chand->channelz_channel = node;
+ chand->request_router->set_channelz_node(node);
}
void grpc_client_channel_populate_child_refs(
@@ -3273,17 +2559,22 @@ void grpc_client_channel_populate_child_refs(
grpc_core::channelz::ChildRefsList* child_subchannels,
grpc_core::channelz::ChildRefsList* child_channels) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- if (chand->lb_policy != nullptr) {
- chand->lb_policy->FillChildRefsForChannelz(child_subchannels,
- child_channels);
+ if (chand->request_router->lb_policy() != nullptr) {
+ chand->request_router->lb_policy()->FillChildRefsForChannelz(
+ child_subchannels, child_channels);
}
}
+static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
+ channel_data* chand = static_cast<channel_data*>(arg);
+ chand->request_router->ExitIdleLocked();
+ GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
+}
+
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_channel_element* elem, int try_to_connect) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- grpc_connectivity_state out =
- grpc_connectivity_state_check(&chand->state_tracker);
+ grpc_connectivity_state out = chand->request_router->GetConnectivityState();
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
GRPC_CLOSURE_SCHED(
@@ -3328,19 +2619,19 @@ static void external_connectivity_watcher_list_append(
}
static void external_connectivity_watcher_list_remove(
- channel_data* chand, external_connectivity_watcher* too_remove) {
+ channel_data* chand, external_connectivity_watcher* to_remove) {
GPR_ASSERT(
- lookup_external_connectivity_watcher(chand, too_remove->on_complete));
+ lookup_external_connectivity_watcher(chand, to_remove->on_complete));
gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
- if (too_remove == chand->external_connectivity_watcher_list_head) {
- chand->external_connectivity_watcher_list_head = too_remove->next;
+ if (to_remove == chand->external_connectivity_watcher_list_head) {
+ chand->external_connectivity_watcher_list_head = to_remove->next;
gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
return;
}
external_connectivity_watcher* w =
chand->external_connectivity_watcher_list_head;
while (w != nullptr) {
- if (w->next == too_remove) {
+ if (w->next == to_remove) {
w->next = w->next->next;
gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
return;
@@ -3392,15 +2683,15 @@ static void watch_connectivity_state_locked(void* arg,
GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
grpc_combiner_scheduler(w->chand->combiner));
- grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker,
- w->state, &w->my_closure);
+ w->chand->request_router->NotifyOnConnectivityStateChange(w->state,
+ &w->my_closure);
} else {
GPR_ASSERT(w->watcher_timer_init == nullptr);
found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
if (found) {
GPR_ASSERT(found->on_complete == w->on_complete);
- grpc_connectivity_state_notify_on_state_change(
- &found->chand->state_tracker, nullptr, &found->my_closure);
+ found->chand->request_router->NotifyOnConnectivityStateChange(
+ nullptr, &found->my_closure);
}
grpc_polling_entity_del_from_pollset_set(&w->pollent,
w->chand->interested_parties);
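
Note on the start-pick hunk above: the call selects between the cached send_initial_metadata (kept for retries) and the first pending batch before handing the call to the router. A minimal standalone model of that selection, using hypothetical stand-in types rather than gRPC's real call data:

    // Standalone model: which metadata batch is handed to the router.
    #include <iostream>

    struct MetadataBatch { const char* tag; };

    struct CallData {
      bool seen_send_initial_metadata = false;  // set once a retry cached it
      MetadataBatch cached{"cached-for-retries"};
      MetadataBatch pending{"first-pending-batch"};
    };

    // Mirrors the ternary in the hunk above: prefer the cached copy on retries.
    MetadataBatch* RoutingMetadata(CallData* calld) {
      return calld->seen_send_initial_metadata ? &calld->cached
                                               : &calld->pending;
    }

    int main() {
      CallData calld;
      std::cout << RoutingMetadata(&calld)->tag << "\n";  // first-pending-batch
      calld.seen_send_initial_metadata = true;
      std::cout << RoutingMetadata(&calld)->tag << "\n";  // cached-for-retries
    }
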
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 6b76fe5d5d..293d8e960c 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -65,10 +65,10 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
struct PickState {
/// Initial metadata associated with the picking call.
grpc_metadata_batch* initial_metadata = nullptr;
- /// Bitmask used for selective cancelling. See
+ /// Pointer to bitmask used for selective cancelling. See
/// \a CancelMatchingPicksLocked() and \a GRPC_INITIAL_METADATA_* in
/// grpc_types.h.
- uint32_t initial_metadata_flags = 0;
+ uint32_t* initial_metadata_flags = nullptr;
/// Storage for LB token in \a initial_metadata, or nullptr if not used.
grpc_linked_mdelem lb_token_mdelem_storage;
/// Closure to run when pick is complete, if not completed synchronously.
@@ -88,6 +88,9 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
LoadBalancingPolicy(const LoadBalancingPolicy&) = delete;
LoadBalancingPolicy& operator=(const LoadBalancingPolicy&) = delete;
+ /// Returns the name of the LB policy.
+ virtual const char* name() const GRPC_ABSTRACT;
+
/// Updates the policy with a new set of \a args and a new \a lb_config from
/// the resolver. Note that the LB policy gets the set of addresses from the
/// GRPC_ARG_SERVER_ADDRESS_LIST channel arg.
@@ -205,12 +208,6 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
grpc_pollset_set* interested_parties_;
/// Callback to force a re-resolution.
grpc_closure* request_reresolution_;
-
- // Dummy classes needed for alignment issues.
- // See https://github.com/grpc/grpc/issues/16032 for context.
- // TODO(ncteisen): remove this as soon as the issue is resolved.
- channelz::ChildRefsList dummy_list_foo;
- channelz::ChildRefsList dummy_list_bar;
};
} // namespace grpc_core
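
PickState::initial_metadata_flags is now a pointer so that pick matching reads the call's current flags rather than a snapshot taken when the pick was queued; the CancelMatchingPicksLocked() call sites in the policy diffs below dereference it. A standalone sketch of why that matters, with an illustrative flag constant standing in for GRPC_INITIAL_METADATA_WAIT_FOR_READY:

    #include <cstdint>
    #include <iostream>

    constexpr uint32_t kWaitForReady = 0x20;  // illustrative bit only

    struct PickState {
      uint32_t* initial_metadata_flags;  // points at the call's live flags
    };

    // Same shape as the (*flags & mask) == check tests in the diffs below.
    bool Matches(const PickState& pick, uint32_t mask, uint32_t check) {
      return (*pick.initial_metadata_flags & mask) == check;
    }

    int main() {
      uint32_t call_flags = kWaitForReady;
      PickState pick{&call_flags};
      // "Cancel picks whose wait_for_ready bit is unset," at two moments:
      std::cout << Matches(pick, kWaitForReady, 0) << "\n";  // 0: bit set
      call_flags &= ~kWaitForReady;  // flags change after the pick is queued
      std::cout << Matches(pick, kWaitForReady, 0) << "\n";  // 1: observed
    }
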
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index a9a5965ed1..ba40febd53 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -122,10 +122,14 @@ TraceFlag grpc_lb_glb_trace(false, "glb");
namespace {
+constexpr char kGrpclb[] = "grpclb";
+
class GrpcLb : public LoadBalancingPolicy {
public:
explicit GrpcLb(const Args& args);
+ const char* name() const override { return kGrpclb; }
+
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override;
bool PickLocked(PickState* pick, grpc_error** error) override;
@@ -361,7 +365,9 @@ void lb_token_destroy(void* token) {
}
}
int lb_token_cmp(void* token1, void* token2) {
- return GPR_ICMP(token1, token2);
+ // Always indicate a match, since we don't want this channel arg to
+ // affect the subchannel's key in the index.
+ return 0;
}
const grpc_arg_pointer_vtable lb_token_arg_vtable = {
lb_token_copy, lb_token_destroy, lb_token_cmp};
@@ -422,7 +428,7 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
grpc_resolved_address addr;
ParseServer(server, &addr);
// LB token processing.
- void* lb_token;
+ grpc_mdelem lb_token;
if (server->has_load_balance_token) {
const size_t lb_token_max_length =
GPR_ARRAY_SIZE(server->load_balance_token);
@@ -430,9 +436,7 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
strnlen(server->load_balance_token, lb_token_max_length);
grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
server->load_balance_token, lb_token_length);
- lb_token =
- (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr)
- .payload;
+ lb_token = grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr);
} else {
char* uri = grpc_sockaddr_to_uri(&addr);
gpr_log(GPR_INFO,
@@ -440,14 +444,16 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
"be used instead",
uri);
gpr_free(uri);
- lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
+ lb_token = GRPC_MDELEM_LB_TOKEN_EMPTY;
}
// Add address.
grpc_arg arg = grpc_channel_arg_pointer_create(
- const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token,
- &lb_token_arg_vtable);
+ const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN),
+ (void*)lb_token.payload, &lb_token_arg_vtable);
grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
addresses.emplace_back(addr, args);
+ // Clean up.
+ GRPC_MDELEM_UNREF(lb_token);
}
return addresses;
}
@@ -525,8 +531,7 @@ void GrpcLb::BalancerCallState::Orphan() {
void GrpcLb::BalancerCallState::StartQuery() {
GPR_ASSERT(lb_call_ != nullptr);
if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[grpclb %p] Starting LB call (lb_calld: %p, lb_call: %p)",
+ gpr_log(GPR_INFO, "[grpclb %p] lb_calld=%p: Starting LB call %p",
grpclb_policy_.get(), this, lb_call_);
}
// Create the ops.
@@ -670,8 +675,9 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
grpc_call_error call_error = grpc_call_start_batch_and_execute(
lb_call_, &op, 1, &client_load_report_closure_);
if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
- gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", grpclb_policy_.get(),
- call_error);
+ gpr_log(GPR_ERROR,
+ "[grpclb %p] lb_calld=%p call_error=%d sending client load report",
+ grpclb_policy_.get(), this, call_error);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
}
@@ -732,15 +738,17 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
&initial_response->client_stats_report_interval));
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
- "[grpclb %p] Received initial LB response message; "
- "client load reporting interval = %" PRId64 " milliseconds",
- grpclb_policy, lb_calld->client_stats_report_interval_);
+ "[grpclb %p] lb_calld=%p: Received initial LB response "
+ "message; client load reporting interval = %" PRId64
+ " milliseconds",
+ grpclb_policy, lb_calld,
+ lb_calld->client_stats_report_interval_);
}
} else if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
- "[grpclb %p] Received initial LB response message; client load "
- "reporting NOT enabled",
- grpclb_policy);
+ "[grpclb %p] lb_calld=%p: Received initial LB response message; "
+ "client load reporting NOT enabled",
+ grpclb_policy, lb_calld);
}
grpc_grpclb_initial_response_destroy(initial_response);
lb_calld->seen_initial_response_ = true;
@@ -750,15 +758,17 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
GPR_ASSERT(lb_calld->lb_call_ != nullptr);
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
- "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
- grpclb_policy, serverlist->num_servers);
+ "[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR
+ " servers received",
+ grpclb_policy, lb_calld, serverlist->num_servers);
for (size_t i = 0; i < serverlist->num_servers; ++i) {
grpc_resolved_address addr;
ParseServer(serverlist->servers[i], &addr);
char* ipport;
grpc_sockaddr_to_string(&ipport, &addr, false);
- gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
- grpclb_policy, i, ipport);
+ gpr_log(GPR_INFO,
+ "[grpclb %p] lb_calld=%p: Serverlist[%" PRIuPTR "]: %s",
+ grpclb_policy, lb_calld, i, ipport);
gpr_free(ipport);
}
}
@@ -778,9 +788,9 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_, serverlist)) {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
- "[grpclb %p] Incoming server list identical to current, "
- "ignoring.",
- grpclb_policy);
+ "[grpclb %p] lb_calld=%p: Incoming server list identical to "
+ "current, ignoring.",
+ grpclb_policy, lb_calld);
}
grpc_grpclb_destroy_serverlist(serverlist);
} else { // New serverlist.
@@ -806,8 +816,9 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
char* response_slice_str =
grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
gpr_log(GPR_ERROR,
- "[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
- grpclb_policy, response_slice_str);
+ "[grpclb %p] lb_calld=%p: Invalid LB response received: '%s'. "
+ "Ignoring.",
+ grpclb_policy, lb_calld, response_slice_str);
gpr_free(response_slice_str);
}
grpc_slice_unref_internal(response_slice);
@@ -838,9 +849,9 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
char* status_details =
grpc_slice_to_c_string(lb_calld->lb_call_status_details_);
gpr_log(GPR_INFO,
- "[grpclb %p] Status from LB server received. Status = %d, details "
- "= '%s', (lb_calld: %p, lb_call: %p), error '%s'",
- grpclb_policy, lb_calld->lb_call_status_, status_details, lb_calld,
+ "[grpclb %p] lb_calld=%p: Status from LB server received. "
+ "Status = %d, details = '%s', (lb_call: %p), error '%s'",
+ grpclb_policy, lb_calld, lb_calld->lb_call_status_, status_details,
lb_calld->lb_call_, grpc_error_string(error));
gpr_free(status_details);
}
@@ -1129,7 +1140,7 @@ void GrpcLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
pending_picks_ = nullptr;
while (pp != nullptr) {
PendingPick* next = pp->next;
- if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
+ if ((*pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
// Note: pp is deleted in this callback.
GRPC_CLOSURE_SCHED(&pp->on_complete,
@@ -1592,6 +1603,10 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
this);
return;
}
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this,
+ rr_policy_.get());
+ }
// TODO(roth): We currently track this ref manually. Once the new
// ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested");
@@ -1685,10 +1700,6 @@ void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() {
lb_policy_args.client_channel_factory = client_channel_factory();
lb_policy_args.args = args;
CreateRoundRobinPolicyLocked(lb_policy_args);
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this,
- rr_policy_.get());
- }
}
grpc_channel_args_destroy(args);
}
@@ -1812,7 +1823,7 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(args));
}
- const char* name() const override { return "grpclb"; }
+ const char* name() const override { return kGrpclb; }
};
} // namespace
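
The lb_token_cmp() change above makes all LB tokens compare equal so the per-address token channel arg does not affect the subchannel key; otherwise two serverlists differing only in tokens would produce distinct subchannels. A standalone sketch of the idea, with simplified stand-ins for grpc_arg and its comparison vtable:

    #include <cstring>
    #include <iostream>

    typedef int (*ArgCmpFn)(void*, void*);

    struct Arg {
      const char* name;
      void* value;
      ArgCmpFn cmp;
    };

    int TokenCmp(void*, void*) { return 0; }  // always "equal", as in the diff

    // Simplified stand-in for channel-arg comparison inside the subchannel key.
    int ArgCmp(const Arg& a, const Arg& b) {
      int c = strcmp(a.name, b.name);
      if (c != 0) return c;
      return a.cmp(a.value, b.value);
    }

    int main() {
      int token1 = 1, token2 = 2;
      Arg a{"grpc.grpclb_address_lb_token", &token1, TokenCmp};
      Arg b{"grpc.grpclb_address_lb_token", &token2, TokenCmp};
      std::cout << (ArgCmp(a, b) == 0 ? "same subchannel key" : "different keys")
                << "\n";
    }
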
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 74c17612a2..d6ff74ec7f 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -43,10 +43,14 @@ namespace {
// pick_first LB policy
//
+constexpr char kPickFirst[] = "pick_first";
+
class PickFirst : public LoadBalancingPolicy {
public:
explicit PickFirst(const Args& args);
+ const char* name() const override { return kPickFirst; }
+
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override;
bool PickLocked(PickState* pick, grpc_error** error) override;
@@ -234,7 +238,7 @@ void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
pending_picks_ = nullptr;
while (pick != nullptr) {
PickState* next = pick->next;
- if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
+ if ((*pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
@@ -622,7 +626,7 @@ class PickFirstFactory : public LoadBalancingPolicyFactory {
return OrphanablePtr<LoadBalancingPolicy>(New<PickFirst>(args));
}
- const char* name() const override { return "pick_first"; }
+ const char* name() const override { return kPickFirst; }
};
} // namespace
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 63089afbd7..3bcb33ef11 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -53,10 +53,14 @@ namespace {
// round_robin LB policy
//
+constexpr char kRoundRobin[] = "round_robin";
+
class RoundRobin : public LoadBalancingPolicy {
public:
explicit RoundRobin(const Args& args);
+ const char* name() const override { return kRoundRobin; }
+
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override;
bool PickLocked(PickState* pick, grpc_error** error) override;
@@ -291,7 +295,7 @@ void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
pending_picks_ = nullptr;
while (pick != nullptr) {
PickState* next = pick->next;
- if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
+ if ((*pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete,
@@ -700,7 +704,7 @@ class RoundRobinFactory : public LoadBalancingPolicyFactory {
return OrphanablePtr<LoadBalancingPolicy>(New<RoundRobin>(args));
}
- const char* name() const override { return "round_robin"; }
+ const char* name() const override { return kRoundRobin; }
};
} // namespace
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
index 3c25de2386..8787f5bcc2 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
@@ -115,10 +115,14 @@ TraceFlag grpc_lb_xds_trace(false, "xds");
namespace {
+constexpr char kXds[] = "xds_experimental";
+
class XdsLb : public LoadBalancingPolicy {
public:
explicit XdsLb(const Args& args);
+ const char* name() const override { return kXds; }
+
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override;
bool PickLocked(PickState* pick, grpc_error** error) override;
@@ -1053,7 +1057,7 @@ void XdsLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
pending_picks_ = nullptr;
while (pp != nullptr) {
PendingPick* next = pp->next;
- if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
+ if ((*pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
// Note: pp is deleted in this callback.
GRPC_CLOSURE_SCHED(&pp->on_complete,
@@ -1651,7 +1655,7 @@ class XdsFactory : public LoadBalancingPolicyFactory {
return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(args));
}
- const char* name() const override { return "xds_experimental"; }
+ const char* name() const override { return kXds; }
};
} // namespace
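
With pick_first, round_robin, and xds all reporting their names through the new name() virtual (backed by the same constant their factories use), the router can compare the current policy's name against the one requested by the resolver result and update in place instead of recreating. A standalone sketch of that decision, using hypothetical minimal types:

    #include <cstring>
    #include <iostream>
    #include <memory>

    constexpr char kPickFirst[] = "pick_first";

    struct LbPolicy {
      virtual ~LbPolicy() {}
      virtual const char* name() const = 0;
      void UpdateLocked() { std::cout << "updated " << name() << "\n"; }
    };

    struct PickFirst : LbPolicy {
      const char* name() const override { return kPickFirst; }
    };

    // Same decision as OnResolverResultChangedLocked(): reuse on name match.
    void OnResolverResult(std::unique_ptr<LbPolicy>* current,
                          const char* wanted_name) {
      if (*current != nullptr &&
          strcmp((*current)->name(), wanted_name) == 0) {
        (*current)->UpdateLocked();    // keep the policy, feed it new addresses
      } else {
        current->reset(new PickFirst);  // sketch only ever creates pick_first
        std::cout << "created " << (*current)->name() << "\n";
      }
    }

    int main() {
      std::unique_ptr<LbPolicy> policy;
      OnResolverResult(&policy, kPickFirst);  // created pick_first
      OnResolverResult(&policy, kPickFirst);  // updated pick_first
    }
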
diff --git a/src/core/ext/filters/client_channel/request_routing.cc b/src/core/ext/filters/client_channel/request_routing.cc
new file mode 100644
index 0000000000..f9a7e164e7
--- /dev/null
+++ b/src/core/ext/filters/client_channel/request_routing.cc
@@ -0,0 +1,936 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/filters/client_channel/request_routing.h"
+
+#include <inttypes.h>
+#include <limits.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/sync.h>
+
+#include "src/core/ext/filters/client_channel/backup_poller.h"
+#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
+#include "src/core/ext/filters/client_channel/retry_throttle.h"
+#include "src/core/ext/filters/client_channel/server_address.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/ext/filters/deadline/deadline_filter.h"
+#include "src/core/lib/backoff/backoff.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/channel/status_util.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/iomgr.h"
+#include "src/core/lib/iomgr/polling_entity.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/error_utils.h"
+#include "src/core/lib/transport/metadata.h"
+#include "src/core/lib/transport/metadata_batch.h"
+#include "src/core/lib/transport/service_config.h"
+#include "src/core/lib/transport/static_metadata.h"
+#include "src/core/lib/transport/status_metadata.h"
+
+namespace grpc_core {
+
+//
+// RequestRouter::Request::ResolverResultWaiter
+//
+
+// Handles waiting for a resolver result.
+// Used only for the first call on an idle channel.
+class RequestRouter::Request::ResolverResultWaiter {
+ public:
+ explicit ResolverResultWaiter(Request* request)
+ : request_router_(request->request_router_),
+ request_(request),
+ tracer_enabled_(request_router_->tracer_->enabled()) {
+ if (tracer_enabled_) {
+ gpr_log(GPR_INFO,
+ "request_router=%p request=%p: deferring pick pending resolver "
+ "result",
+ request_router_, request);
+ }
+ // Add closure to be run when a resolver result is available.
+ GRPC_CLOSURE_INIT(&done_closure_, &DoneLocked, this,
+ grpc_combiner_scheduler(request_router_->combiner_));
+ AddToWaitingList();
+ // Set cancellation closure, so that we abort if the call is cancelled.
+ GRPC_CLOSURE_INIT(&cancel_closure_, &CancelLocked, this,
+ grpc_combiner_scheduler(request_router_->combiner_));
+ grpc_call_combiner_set_notify_on_cancel(request->call_combiner_,
+ &cancel_closure_);
+ }
+
+ private:
+ // Adds done_closure_ to
+ // request_router_->waiting_for_resolver_result_closures_.
+ void AddToWaitingList() {
+ grpc_closure_list_append(
+ &request_router_->waiting_for_resolver_result_closures_, &done_closure_,
+ GRPC_ERROR_NONE);
+ }
+
+ // Invoked when a resolver result is available.
+ static void DoneLocked(void* arg, grpc_error* error) {
+ ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
+ RequestRouter* request_router = self->request_router_;
+ // If CancelLocked() has already run, delete ourselves without doing
+ // anything. Note that the call stack may have already been destroyed,
+ // so it's not safe to access anything in state_.
+ if (GPR_UNLIKELY(self->finished_)) {
+ if (self->tracer_enabled_) {
+ gpr_log(GPR_INFO,
+ "request_router=%p: call cancelled before resolver result",
+ request_router);
+ }
+ Delete(self);
+ return;
+ }
+ // Otherwise, process the resolver result.
+ Request* request = self->request_;
+ if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
+ if (self->tracer_enabled_) {
+ gpr_log(GPR_INFO,
+ "request_router=%p request=%p: resolver failed to return data",
+ request_router, request);
+ }
+ GRPC_CLOSURE_RUN(request->on_route_done_, GRPC_ERROR_REF(error));
+ } else if (GPR_UNLIKELY(request_router->resolver_ == nullptr)) {
+ // Shutting down.
+ if (self->tracer_enabled_) {
+ gpr_log(GPR_INFO, "request_router=%p request=%p: resolver disconnected",
+ request_router, request);
+ }
+ GRPC_CLOSURE_RUN(request->on_route_done_,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
+ } else if (GPR_UNLIKELY(request_router->lb_policy_ == nullptr)) {
+ // Transient resolver failure.
+ // If call has wait_for_ready=true, try again; otherwise, fail.
+ if (*request->pick_.initial_metadata_flags &
+ GRPC_INITIAL_METADATA_WAIT_FOR_READY) {
+ if (self->tracer_enabled_) {
+ gpr_log(GPR_INFO,
+ "request_router=%p request=%p: resolver returned but no LB "
+ "policy; wait_for_ready=true; trying again",
+ request_router, request);
+ }
+ // Re-add ourselves to the waiting list.
+ self->AddToWaitingList();
+ // Return early so that we don't set finished_ to true below.
+ return;
+ } else {
+ if (self->tracer_enabled_) {
+ gpr_log(GPR_INFO,
+ "request_router=%p request=%p: resolver returned but no LB "
+ "policy; wait_for_ready=false; failing",
+ request_router, request);
+ }
+ GRPC_CLOSURE_RUN(
+ request->on_route_done_,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
+ }
+ } else {
+ if (self->tracer_enabled_) {
+ gpr_log(GPR_INFO,
+ "request_router=%p request=%p: resolver returned, doing LB "
+ "pick",
+ request_router, request);
+ }
+ request->ProcessServiceConfigAndStartLbPickLocked();
+ }
+ self->finished_ = true;
+ }
+
+ // Invoked when the call is cancelled.
+ // Note: This runs under the client_channel combiner, but will NOT be
+ // holding the call combiner.
+ static void CancelLocked(void* arg, grpc_error* error) {
+ ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
+ RequestRouter* request_router = self->request_router_;
+ // If DoneLocked() has already run, delete ourselves without doing anything.
+ if (self->finished_) {
+ Delete(self);
+ return;
+ }
+ Request* request = self->request_;
+ // If we are being cancelled, immediately invoke on_route_done_
+ // to propagate the error back to the caller.
+ if (error != GRPC_ERROR_NONE) {
+ if (self->tracer_enabled_) {
+ gpr_log(GPR_INFO,
+ "request_router=%p request=%p: cancelling call waiting for "
+ "name resolution",
+ request_router, request);
+ }
+ // Note: Although we are not in the call combiner here, we are
+ // basically stealing the call combiner from the pending pick, so
+ // it's safe to run on_route_done_ here -- we are essentially
+ // calling it here instead of calling it in DoneLocked().
+ GRPC_CLOSURE_RUN(request->on_route_done_,
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Pick cancelled", &error, 1));
+ }
+ self->finished_ = true;
+ }
+
+ RequestRouter* request_router_;
+ Request* request_;
+ const bool tracer_enabled_;
+ grpc_closure done_closure_;
+ grpc_closure cancel_closure_;
+ bool finished_ = false;
+};
+
+//
+// RequestRouter::Request::AsyncPickCanceller
+//
+
+// Handles the call combiner cancellation callback for an async LB pick.
+class RequestRouter::Request::AsyncPickCanceller {
+ public:
+ explicit AsyncPickCanceller(Request* request)
+ : request_router_(request->request_router_),
+ request_(request),
+ tracer_enabled_(request_router_->tracer_->enabled()) {
+ GRPC_CALL_STACK_REF(request->owning_call_, "pick_callback_cancel");
+ // Set cancellation closure, so that we abort if the call is cancelled.
+ GRPC_CLOSURE_INIT(&cancel_closure_, &CancelLocked, this,
+ grpc_combiner_scheduler(request_router_->combiner_));
+ grpc_call_combiner_set_notify_on_cancel(request->call_combiner_,
+ &cancel_closure_);
+ }
+
+ void MarkFinishedLocked() {
+ finished_ = true;
+ GRPC_CALL_STACK_UNREF(request_->owning_call_, "pick_callback_cancel");
+ }
+
+ private:
+ // Invoked when the call is cancelled.
+ // Note: This runs under the client_channel combiner, but will NOT be
+ // holding the call combiner.
+ static void CancelLocked(void* arg, grpc_error* error) {
+ AsyncPickCanceller* self = static_cast<AsyncPickCanceller*>(arg);
+ Request* request = self->request_;
+ RequestRouter* request_router = self->request_router_;
+ if (!self->finished_) {
+ // Note: request_router->lb_policy_ may have changed since we started our
+ // pick, in which case we will be cancelling the pick on a policy other
+ // than the one we started it on. However, this will just be a no-op.
+ if (error != GRPC_ERROR_NONE && request_router->lb_policy_ != nullptr) {
+ if (self->tracer_enabled_) {
+ gpr_log(GPR_INFO,
+ "request_router=%p request=%p: cancelling pick from LB "
+ "policy %p",
+ request_router, request, request_router->lb_policy_.get());
+ }
+ request_router->lb_policy_->CancelPickLocked(&request->pick_,
+ GRPC_ERROR_REF(error));
+ }
+ request->pick_canceller_ = nullptr;
+ GRPC_CALL_STACK_UNREF(request->owning_call_, "pick_callback_cancel");
+ }
+ Delete(self);
+ }
+
+ RequestRouter* request_router_;
+ Request* request_;
+ const bool tracer_enabled_;
+ grpc_closure cancel_closure_;
+ bool finished_ = false;
+};
+
+//
+// RequestRouter::Request
+//
+
+RequestRouter::Request::Request(grpc_call_stack* owning_call,
+ grpc_call_combiner* call_combiner,
+ grpc_polling_entity* pollent,
+ grpc_metadata_batch* send_initial_metadata,
+ uint32_t* send_initial_metadata_flags,
+ ApplyServiceConfigCallback apply_service_config,
+ void* apply_service_config_user_data,
+ grpc_closure* on_route_done)
+ : owning_call_(owning_call),
+ call_combiner_(call_combiner),
+ pollent_(pollent),
+ apply_service_config_(apply_service_config),
+ apply_service_config_user_data_(apply_service_config_user_data),
+ on_route_done_(on_route_done) {
+ pick_.initial_metadata = send_initial_metadata;
+ pick_.initial_metadata_flags = send_initial_metadata_flags;
+}
+
+RequestRouter::Request::~Request() {
+ if (pick_.connected_subchannel != nullptr) {
+ pick_.connected_subchannel.reset();
+ }
+ for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
+ if (pick_.subchannel_call_context[i].destroy != nullptr) {
+ pick_.subchannel_call_context[i].destroy(
+ pick_.subchannel_call_context[i].value);
+ }
+ }
+}
+
+// Invoked once resolver results are available.
+void RequestRouter::Request::ProcessServiceConfigAndStartLbPickLocked() {
+ // Get service config data if needed.
+ if (!apply_service_config_(apply_service_config_user_data_)) return;
+ // Start LB pick.
+ StartLbPickLocked();
+}
+
+void RequestRouter::Request::MaybeAddCallToInterestedPartiesLocked() {
+ if (!pollent_added_to_interested_parties_) {
+ pollent_added_to_interested_parties_ = true;
+ grpc_polling_entity_add_to_pollset_set(
+ pollent_, request_router_->interested_parties_);
+ }
+}
+
+void RequestRouter::Request::MaybeRemoveCallFromInterestedPartiesLocked() {
+ if (pollent_added_to_interested_parties_) {
+ pollent_added_to_interested_parties_ = false;
+ grpc_polling_entity_del_from_pollset_set(
+ pollent_, request_router_->interested_parties_);
+ }
+}
+
+// Starts a pick on the LB policy.
+void RequestRouter::Request::StartLbPickLocked() {
+ if (request_router_->tracer_->enabled()) {
+ gpr_log(GPR_INFO,
+ "request_router=%p request=%p: starting pick on lb_policy=%p",
+ request_router_, this, request_router_->lb_policy_.get());
+ }
+ GRPC_CLOSURE_INIT(&on_pick_done_, &LbPickDoneLocked, this,
+ grpc_combiner_scheduler(request_router_->combiner_));
+ pick_.on_complete = &on_pick_done_;
+ GRPC_CALL_STACK_REF(owning_call_, "pick_callback");
+ grpc_error* error = GRPC_ERROR_NONE;
+ const bool pick_done =
+ request_router_->lb_policy_->PickLocked(&pick_, &error);
+ if (pick_done) {
+ // Pick completed synchronously.
+ if (request_router_->tracer_->enabled()) {
+ gpr_log(GPR_INFO,
+ "request_router=%p request=%p: pick completed synchronously",
+ request_router_, this);
+ }
+ GRPC_CLOSURE_RUN(on_route_done_, error);
+ GRPC_CALL_STACK_UNREF(owning_call_, "pick_callback");
+ } else {
+ // Pick will be returned asynchronously.
+ // Add the request's polling entity to the request_router's
+ // interested_parties, so that the I/O of the LB policy can be done
+ // under it. It will be removed in LbPickDoneLocked().
+ MaybeAddCallToInterestedPartiesLocked();
+ // Request notification on call cancellation.
+ // We allocate a separate object to track cancellation, since the
+ // cancellation closure might still be pending when we need to reuse
+ // the memory in which this Request object is stored for a subsequent
+ // retry attempt.
+ pick_canceller_ = New<AsyncPickCanceller>(this);
+ }
+}
+
+// Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
+// Unrefs the call stack and invokes on_route_done_.
+void RequestRouter::Request::LbPickDoneLocked(void* arg, grpc_error* error) {
+ Request* self = static_cast<Request*>(arg);
+ RequestRouter* request_router = self->request_router_;
+ if (request_router->tracer_->enabled()) {
+ gpr_log(GPR_INFO,
+ "request_router=%p request=%p: pick completed asynchronously",
+ request_router, self);
+ }
+ self->MaybeRemoveCallFromInterestedPartiesLocked();
+ if (self->pick_canceller_ != nullptr) {
+ self->pick_canceller_->MarkFinishedLocked();
+ }
+ GRPC_CLOSURE_RUN(self->on_route_done_, GRPC_ERROR_REF(error));
+ GRPC_CALL_STACK_UNREF(self->owning_call_, "pick_callback");
+}
+
+//
+// RequestRouter::LbConnectivityWatcher
+//
+
+class RequestRouter::LbConnectivityWatcher {
+ public:
+ LbConnectivityWatcher(RequestRouter* request_router,
+ grpc_connectivity_state state,
+ LoadBalancingPolicy* lb_policy,
+ grpc_channel_stack* owning_stack,
+ grpc_combiner* combiner)
+ : request_router_(request_router),
+ state_(state),
+ lb_policy_(lb_policy),
+ owning_stack_(owning_stack) {
+ GRPC_CHANNEL_STACK_REF(owning_stack_, "LbConnectivityWatcher");
+ GRPC_CLOSURE_INIT(&on_changed_, &OnLbPolicyStateChangedLocked, this,
+ grpc_combiner_scheduler(combiner));
+ lb_policy_->NotifyOnStateChangeLocked(&state_, &on_changed_);
+ }
+
+ ~LbConnectivityWatcher() {
+ GRPC_CHANNEL_STACK_UNREF(owning_stack_, "LbConnectivityWatcher");
+ }
+
+ private:
+ static void OnLbPolicyStateChangedLocked(void* arg, grpc_error* error) {
+ LbConnectivityWatcher* self = static_cast<LbConnectivityWatcher*>(arg);
+ // If the notification is not for the current policy, we're stale,
+ // so delete ourselves.
+ if (self->lb_policy_ != self->request_router_->lb_policy_.get()) {
+ Delete(self);
+ return;
+ }
+ // Otherwise, process notification.
+ if (self->request_router_->tracer_->enabled()) {
+ gpr_log(GPR_INFO, "request_router=%p: lb_policy=%p state changed to %s",
+ self->request_router_, self->lb_policy_,
+ grpc_connectivity_state_name(self->state_));
+ }
+ self->request_router_->SetConnectivityStateLocked(
+ self->state_, GRPC_ERROR_REF(error), "lb_changed");
+ // If shutting down, terminate watch.
+ if (self->state_ == GRPC_CHANNEL_SHUTDOWN) {
+ Delete(self);
+ return;
+ }
+ // Renew watch.
+ self->lb_policy_->NotifyOnStateChangeLocked(&self->state_,
+ &self->on_changed_);
+ }
+
+ RequestRouter* request_router_;
+ grpc_connectivity_state state_;
+ // LB policy address. No ref held, so not safe to dereference unless
+ // it happens to match request_router->lb_policy_.
+ LoadBalancingPolicy* lb_policy_;
+ grpc_channel_stack* owning_stack_;
+ grpc_closure on_changed_;
+};
+
+//
+// RequestRouter::ReresolutionRequestHandler
+//
+
+class RequestRouter::ReresolutionRequestHandler {
+ public:
+ ReresolutionRequestHandler(RequestRouter* request_router,
+ LoadBalancingPolicy* lb_policy,
+ grpc_channel_stack* owning_stack,
+ grpc_combiner* combiner)
+ : request_router_(request_router),
+ lb_policy_(lb_policy),
+ owning_stack_(owning_stack) {
+ GRPC_CHANNEL_STACK_REF(owning_stack_, "ReresolutionRequestHandler");
+ GRPC_CLOSURE_INIT(&closure_, &OnRequestReresolutionLocked, this,
+ grpc_combiner_scheduler(combiner));
+ lb_policy_->SetReresolutionClosureLocked(&closure_);
+ }
+
+ private:
+ static void OnRequestReresolutionLocked(void* arg, grpc_error* error) {
+ ReresolutionRequestHandler* self =
+ static_cast<ReresolutionRequestHandler*>(arg);
+ RequestRouter* request_router = self->request_router_;
+ // If this invocation is for a stale LB policy, treat it as an LB shutdown
+ // signal.
+ if (self->lb_policy_ != request_router->lb_policy_.get() ||
+ error != GRPC_ERROR_NONE || request_router->resolver_ == nullptr) {
+ GRPC_CHANNEL_STACK_UNREF(request_router->owning_stack_,
+ "ReresolutionRequestHandler");
+ Delete(self);
+ return;
+ }
+ if (request_router->tracer_->enabled()) {
+      gpr_log(GPR_INFO, "request_router=%p: requesting name re-resolution",
+ request_router);
+ }
+ request_router->resolver_->RequestReresolutionLocked();
+ // Give back the closure to the LB policy.
+ self->lb_policy_->SetReresolutionClosureLocked(&self->closure_);
+ }
+
+ RequestRouter* request_router_;
+ // LB policy address. No ref held, so not safe to dereference unless
+ // it happens to match request_router->lb_policy_.
+ LoadBalancingPolicy* lb_policy_;
+ grpc_channel_stack* owning_stack_;
+ grpc_closure closure_;
+};
+
+//
+// RequestRouter
+//
+
+RequestRouter::RequestRouter(
+ grpc_channel_stack* owning_stack, grpc_combiner* combiner,
+ grpc_client_channel_factory* client_channel_factory,
+ grpc_pollset_set* interested_parties, TraceFlag* tracer,
+ ProcessResolverResultCallback process_resolver_result,
+ void* process_resolver_result_user_data, const char* target_uri,
+ const grpc_channel_args* args, grpc_error** error)
+ : owning_stack_(owning_stack),
+ combiner_(combiner),
+ client_channel_factory_(client_channel_factory),
+ interested_parties_(interested_parties),
+ tracer_(tracer),
+ process_resolver_result_(process_resolver_result),
+ process_resolver_result_user_data_(process_resolver_result_user_data) {
+ GRPC_CLOSURE_INIT(&on_resolver_result_changed_,
+ &RequestRouter::OnResolverResultChangedLocked, this,
+ grpc_combiner_scheduler(combiner));
+ grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
+ "request_router");
+ grpc_channel_args* new_args = nullptr;
+ if (process_resolver_result == nullptr) {
+ grpc_arg arg = grpc_channel_arg_integer_create(
+ const_cast<char*>(GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION), 0);
+ new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
+ }
+ resolver_ = ResolverRegistry::CreateResolver(
+ target_uri, (new_args == nullptr ? args : new_args), interested_parties_,
+ combiner_);
+ grpc_channel_args_destroy(new_args);
+ if (resolver_ == nullptr) {
+ *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
+ }
+}
+
+RequestRouter::~RequestRouter() {
+ if (resolver_ != nullptr) {
+ // The only way we can get here is if we never started resolving,
+ // because we take a ref to the channel stack when we start
+ // resolving and do not release it until the resolver callback is
+ // invoked after the resolver shuts down.
+ resolver_.reset();
+ }
+ if (lb_policy_ != nullptr) {
+ grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
+ interested_parties_);
+ lb_policy_.reset();
+ }
+ if (client_channel_factory_ != nullptr) {
+ grpc_client_channel_factory_unref(client_channel_factory_);
+ }
+ grpc_connectivity_state_destroy(&state_tracker_);
+}
+
+namespace {
+
+const char* GetChannelConnectivityStateChangeString(
+ grpc_connectivity_state state) {
+ switch (state) {
+ case GRPC_CHANNEL_IDLE:
+ return "Channel state change to IDLE";
+ case GRPC_CHANNEL_CONNECTING:
+ return "Channel state change to CONNECTING";
+ case GRPC_CHANNEL_READY:
+ return "Channel state change to READY";
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ return "Channel state change to TRANSIENT_FAILURE";
+ case GRPC_CHANNEL_SHUTDOWN:
+ return "Channel state change to SHUTDOWN";
+ }
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
+
+} // namespace
+
+void RequestRouter::SetConnectivityStateLocked(grpc_connectivity_state state,
+ grpc_error* error,
+ const char* reason) {
+ if (lb_policy_ != nullptr) {
+ if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ // Cancel picks with wait_for_ready=false.
+ lb_policy_->CancelMatchingPicksLocked(
+ /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
+ /* check= */ 0, GRPC_ERROR_REF(error));
+ } else if (state == GRPC_CHANNEL_SHUTDOWN) {
+ // Cancel all picks.
+ lb_policy_->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0,
+ GRPC_ERROR_REF(error));
+ }
+ }
+ if (tracer_->enabled()) {
+ gpr_log(GPR_INFO, "request_router=%p: setting connectivity state to %s",
+ this, grpc_connectivity_state_name(state));
+ }
+ if (channelz_node_ != nullptr) {
+ channelz_node_->AddTraceEvent(
+ channelz::ChannelTrace::Severity::Info,
+ grpc_slice_from_static_string(
+ GetChannelConnectivityStateChangeString(state)));
+ }
+ grpc_connectivity_state_set(&state_tracker_, state, error, reason);
+}
+
+void RequestRouter::StartResolvingLocked() {
+ if (tracer_->enabled()) {
+ gpr_log(GPR_INFO, "request_router=%p: starting name resolution", this);
+ }
+ GPR_ASSERT(!started_resolving_);
+ started_resolving_ = true;
+ GRPC_CHANNEL_STACK_REF(owning_stack_, "resolver");
+ resolver_->NextLocked(&resolver_result_, &on_resolver_result_changed_);
+}
+
+// Invoked from the resolver NextLocked() callback when the resolver
+// is shutting down.
+void RequestRouter::OnResolverShutdownLocked(grpc_error* error) {
+ if (tracer_->enabled()) {
+ gpr_log(GPR_INFO, "request_router=%p: shutting down", this);
+ }
+ if (lb_policy_ != nullptr) {
+ if (tracer_->enabled()) {
+ gpr_log(GPR_INFO, "request_router=%p: shutting down lb_policy=%p", this,
+ lb_policy_.get());
+ }
+ grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
+ interested_parties_);
+ lb_policy_.reset();
+ }
+ if (resolver_ != nullptr) {
+ // This should never happen; it can only be triggered by a resolver
+    // implementation spontaneously deciding to report shutdown without
+ // being orphaned. This code is included just to be defensive.
+ if (tracer_->enabled()) {
+ gpr_log(GPR_INFO,
+ "request_router=%p: spontaneous shutdown from resolver %p", this,
+ resolver_.get());
+ }
+ resolver_.reset();
+ SetConnectivityStateLocked(GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Resolver spontaneous shutdown", &error, 1),
+ "resolver_spontaneous_shutdown");
+ }
+ grpc_closure_list_fail_all(&waiting_for_resolver_result_closures_,
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Channel disconnected", &error, 1));
+ GRPC_CLOSURE_LIST_SCHED(&waiting_for_resolver_result_closures_);
+ GRPC_CHANNEL_STACK_UNREF(owning_stack_, "resolver");
+ grpc_channel_args_destroy(resolver_result_);
+ resolver_result_ = nullptr;
+ GRPC_ERROR_UNREF(error);
+}
+
+// Creates a new LB policy, replacing any previous one.
+// If the new policy is created successfully, sets *connectivity_state and
+// *connectivity_error to its initial connectivity state; otherwise,
+// leaves them unchanged.
+void RequestRouter::CreateNewLbPolicyLocked(
+ const char* lb_policy_name, grpc_json* lb_config,
+ grpc_connectivity_state* connectivity_state,
+ grpc_error** connectivity_error, TraceStringVector* trace_strings) {
+ LoadBalancingPolicy::Args lb_policy_args;
+ lb_policy_args.combiner = combiner_;
+ lb_policy_args.client_channel_factory = client_channel_factory_;
+ lb_policy_args.args = resolver_result_;
+ lb_policy_args.lb_config = lb_config;
+ OrphanablePtr<LoadBalancingPolicy> new_lb_policy =
+ LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(lb_policy_name,
+ lb_policy_args);
+ if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
+ gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
+ if (channelz_node_ != nullptr) {
+ char* str;
+ gpr_asprintf(&str, "Could not create LB policy \'%s\'", lb_policy_name);
+ trace_strings->push_back(str);
+ }
+ } else {
+ if (tracer_->enabled()) {
+ gpr_log(GPR_INFO, "request_router=%p: created new LB policy \"%s\" (%p)",
+ this, lb_policy_name, new_lb_policy.get());
+ }
+ if (channelz_node_ != nullptr) {
+ char* str;
+ gpr_asprintf(&str, "Created new LB policy \'%s\'", lb_policy_name);
+ trace_strings->push_back(str);
+ }
+ // Swap out the LB policy and update the fds in interested_parties_.
+ if (lb_policy_ != nullptr) {
+ if (tracer_->enabled()) {
+ gpr_log(GPR_INFO, "request_router=%p: shutting down lb_policy=%p", this,
+ lb_policy_.get());
+ }
+ grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
+ interested_parties_);
+ lb_policy_->HandOffPendingPicksLocked(new_lb_policy.get());
+ }
+ lb_policy_ = std::move(new_lb_policy);
+ grpc_pollset_set_add_pollset_set(lb_policy_->interested_parties(),
+ interested_parties_);
+ // Create re-resolution request handler for the new LB policy. It
+ // will delete itself when no longer needed.
+ New<ReresolutionRequestHandler>(this, lb_policy_.get(), owning_stack_,
+ combiner_);
+ // Get the new LB policy's initial connectivity state and start a
+ // connectivity watch.
+ GRPC_ERROR_UNREF(*connectivity_error);
+ *connectivity_state =
+ lb_policy_->CheckConnectivityLocked(connectivity_error);
+ if (exit_idle_when_lb_policy_arrives_) {
+ lb_policy_->ExitIdleLocked();
+ exit_idle_when_lb_policy_arrives_ = false;
+ }
+ // Create new watcher. It will delete itself when done.
+ New<LbConnectivityWatcher>(this, *connectivity_state, lb_policy_.get(),
+ owning_stack_, combiner_);
+ }
+}
+
+void RequestRouter::MaybeAddTraceMessagesForAddressChangesLocked(
+ TraceStringVector* trace_strings) {
+ const ServerAddressList* addresses =
+ FindServerAddressListChannelArg(resolver_result_);
+ const bool resolution_contains_addresses =
+ addresses != nullptr && addresses->size() > 0;
+ if (!resolution_contains_addresses &&
+ previous_resolution_contained_addresses_) {
+ trace_strings->push_back(gpr_strdup("Address list became empty"));
+ } else if (resolution_contains_addresses &&
+ !previous_resolution_contained_addresses_) {
+ trace_strings->push_back(gpr_strdup("Address list became non-empty"));
+ }
+ previous_resolution_contained_addresses_ = resolution_contains_addresses;
+}
+
+void RequestRouter::ConcatenateAndAddChannelTraceLocked(
+ TraceStringVector* trace_strings) const {
+ if (!trace_strings->empty()) {
+ gpr_strvec v;
+ gpr_strvec_init(&v);
+ gpr_strvec_add(&v, gpr_strdup("Resolution event: "));
+    bool is_first = true;
+ for (size_t i = 0; i < trace_strings->size(); ++i) {
+ if (!is_first) gpr_strvec_add(&v, gpr_strdup(", "));
+ is_first = false;
+ gpr_strvec_add(&v, (*trace_strings)[i]);
+ }
+ char* flat;
+ size_t flat_len = 0;
+ flat = gpr_strvec_flatten(&v, &flat_len);
+ channelz_node_->AddTraceEvent(
+ grpc_core::channelz::ChannelTrace::Severity::Info,
+ grpc_slice_new(flat, flat_len, gpr_free));
+ gpr_strvec_destroy(&v);
+ }
+}
+
+// Callback invoked when a resolver result is available.
+void RequestRouter::OnResolverResultChangedLocked(void* arg,
+ grpc_error* error) {
+ RequestRouter* self = static_cast<RequestRouter*>(arg);
+ if (self->tracer_->enabled()) {
+ const char* disposition =
+ self->resolver_result_ != nullptr
+ ? ""
+ : (error == GRPC_ERROR_NONE ? " (transient error)"
+ : " (resolver shutdown)");
+ gpr_log(GPR_INFO,
+ "request_router=%p: got resolver result: resolver_result=%p "
+ "error=%s%s",
+ self, self->resolver_result_, grpc_error_string(error),
+ disposition);
+ }
+ // Handle shutdown.
+ if (error != GRPC_ERROR_NONE || self->resolver_ == nullptr) {
+ self->OnResolverShutdownLocked(GRPC_ERROR_REF(error));
+ return;
+ }
+ // Data used to set the channel's connectivity state.
+ bool set_connectivity_state = true;
+  // We only want to trace the address resolution in the following cases:
+ // (a) Address resolution resulted in service config change.
+ // (b) Address resolution that causes number of backends to go from
+ // zero to non-zero.
+ // (c) Address resolution that causes number of backends to go from
+ // non-zero to zero.
+ // (d) Address resolution that causes a new LB policy to be created.
+ //
+  // We track a list of strings to eventually be concatenated and traced.
+ TraceStringVector trace_strings;
+ grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+ grpc_error* connectivity_error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
+ // resolver_result_ will be null in the case of a transient
+ // resolution error. In that case, we don't have any new result to
+ // process, which means that we keep using the previous result (if any).
+ if (self->resolver_result_ == nullptr) {
+ if (self->tracer_->enabled()) {
+ gpr_log(GPR_INFO, "request_router=%p: resolver transient failure", self);
+ }
+ // Don't override connectivity state if we already have an LB policy.
+ if (self->lb_policy_ != nullptr) set_connectivity_state = false;
+ } else {
+ // Parse the resolver result.
+ const char* lb_policy_name = nullptr;
+ grpc_json* lb_policy_config = nullptr;
+ const bool service_config_changed = self->process_resolver_result_(
+ self->process_resolver_result_user_data_, *self->resolver_result_,
+ &lb_policy_name, &lb_policy_config);
+ GPR_ASSERT(lb_policy_name != nullptr);
+ // Check to see if we're already using the right LB policy.
+ const bool lb_policy_name_changed =
+ self->lb_policy_ == nullptr ||
+ strcmp(self->lb_policy_->name(), lb_policy_name) != 0;
+ if (self->lb_policy_ != nullptr && !lb_policy_name_changed) {
+ // Continue using the same LB policy. Update with new addresses.
+ if (self->tracer_->enabled()) {
+ gpr_log(GPR_INFO,
+ "request_router=%p: updating existing LB policy \"%s\" (%p)",
+ self, lb_policy_name, self->lb_policy_.get());
+ }
+ self->lb_policy_->UpdateLocked(*self->resolver_result_, lb_policy_config);
+ // No need to set the channel's connectivity state; the existing
+ // watch on the LB policy will take care of that.
+ set_connectivity_state = false;
+ } else {
+ // Instantiate new LB policy.
+ self->CreateNewLbPolicyLocked(lb_policy_name, lb_policy_config,
+ &connectivity_state, &connectivity_error,
+ &trace_strings);
+ }
+ // Add channel trace event.
+ if (self->channelz_node_ != nullptr) {
+ if (service_config_changed) {
+ // TODO(ncteisen): might be worth somehow including a snippet of the
+ // config in the trace, at the risk of bloating the trace logs.
+ trace_strings.push_back(gpr_strdup("Service config changed"));
+ }
+ self->MaybeAddTraceMessagesForAddressChangesLocked(&trace_strings);
+ self->ConcatenateAndAddChannelTraceLocked(&trace_strings);
+ }
+ // Clean up.
+ grpc_channel_args_destroy(self->resolver_result_);
+ self->resolver_result_ = nullptr;
+ }
+ // Set the channel's connectivity state if needed.
+ if (set_connectivity_state) {
+ self->SetConnectivityStateLocked(connectivity_state, connectivity_error,
+ "resolver_result");
+ } else {
+ GRPC_ERROR_UNREF(connectivity_error);
+ }
+ // Invoke closures that were waiting for results and renew the watch.
+ GRPC_CLOSURE_LIST_SCHED(&self->waiting_for_resolver_result_closures_);
+ self->resolver_->NextLocked(&self->resolver_result_,
+ &self->on_resolver_result_changed_);
+}
+
+void RequestRouter::RouteCallLocked(Request* request) {
+ GPR_ASSERT(request->pick_.connected_subchannel == nullptr);
+ request->request_router_ = this;
+ if (lb_policy_ != nullptr) {
+ // We already have resolver results, so process the service config
+ // and start an LB pick.
+ request->ProcessServiceConfigAndStartLbPickLocked();
+ } else if (resolver_ == nullptr) {
+ GRPC_CLOSURE_RUN(request->on_route_done_,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
+ } else {
+ // We do not yet have an LB policy, so wait for a resolver result.
+ if (!started_resolving_) {
+ StartResolvingLocked();
+ }
+ // Create a new waiter, which will delete itself when done.
+ New<Request::ResolverResultWaiter>(request);
+ // Add the request's polling entity to the request_router's
+ // interested_parties, so that the I/O of the resolver can be done
+ // under it. It will be removed in LbPickDoneLocked().
+ request->MaybeAddCallToInterestedPartiesLocked();
+ }
+}
+
+void RequestRouter::ShutdownLocked(grpc_error* error) {
+ if (resolver_ != nullptr) {
+ SetConnectivityStateLocked(GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
+ "disconnect");
+ resolver_.reset();
+ if (!started_resolving_) {
+ grpc_closure_list_fail_all(&waiting_for_resolver_result_closures_,
+ GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_LIST_SCHED(&waiting_for_resolver_result_closures_);
+ }
+ if (lb_policy_ != nullptr) {
+ grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
+ interested_parties_);
+ lb_policy_.reset();
+ }
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+grpc_connectivity_state RequestRouter::GetConnectivityState() {
+ return grpc_connectivity_state_check(&state_tracker_);
+}
+
+void RequestRouter::NotifyOnConnectivityStateChange(
+ grpc_connectivity_state* state, grpc_closure* closure) {
+ grpc_connectivity_state_notify_on_state_change(&state_tracker_, state,
+ closure);
+}
+
+void RequestRouter::ExitIdleLocked() {
+ if (lb_policy_ != nullptr) {
+ lb_policy_->ExitIdleLocked();
+ } else {
+ exit_idle_when_lb_policy_arrives_ = true;
+ if (!started_resolving_ && resolver_ != nullptr) {
+ StartResolvingLocked();
+ }
+ }
+}
+
+void RequestRouter::ResetConnectionBackoffLocked() {
+ if (resolver_ != nullptr) {
+ resolver_->ResetBackoffLocked();
+ resolver_->RequestReresolutionLocked();
+ }
+ if (lb_policy_ != nullptr) {
+ lb_policy_->ResetBackoffLocked();
+ }
+}
+
+} // namespace grpc_core
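
ResolverResultWaiter and AsyncPickCanceller above share a self-deleting pattern: a done path and a cancel path may each run once under the combiner, the first to run sets finished_, and whichever runs second frees the object. A standalone sketch of the pattern (simplified; in the real code both closures are serialized by the channel combiner, and the cancel callback is guaranteed to fire eventually, possibly with no error):

    #include <iostream>

    class Waiter {
     public:
      // Done and Cancel each run at most once; they may run in either order.
      static void Done(void* arg) {
        Waiter* self = static_cast<Waiter*>(arg);
        if (self->finished_) { delete self; return; }  // Cancel already ran
        std::cout << "processing resolver result\n";
        self->finished_ = true;  // Cancel will free us when it fires later
      }
      static void Cancel(void* arg) {
        Waiter* self = static_cast<Waiter*>(arg);
        if (self->finished_) { delete self; return; }  // Done already ran
        std::cout << "call cancelled while waiting\n";
        self->finished_ = true;  // Done will free us when it runs
      }

     private:
      bool finished_ = false;
    };

    int main() {
      void* w = new Waiter;
      Waiter::Done(w);    // first callback handles the result
      Waiter::Cancel(w);  // second callback just frees the waiter
    }
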
diff --git a/src/core/ext/filters/client_channel/request_routing.h b/src/core/ext/filters/client_channel/request_routing.h
new file mode 100644
index 0000000000..0c671229c8
--- /dev/null
+++ b/src/core/ext/filters/client_channel/request_routing.h
@@ -0,0 +1,177 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_REQUEST_ROUTING_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_REQUEST_ROUTING_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
+#include "src/core/ext/filters/client_channel/client_channel_factory.h"
+#include "src/core/ext/filters/client_channel/lb_policy.h"
+#include "src/core/ext/filters/client_channel/resolver.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
+#include "src/core/lib/gprpp/orphanable.h"
+#include "src/core/lib/iomgr/call_combiner.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/polling_entity.h"
+#include "src/core/lib/iomgr/pollset_set.h"
+#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/metadata_batch.h"
+
+namespace grpc_core {
+
+class RequestRouter {
+ public:
+ class Request {
+ public:
+ // Synchronous callback that applies the service config to a call.
+ // Returns false if the call should be failed.
+ typedef bool (*ApplyServiceConfigCallback)(void* user_data);
+
+ Request(grpc_call_stack* owning_call, grpc_call_combiner* call_combiner,
+ grpc_polling_entity* pollent,
+ grpc_metadata_batch* send_initial_metadata,
+ uint32_t* send_initial_metadata_flags,
+ ApplyServiceConfigCallback apply_service_config,
+ void* apply_service_config_user_data, grpc_closure* on_route_done);
+
+ ~Request();
+
+ // TODO(roth): It seems a bit ugly to expose this member in a
+ // non-const way. Find a better API to avoid this.
+ LoadBalancingPolicy::PickState* pick() { return &pick_; }
+
+ private:
+ friend class RequestRouter;
+
+ class ResolverResultWaiter;
+ class AsyncPickCanceller;
+
+ void ProcessServiceConfigAndStartLbPickLocked();
+ void StartLbPickLocked();
+ static void LbPickDoneLocked(void* arg, grpc_error* error);
+
+ void MaybeAddCallToInterestedPartiesLocked();
+ void MaybeRemoveCallFromInterestedPartiesLocked();
+
+ // Populated by caller.
+ grpc_call_stack* owning_call_;
+ grpc_call_combiner* call_combiner_;
+ grpc_polling_entity* pollent_;
+ ApplyServiceConfigCallback apply_service_config_;
+ void* apply_service_config_user_data_;
+ grpc_closure* on_route_done_;
+ LoadBalancingPolicy::PickState pick_;
+
+ // Internal state.
+ RequestRouter* request_router_ = nullptr;
+ bool pollent_added_to_interested_parties_ = false;
+ grpc_closure on_pick_done_;
+ AsyncPickCanceller* pick_canceller_ = nullptr;
+ };
+
+ // Synchronous callback that takes the service config JSON string and
+ // LB policy name.
+ // Returns true if the service config has changed since the last result.
+ typedef bool (*ProcessResolverResultCallback)(void* user_data,
+ const grpc_channel_args& args,
+ const char** lb_policy_name,
+ grpc_json** lb_policy_config);
+
+ RequestRouter(grpc_channel_stack* owning_stack, grpc_combiner* combiner,
+ grpc_client_channel_factory* client_channel_factory,
+ grpc_pollset_set* interested_parties, TraceFlag* tracer,
+ ProcessResolverResultCallback process_resolver_result,
+ void* process_resolver_result_user_data, const char* target_uri,
+ const grpc_channel_args* args, grpc_error** error);
+
+ ~RequestRouter();
+
+ void set_channelz_node(channelz::ClientChannelNode* channelz_node) {
+ channelz_node_ = channelz_node;
+ }
+
+ void RouteCallLocked(Request* request);
+
+ // TODO(roth): Add methods to cancel picks.
+
+ void ShutdownLocked(grpc_error* error);
+
+ void ExitIdleLocked();
+ void ResetConnectionBackoffLocked();
+
+ grpc_connectivity_state GetConnectivityState();
+ void NotifyOnConnectivityStateChange(grpc_connectivity_state* state,
+ grpc_closure* closure);
+
+ LoadBalancingPolicy* lb_policy() const { return lb_policy_.get(); }
+
+ private:
+  using TraceStringVector = InlinedVector<char*, 3>;
+
+ class ReresolutionRequestHandler;
+ class LbConnectivityWatcher;
+
+ void StartResolvingLocked();
+ void OnResolverShutdownLocked(grpc_error* error);
+ void CreateNewLbPolicyLocked(const char* lb_policy_name, grpc_json* lb_config,
+ grpc_connectivity_state* connectivity_state,
+ grpc_error** connectivity_error,
+ TraceStringVector* trace_strings);
+ void MaybeAddTraceMessagesForAddressChangesLocked(
+ TraceStringVector* trace_strings);
+ void ConcatenateAndAddChannelTraceLocked(
+ TraceStringVector* trace_strings) const;
+ static void OnResolverResultChangedLocked(void* arg, grpc_error* error);
+
+ void SetConnectivityStateLocked(grpc_connectivity_state state,
+ grpc_error* error, const char* reason);
+
+ // Passed in from caller at construction time.
+ grpc_channel_stack* owning_stack_;
+ grpc_combiner* combiner_;
+ grpc_client_channel_factory* client_channel_factory_;
+ grpc_pollset_set* interested_parties_;
+ TraceFlag* tracer_;
+
+ channelz::ClientChannelNode* channelz_node_ = nullptr;
+
+ // Resolver and associated state.
+ OrphanablePtr<Resolver> resolver_;
+ ProcessResolverResultCallback process_resolver_result_;
+ void* process_resolver_result_user_data_;
+ bool started_resolving_ = false;
+ grpc_channel_args* resolver_result_ = nullptr;
+ bool previous_resolution_contained_addresses_ = false;
+ grpc_closure_list waiting_for_resolver_result_closures_;
+ grpc_closure on_resolver_result_changed_;
+
+ // LB policy and associated state.
+ OrphanablePtr<LoadBalancingPolicy> lb_policy_;
+ bool exit_idle_when_lb_policy_arrives_ = false;
+
+ grpc_connectivity_state_tracker state_tracker_;
+};
+
+} // namespace grpc_core
+
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_REQUEST_ROUTING_H */
diff --git a/src/core/ext/filters/client_channel/resolver_result_parsing.cc b/src/core/ext/filters/client_channel/resolver_result_parsing.cc
index 22b06db45c..9a0122e8ec 100644
--- a/src/core/ext/filters/client_channel/resolver_result_parsing.cc
+++ b/src/core/ext/filters/client_channel/resolver_result_parsing.cc
@@ -43,16 +43,16 @@ namespace grpc_core {
namespace internal {
ProcessedResolverResult::ProcessedResolverResult(
- const grpc_channel_args* resolver_result, bool parse_retry) {
+ const grpc_channel_args& resolver_result, bool parse_retry) {
ProcessServiceConfig(resolver_result, parse_retry);
// If no LB config was found above, just find the LB policy name then.
if (lb_policy_name_ == nullptr) ProcessLbPolicyName(resolver_result);
}
void ProcessedResolverResult::ProcessServiceConfig(
- const grpc_channel_args* resolver_result, bool parse_retry) {
+ const grpc_channel_args& resolver_result, bool parse_retry) {
const grpc_arg* channel_arg =
- grpc_channel_args_find(resolver_result, GRPC_ARG_SERVICE_CONFIG);
+ grpc_channel_args_find(&resolver_result, GRPC_ARG_SERVICE_CONFIG);
const char* service_config_json = grpc_channel_arg_get_string(channel_arg);
if (service_config_json != nullptr) {
service_config_json_.reset(gpr_strdup(service_config_json));
@@ -60,7 +60,7 @@ void ProcessedResolverResult::ProcessServiceConfig(
if (service_config_ != nullptr) {
if (parse_retry) {
channel_arg =
- grpc_channel_args_find(resolver_result, GRPC_ARG_SERVER_URI);
+ grpc_channel_args_find(&resolver_result, GRPC_ARG_SERVER_URI);
const char* server_uri = grpc_channel_arg_get_string(channel_arg);
GPR_ASSERT(server_uri != nullptr);
grpc_uri* uri = grpc_uri_parse(server_uri, true);
@@ -78,7 +78,7 @@ void ProcessedResolverResult::ProcessServiceConfig(
}
void ProcessedResolverResult::ProcessLbPolicyName(
- const grpc_channel_args* resolver_result) {
+ const grpc_channel_args& resolver_result) {
// Prefer the LB policy name found in the service config. Note that this is
// checking the deprecated loadBalancingPolicy field, rather than the new
// loadBalancingConfig field.
@@ -96,13 +96,13 @@ void ProcessedResolverResult::ProcessLbPolicyName(
// Otherwise, find the LB policy name set by the client API.
if (lb_policy_name_ == nullptr) {
const grpc_arg* channel_arg =
- grpc_channel_args_find(resolver_result, GRPC_ARG_LB_POLICY_NAME);
+ grpc_channel_args_find(&resolver_result, GRPC_ARG_LB_POLICY_NAME);
lb_policy_name_.reset(gpr_strdup(grpc_channel_arg_get_string(channel_arg)));
}
// Special case: If at least one balancer address is present, we use
// the grpclb policy, regardless of what the resolver has returned.
const ServerAddressList* addresses =
- FindServerAddressListChannelArg(resolver_result);
+ FindServerAddressListChannelArg(&resolver_result);
if (addresses != nullptr) {
bool found_balancer_address = false;
for (size_t i = 0; i < addresses->size(); ++i) {
diff --git a/src/core/ext/filters/client_channel/resolver_result_parsing.h b/src/core/ext/filters/client_channel/resolver_result_parsing.h
index f1fb7406bc..98a9d26c46 100644
--- a/src/core/ext/filters/client_channel/resolver_result_parsing.h
+++ b/src/core/ext/filters/client_channel/resolver_result_parsing.h
@@ -36,8 +36,7 @@ namespace internal {
class ClientChannelMethodParams;
// A table mapping from a method name to its method parameters.
-typedef grpc_core::SliceHashTable<
- grpc_core::RefCountedPtr<ClientChannelMethodParams>>
+typedef SliceHashTable<RefCountedPtr<ClientChannelMethodParams>>
ClientChannelMethodParamsTable;
// A container of processed fields from the resolver result. Simplifies the
@@ -47,33 +46,30 @@ class ProcessedResolverResult {
// Processes the resolver result and populates the relative members
// for later consumption. Tries to parse retry parameters only if parse_retry
// is true.
- ProcessedResolverResult(const grpc_channel_args* resolver_result,
+ ProcessedResolverResult(const grpc_channel_args& resolver_result,
bool parse_retry);
// Getters. Any managed object's ownership is transferred.
- grpc_core::UniquePtr<char> service_config_json() {
+ UniquePtr<char> service_config_json() {
return std::move(service_config_json_);
}
- grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() {
+ RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() {
return std::move(retry_throttle_data_);
}
- grpc_core::RefCountedPtr<ClientChannelMethodParamsTable>
- method_params_table() {
+ RefCountedPtr<ClientChannelMethodParamsTable> method_params_table() {
return std::move(method_params_table_);
}
- grpc_core::UniquePtr<char> lb_policy_name() {
- return std::move(lb_policy_name_);
- }
+ UniquePtr<char> lb_policy_name() { return std::move(lb_policy_name_); }
grpc_json* lb_policy_config() { return lb_policy_config_; }
private:
// Finds the service config; extracts LB config and (maybe) retry throttle
// params from it.
- void ProcessServiceConfig(const grpc_channel_args* resolver_result,
+ void ProcessServiceConfig(const grpc_channel_args& resolver_result,
bool parse_retry);
// Finds the LB policy name (when no LB config was found).
- void ProcessLbPolicyName(const grpc_channel_args* resolver_result);
+ void ProcessLbPolicyName(const grpc_channel_args& resolver_result);
// Parses the service config. Intended to be used by
// ServiceConfig::ParseGlobalParams.
@@ -85,16 +81,16 @@ class ProcessedResolverResult {
void ParseRetryThrottleParamsFromServiceConfig(const grpc_json* field);
// Service config.
- grpc_core::UniquePtr<char> service_config_json_;
- grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config_;
+ UniquePtr<char> service_config_json_;
+ UniquePtr<grpc_core::ServiceConfig> service_config_;
// LB policy.
grpc_json* lb_policy_config_ = nullptr;
- grpc_core::UniquePtr<char> lb_policy_name_;
+ UniquePtr<char> lb_policy_name_;
// Retry throttle data.
char* server_name_ = nullptr;
- grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
+ RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
// Method params table.
- grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table_;
+ RefCountedPtr<ClientChannelMethodParamsTable> method_params_table_;
};
// The parameters of a method.
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index cfcb190d60..c268c18664 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -126,6 +126,7 @@ struct grpc_tcp {
int bytes_counter;
bool socket_ts_enabled; /* True if timestamping options are set on the socket
*/
+ bool ts_capable; /* Cache whether we can set timestamping options */
gpr_atm
stop_error_notification; /* Set to 1 if we do not want to be notified on
errors anymore */
@@ -589,7 +590,7 @@ ssize_t tcp_send(int fd, const struct msghdr* msg) {
*/
static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
size_t sending_length,
- ssize_t* sent_length, grpc_error** error);
+ ssize_t* sent_length);
/** The callback function to be invoked when we get an error on the socket. */
static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error);
@@ -597,13 +598,11 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error);
#ifdef GRPC_LINUX_ERRQUEUE
static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
size_t sending_length,
- ssize_t* sent_length,
- grpc_error** error) {
+ ssize_t* sent_length) {
if (!tcp->socket_ts_enabled) {
uint32_t opt = grpc_core::kTimestampingSocketOptions;
if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING,
static_cast<void*>(&opt), sizeof(opt)) != 0) {
- *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "setsockopt"), tcp);
grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_ERROR, "Failed to set timestamping options on the socket.");
@@ -784,8 +783,7 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
#else /* GRPC_LINUX_ERRQUEUE */
static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
size_t sending_length,
- ssize_t* sent_length,
- grpc_error** error) {
+ ssize_t* sent_length) {
gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform");
GPR_ASSERT(0);
return false;
@@ -804,7 +802,7 @@ void tcp_shutdown_buffer_list(grpc_tcp* tcp) {
gpr_mu_lock(&tcp->tb_mu);
grpc_core::TracedBuffer::Shutdown(
&tcp->tb_head, tcp->outgoing_buffer_arg,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("endpoint destroyed"));
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("TracedBuffer list shutdown"));
gpr_mu_unlock(&tcp->tb_mu);
tcp->outgoing_buffer_arg = nullptr;
}
@@ -820,7 +818,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
msg_iovlen_type iov_size;
- ssize_t sent_length;
+ ssize_t sent_length = 0;
size_t sending_length;
size_t trailing;
size_t unwind_slice_idx;
@@ -855,13 +853,19 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
msg.msg_iov = iov;
msg.msg_iovlen = iov_size;
msg.msg_flags = 0;
+ bool tried_sending_message = false;
if (tcp->outgoing_buffer_arg != nullptr) {
- if (!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length,
- error)) {
+ if (!tcp->ts_capable ||
+ !tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length)) {
+      /* We could not set socket options to collect Fathom timestamps.
+       * Fall back to writing without timestamps. */
+ tcp->ts_capable = false;
tcp_shutdown_buffer_list(tcp);
- return true; /* something went wrong with timestamps */
+ } else {
+ tried_sending_message = true;
}
- } else {
+ }
+ if (!tried_sending_message) {
msg.msg_control = nullptr;
msg.msg_controllen = 0;
@@ -1117,6 +1121,7 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp->is_first_read = true;
tcp->bytes_counter = -1;
tcp->socket_ts_enabled = false;
+ tcp->ts_capable = true;
tcp->outgoing_buffer_arg = nullptr;
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
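The new `ts_capable` flag above turns a failed `setsockopt(SO_TIMESTAMPING)` into a one-time capability probe: instead of failing the write as before, the first failure clears the flag and every later write on that socket skips straight to the plain path. A minimal sketch of the same try-once-then-fall-back pattern, in Python with hypothetical names:

```
# Hypothetical sketch of the capability-caching fallback; not gRPC code.
class Writer:
    """Tries an optional fast path once; falls back permanently on failure."""

    def __init__(self):
        self.ts_capable = True  # assume capability until a write disproves it

    def _write_with_timestamps(self, data):
        raise OSError('setsockopt(SO_TIMESTAMPING) failed')  # simulated failure

    def _write_plain(self, data):
        return len(data)

    def write(self, data):
        if self.ts_capable:
            try:
                return self._write_with_timestamps(data)
            except OSError:
                self.ts_capable = False  # remember the failure; never retry
        return self._write_plain(data)


w = Writer()
assert w.write(b'abc') == 3  # the first call falls back and still succeeds
assert not w.ts_capable      # later calls skip the timestamping path entirely
```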
diff --git a/src/core/lib/security/credentials/composite/composite_credentials.cc b/src/core/lib/security/credentials/composite/composite_credentials.cc
index 85dcd4693b..586bbed778 100644
--- a/src/core/lib/security/credentials/composite/composite_credentials.cc
+++ b/src/core/lib/security/credentials/composite/composite_credentials.cc
@@ -35,44 +35,6 @@
static void composite_call_metadata_cb(void* arg, grpc_error* error);
-grpc_call_credentials_array::~grpc_call_credentials_array() {
- for (size_t i = 0; i < num_creds_; ++i) {
- creds_array_[i].~RefCountedPtr<grpc_call_credentials>();
- }
- if (creds_array_ != nullptr) {
- gpr_free(creds_array_);
- }
-}
-
-grpc_call_credentials_array::grpc_call_credentials_array(
- const grpc_call_credentials_array& that)
- : num_creds_(that.num_creds_) {
- reserve(that.capacity_);
- for (size_t i = 0; i < num_creds_; ++i) {
- new (&creds_array_[i])
- grpc_core::RefCountedPtr<grpc_call_credentials>(that.creds_array_[i]);
- }
-}
-
-void grpc_call_credentials_array::reserve(size_t capacity) {
- if (capacity_ >= capacity) {
- return;
- }
- grpc_core::RefCountedPtr<grpc_call_credentials>* new_arr =
- static_cast<grpc_core::RefCountedPtr<grpc_call_credentials>*>(gpr_malloc(
- sizeof(grpc_core::RefCountedPtr<grpc_call_credentials>) * capacity));
- if (creds_array_ != nullptr) {
- for (size_t i = 0; i < num_creds_; ++i) {
- new (&new_arr[i]) grpc_core::RefCountedPtr<grpc_call_credentials>(
- std::move(creds_array_[i]));
- creds_array_[i].~RefCountedPtr<grpc_call_credentials>();
- }
- gpr_free(creds_array_);
- }
- creds_array_ = new_arr;
- capacity_ = capacity;
-}
-
namespace {
struct grpc_composite_call_credentials_metadata_context {
grpc_composite_call_credentials_metadata_context(
@@ -103,13 +65,13 @@ static void composite_call_metadata_cb(void* arg, grpc_error* error) {
grpc_composite_call_credentials_metadata_context* ctx =
static_cast<grpc_composite_call_credentials_metadata_context*>(arg);
if (error == GRPC_ERROR_NONE) {
- const grpc_call_credentials_array& inner = ctx->composite_creds->inner();
+ const grpc_composite_call_credentials::CallCredentialsList& inner =
+ ctx->composite_creds->inner();
/* See if we need to get some more metadata. */
if (ctx->creds_index < inner.size()) {
- if (inner.get(ctx->creds_index++)
- ->get_request_metadata(
- ctx->pollent, ctx->auth_md_context, ctx->md_array,
- &ctx->internal_on_request_metadata, &error)) {
+ if (inner[ctx->creds_index++]->get_request_metadata(
+ ctx->pollent, ctx->auth_md_context, ctx->md_array,
+ &ctx->internal_on_request_metadata, &error)) {
// Synchronous response, so call ourselves recursively.
composite_call_metadata_cb(arg, error);
GRPC_ERROR_UNREF(error);
@@ -130,12 +92,11 @@ bool grpc_composite_call_credentials::get_request_metadata(
ctx = grpc_core::New<grpc_composite_call_credentials_metadata_context>(
this, pollent, auth_md_context, md_array, on_request_metadata);
bool synchronous = true;
- const grpc_call_credentials_array& inner = ctx->composite_creds->inner();
+ const CallCredentialsList& inner = ctx->composite_creds->inner();
while (ctx->creds_index < inner.size()) {
- if (inner.get(ctx->creds_index++)
- ->get_request_metadata(ctx->pollent, ctx->auth_md_context,
- ctx->md_array,
- &ctx->internal_on_request_metadata, error)) {
+ if (inner[ctx->creds_index++]->get_request_metadata(
+ ctx->pollent, ctx->auth_md_context, ctx->md_array,
+ &ctx->internal_on_request_metadata, error)) {
if (*error != GRPC_ERROR_NONE) break;
} else {
synchronous = false; // Async return.
@@ -149,7 +110,7 @@ bool grpc_composite_call_credentials::get_request_metadata(
void grpc_composite_call_credentials::cancel_get_request_metadata(
grpc_credentials_mdelem_array* md_array, grpc_error* error) {
for (size_t i = 0; i < inner_.size(); ++i) {
- inner_.get(i)->cancel_get_request_metadata(md_array, GRPC_ERROR_REF(error));
+ inner_[i]->cancel_get_request_metadata(md_array, GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
}
@@ -172,7 +133,7 @@ void grpc_composite_call_credentials::push_to_inner(
auto composite_creds =
static_cast<grpc_composite_call_credentials*>(creds.get());
for (size_t i = 0; i < composite_creds->inner().size(); ++i) {
- inner_.push_back(std::move(composite_creds->inner_.get_mutable(i)));
+ inner_.push_back(std::move(composite_creds->inner_[i]));
}
}
diff --git a/src/core/lib/security/credentials/composite/composite_credentials.h b/src/core/lib/security/credentials/composite/composite_credentials.h
index 6b7fca1370..7a1c7d5e42 100644
--- a/src/core/lib/security/credentials/composite/composite_credentials.h
+++ b/src/core/lib/security/credentials/composite/composite_credentials.h
@@ -21,43 +21,10 @@
#include <grpc/support/port_platform.h>
+#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/security/credentials/credentials.h"
-// TODO(soheil): Replace this with InlinedVector once #16032 is resolved.
-class grpc_call_credentials_array {
- public:
- grpc_call_credentials_array() = default;
- grpc_call_credentials_array(const grpc_call_credentials_array& that);
-
- ~grpc_call_credentials_array();
-
- void reserve(size_t capacity);
-
- // Must reserve before pushing any data.
- void push_back(grpc_core::RefCountedPtr<grpc_call_credentials> cred) {
- GPR_DEBUG_ASSERT(capacity_ > num_creds_);
- new (&creds_array_[num_creds_++])
- grpc_core::RefCountedPtr<grpc_call_credentials>(std::move(cred));
- }
-
- const grpc_core::RefCountedPtr<grpc_call_credentials>& get(size_t i) const {
- GPR_DEBUG_ASSERT(i < num_creds_);
- return creds_array_[i];
- }
- grpc_core::RefCountedPtr<grpc_call_credentials>& get_mutable(size_t i) {
- GPR_DEBUG_ASSERT(i < num_creds_);
- return creds_array_[i];
- }
-
- size_t size() const { return num_creds_; }
-
- private:
- grpc_core::RefCountedPtr<grpc_call_credentials>* creds_array_ = nullptr;
- size_t num_creds_ = 0;
- size_t capacity_ = 0;
-};
-
/* -- Composite channel credentials. -- */
class grpc_composite_channel_credentials : public grpc_channel_credentials {
@@ -97,6 +64,10 @@ class grpc_composite_channel_credentials : public grpc_channel_credentials {
class grpc_composite_call_credentials : public grpc_call_credentials {
public:
+ using CallCredentialsList =
+ grpc_core::InlinedVector<grpc_core::RefCountedPtr<grpc_call_credentials>,
+ 2>;
+
grpc_composite_call_credentials(
grpc_core::RefCountedPtr<grpc_call_credentials> creds1,
grpc_core::RefCountedPtr<grpc_call_credentials> creds2);
@@ -111,13 +82,13 @@ class grpc_composite_call_credentials : public grpc_call_credentials {
void cancel_get_request_metadata(grpc_credentials_mdelem_array* md_array,
grpc_error* error) override;
- const grpc_call_credentials_array& inner() const { return inner_; }
+ const CallCredentialsList& inner() const { return inner_; }
private:
void push_to_inner(grpc_core::RefCountedPtr<grpc_call_credentials> creds,
bool is_composite);
- grpc_call_credentials_array inner_;
+ CallCredentialsList inner_;
};
#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_COMPOSITE_COMPOSITE_CREDENTIALS_H \
diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.cc b/src/core/lib/security/credentials/jwt/jwt_verifier.cc
index c7d1b36ff0..cdef0f322a 100644
--- a/src/core/lib/security/credentials/jwt/jwt_verifier.cc
+++ b/src/core/lib/security/credentials/jwt/jwt_verifier.cc
@@ -31,7 +31,9 @@
#include <grpc/support/sync.h>
extern "C" {
+#include <openssl/bn.h>
#include <openssl/pem.h>
+#include <openssl/rsa.h>
}
#include "src/core/lib/gpr/string.h"
diff --git a/src/core/tsi/ssl_transport_security.cc b/src/core/tsi/ssl_transport_security.cc
index efaf733503..fb6ea19210 100644
--- a/src/core/tsi/ssl_transport_security.cc
+++ b/src/core/tsi/ssl_transport_security.cc
@@ -156,9 +156,13 @@ static unsigned long openssl_thread_id_cb(void) {
#endif
static void init_openssl(void) {
+#if OPENSSL_API_COMPAT >= 0x10100000L
+ OPENSSL_init_ssl(0, NULL);
+#else
SSL_library_init();
SSL_load_error_strings();
OpenSSL_add_all_algorithms();
+#endif
#if OPENSSL_VERSION_NUMBER < 0x10100000
if (!CRYPTO_get_locking_callback()) {
int num_locks = CRYPTO_num_locks();
@@ -1649,7 +1653,11 @@ tsi_result tsi_create_ssl_client_handshaker_factory_with_options(
return TSI_INVALID_ARGUMENT;
}
+#if defined(OPENSSL_NO_TLS1_2_METHOD) || OPENSSL_API_COMPAT >= 0x10100000L
+ ssl_context = SSL_CTX_new(TLS_method());
+#else
ssl_context = SSL_CTX_new(TLSv1_2_method());
+#endif
if (ssl_context == nullptr) {
gpr_log(GPR_ERROR, "Could not create ssl context.");
return TSI_INVALID_ARGUMENT;
@@ -1806,7 +1814,11 @@ tsi_result tsi_create_ssl_server_handshaker_factory_with_options(
for (i = 0; i < options->num_key_cert_pairs; i++) {
do {
+#if defined(OPENSSL_NO_TLS1_2_METHOD) || OPENSSL_API_COMPAT >= 0x10100000L
+ impl->ssl_contexts[i] = SSL_CTX_new(TLS_method());
+#else
impl->ssl_contexts[i] = SSL_CTX_new(TLSv1_2_method());
+#endif
if (impl->ssl_contexts[i] == nullptr) {
gpr_log(GPR_ERROR, "Could not create ssl context.");
result = TSI_OUT_OF_RESOURCES;
diff --git a/src/cpp/ext/filters/census/context.cc b/src/cpp/ext/filters/census/context.cc
index 78fc69a805..160590353a 100644
--- a/src/cpp/ext/filters/census/context.cc
+++ b/src/cpp/ext/filters/census/context.cc
@@ -28,6 +28,9 @@ using ::opencensus::trace::SpanContext;
void GenerateServerContext(absl::string_view tracing, absl::string_view stats,
absl::string_view primary_role,
absl::string_view method, CensusContext* context) {
+ // Destruct the current CensusContext to free the Span memory before
+ // overwriting it below.
+ context->~CensusContext();
GrpcTraceContext trace_ctxt;
if (TraceContextEncoding::Decode(tracing, &trace_ctxt) !=
TraceContextEncoding::kEncodeDecodeFailure) {
@@ -42,6 +45,9 @@ void GenerateServerContext(absl::string_view tracing, absl::string_view stats,
void GenerateClientContext(absl::string_view method, CensusContext* ctxt,
CensusContext* parent_ctxt) {
+ // Destruct the current CensusContext to free the Span memory before
+ // overwriting it below.
+ ctxt->~CensusContext();
if (parent_ctxt != nullptr) {
SpanContext span_ctxt = parent_ctxt->Context();
Span span = parent_ctxt->Span();
diff --git a/src/csharp/Grpc.Tools/build/_grpc/_Grpc.Tools.targets b/src/csharp/Grpc.Tools/build/_grpc/_Grpc.Tools.targets
index 5f76c03ce5..3fe1ccc918 100644
--- a/src/csharp/Grpc.Tools/build/_grpc/_Grpc.Tools.targets
+++ b/src/csharp/Grpc.Tools/build/_grpc/_Grpc.Tools.targets
@@ -22,9 +22,8 @@
<Target Name="gRPC_ResolvePluginFullPath" AfterTargets="Protobuf_ResolvePlatform">
<PropertyGroup>
<!-- TODO(kkm): Do not use Protobuf_PackagedToolsPath, roll gRPC's own. -->
- <!-- TODO(kkm): Do not package windows x64 builds (#13098). -->
<gRPC_PluginFullPath Condition=" '$(gRPC_PluginFullPath)' == '' and '$(Protobuf_ToolsOs)' == 'windows' "
- >$(Protobuf_PackagedToolsPath)\$(Protobuf_ToolsOs)_x86\$(gRPC_PluginFileName).exe</gRPC_PluginFullPath>
+ >$(Protobuf_PackagedToolsPath)\$(Protobuf_ToolsOs)_$(Protobuf_ToolsCpu)\$(gRPC_PluginFileName).exe</gRPC_PluginFullPath>
<gRPC_PluginFullPath Condition=" '$(gRPC_PluginFullPath)' == '' "
>$(Protobuf_PackagedToolsPath)/$(Protobuf_ToolsOs)_$(Protobuf_ToolsCpu)/$(gRPC_PluginFileName)</gRPC_PluginFullPath>
</PropertyGroup>
diff --git a/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets b/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets
index 1d233d23a8..26f9efb5a8 100644
--- a/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets
+++ b/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets
@@ -74,9 +74,8 @@
<!-- Next try OS and CPU resolved by ProtoToolsPlatform. -->
<Protobuf_ToolsOs Condition=" '$(Protobuf_ToolsOs)' == '' ">$(_Protobuf_ToolsOs)</Protobuf_ToolsOs>
<Protobuf_ToolsCpu Condition=" '$(Protobuf_ToolsCpu)' == '' ">$(_Protobuf_ToolsCpu)</Protobuf_ToolsCpu>
- <!-- TODO(kkm): Do not package windows x64 builds (#13098). -->
<Protobuf_ProtocFullPath Condition=" '$(Protobuf_ProtocFullPath)' == '' and '$(Protobuf_ToolsOs)' == 'windows' "
- >$(Protobuf_PackagedToolsPath)\$(Protobuf_ToolsOs)_x86\protoc.exe</Protobuf_ProtocFullPath>
+ >$(Protobuf_PackagedToolsPath)\$(Protobuf_ToolsOs)_$(Protobuf_ToolsCpu)\protoc.exe</Protobuf_ProtocFullPath>
<Protobuf_ProtocFullPath Condition=" '$(Protobuf_ProtocFullPath)' == '' "
>$(Protobuf_PackagedToolsPath)/$(Protobuf_ToolsOs)_$(Protobuf_ToolsCpu)/protoc</Protobuf_ProtocFullPath>
</PropertyGroup>
diff --git a/src/objective-c/README.md b/src/objective-c/README.md
index 32e3956a1e..83775f86e1 100644
--- a/src/objective-c/README.md
+++ b/src/objective-c/README.md
@@ -242,3 +242,12 @@ pod `gRPC-Core`, :podspec => "." # assuming gRPC-Core.podspec is in the same dir
These steps should allow gRPC to use OpenSSL and drop BoringSSL dependency. If you see any issue,
file an issue to us.
+
+## Upgrade issue with BoringSSL
+If you were using an old version of gRPC (<= v1.14) that depended on the pod `BoringSSL` rather
+than `BoringSSL-GRPC` and you hit a linker error such as:
+```
+ld: framework not found openssl
+```
+changing `-framework openssl` to `-framework openssl_grpc` in your project's Other Linker Flags
+may resolve the issue (see [#16821](https://github.com/grpc/grpc/issues/16821)).
diff --git a/src/python/grpcio/grpc/_auth.py b/src/python/grpcio/grpc/_auth.py
index c17824563d..9b990f490d 100644
--- a/src/python/grpcio/grpc/_auth.py
+++ b/src/python/grpcio/grpc/_auth.py
@@ -46,7 +46,7 @@ class GoogleCallCredentials(grpc.AuthMetadataPlugin):
# Hack to determine if these are JWT creds and we need to pass
# additional_claims when getting a token
- self._is_jwt = 'additional_claims' in inspect.getargspec(
+ self._is_jwt = 'additional_claims' in inspect.getargspec( # pylint: disable=deprecated-method
credentials.get_access_token).args
def __call__(self, context, callback):
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index 96118badad..8051fb306c 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -526,7 +526,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
state, operations, deadline, rendezvous = self._prepare(
request, timeout, metadata, wait_for_ready)
if state is None:
- raise rendezvous
+ raise rendezvous # pylint: disable-msg=raising-bad-type
else:
call = self._channel.segregated_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
@@ -537,7 +537,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
),), self._context)
event = call.next_event()
_handle_event(event, state, self._response_deserializer)
- return state, call,
+ return state, call
def __call__(self,
request,
@@ -568,7 +568,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
state, operations, deadline, rendezvous = self._prepare(
request, timeout, metadata, wait_for_ready)
if state is None:
- raise rendezvous
+ raise rendezvous # pylint: disable-msg=raising-bad-type
else:
event_handler = _event_handler(state, self._response_deserializer)
call = self._managed_call(
@@ -603,7 +603,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
wait_for_ready)
if serialized_request is None:
- raise rendezvous
+ raise rendezvous # pylint: disable-msg=raising-bad-type
else:
state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
operationses = (
@@ -660,7 +660,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
state.condition.notify_all()
if not state.due:
break
- return state, call,
+ return state, call
def __call__(self,
request_iterator,
@@ -755,10 +755,10 @@ class _InitialMetadataFlags(int):
def with_wait_for_ready(self, wait_for_ready):
if wait_for_ready is not None:
if wait_for_ready:
- self = self.__class__(self | cygrpc.InitialMetadataFlags.wait_for_ready | \
+ return self.__class__(self | cygrpc.InitialMetadataFlags.wait_for_ready | \
cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set)
elif not wait_for_ready:
- self = self.__class__(self & ~cygrpc.InitialMetadataFlags.wait_for_ready | \
+ return self.__class__(self & ~cygrpc.InitialMetadataFlags.wait_for_ready | \
cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set)
return self
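A note on the `with_wait_for_ready` change above: `_InitialMetadataFlags` subclasses the immutable `int`, so `self = self.__class__(...)` could only rebind the local name `self`; the caller's object was never modified, and the old code worked solely through the trailing `return self`. Returning the new instance directly says the same thing explicitly and avoids pylint's `self-cls-assignment` warning. A small self-contained illustration, using a hypothetical `Flags` class:

```
# Hypothetical illustration; `Flags` is not part of grpcio.
class Flags(int):
    WAIT_FOR_READY = 0x1

    def with_wait_for_ready(self):
        # Idiomatic for immutable int subclasses: build and return a new value.
        return self.__class__(self | Flags.WAIT_FOR_READY)


f = Flags(0)
g = f.with_wait_for_ready()
assert f == 0                     # the original object is untouched
assert g == Flags.WAIT_FOR_READY  # the new flags arrive via the return value
```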
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index 141116df5d..3c33b46dbb 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -49,7 +49,7 @@ cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline):
cdef _interpret_event(grpc_event c_event):
cdef _Tag tag
if c_event.type == GRPC_QUEUE_TIMEOUT:
- # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
+ # TODO(ericgribkoff) Do not coopt ConnectivityEvent here.
return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None)
elif c_event.type == GRPC_QUEUE_SHUTDOWN:
# NOTE(nathaniel): For now we coopt ConnectivityEvent here.
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index ce701724fd..e89e02b171 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -128,7 +128,10 @@ cdef class Server:
with nogil:
grpc_server_cancel_all_calls(self.c_server)
- def __dealloc__(self):
+ # TODO(https://github.com/grpc/grpc/issues/17515) Determine what, if any,
+ # portion of this is safe to call from __dealloc__, and potentially remove
+ # backup_shutdown_queue.
+ def destroy(self):
if self.c_server != NULL:
if not self.is_started:
pass
@@ -146,4 +149,8 @@ cdef class Server:
while not self.is_shutdown:
time.sleep(0)
grpc_server_destroy(self.c_server)
- grpc_shutdown()
+ self.c_server = NULL
+
+  def __dealloc__(self):
+ if self.c_server == NULL:
+ grpc_shutdown()
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index 3bbfa47da5..eb750ef1a8 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -48,7 +48,7 @@ _CANCELLED = 'cancelled'
_EMPTY_FLAGS = 0
-_UNEXPECTED_EXIT_SERVER_GRACE = 1.0
+_DEALLOCATED_SERVER_CHECK_PERIOD_S = 1.0
def _serialized_request(request_event):
@@ -676,6 +676,9 @@ class _ServerState(object):
self.rpc_states = set()
self.due = set()
+ # A "volatile" flag to interrupt the daemon serving thread
+ self.server_deallocated = False
+
def _add_generic_handlers(state, generic_handlers):
with state.lock:
@@ -702,6 +705,7 @@ def _request_call(state):
# TODO(https://github.com/grpc/grpc/issues/6597): delete this function.
def _stop_serving(state):
if not state.rpc_states and not state.due:
+ state.server.destroy()
for shutdown_event in state.shutdown_events:
shutdown_event.set()
state.stage = _ServerStage.STOPPED
@@ -715,49 +719,69 @@ def _on_call_completed(state):
state.active_rpc_count -= 1
-def _serve(state):
- while True:
- event = state.completion_queue.poll()
- if event.tag is _SHUTDOWN_TAG:
+def _process_event_and_continue(state, event):
+ should_continue = True
+ if event.tag is _SHUTDOWN_TAG:
+ with state.lock:
+ state.due.remove(_SHUTDOWN_TAG)
+ if _stop_serving(state):
+ should_continue = False
+ elif event.tag is _REQUEST_CALL_TAG:
+ with state.lock:
+ state.due.remove(_REQUEST_CALL_TAG)
+ concurrency_exceeded = (
+ state.maximum_concurrent_rpcs is not None and
+ state.active_rpc_count >= state.maximum_concurrent_rpcs)
+ rpc_state, rpc_future = _handle_call(
+ event, state.generic_handlers, state.interceptor_pipeline,
+ state.thread_pool, concurrency_exceeded)
+ if rpc_state is not None:
+ state.rpc_states.add(rpc_state)
+ if rpc_future is not None:
+ state.active_rpc_count += 1
+ rpc_future.add_done_callback(
+ lambda unused_future: _on_call_completed(state))
+ if state.stage is _ServerStage.STARTED:
+ _request_call(state)
+ elif _stop_serving(state):
+ should_continue = False
+ else:
+ rpc_state, callbacks = event.tag(event)
+ for callback in callbacks:
+ callable_util.call_logging_exceptions(callback,
+ 'Exception calling callback!')
+ if rpc_state is not None:
with state.lock:
- state.due.remove(_SHUTDOWN_TAG)
+ state.rpc_states.remove(rpc_state)
if _stop_serving(state):
- return
- elif event.tag is _REQUEST_CALL_TAG:
- with state.lock:
- state.due.remove(_REQUEST_CALL_TAG)
- concurrency_exceeded = (
- state.maximum_concurrent_rpcs is not None and
- state.active_rpc_count >= state.maximum_concurrent_rpcs)
- rpc_state, rpc_future = _handle_call(
- event, state.generic_handlers, state.interceptor_pipeline,
- state.thread_pool, concurrency_exceeded)
- if rpc_state is not None:
- state.rpc_states.add(rpc_state)
- if rpc_future is not None:
- state.active_rpc_count += 1
- rpc_future.add_done_callback(
- lambda unused_future: _on_call_completed(state))
- if state.stage is _ServerStage.STARTED:
- _request_call(state)
- elif _stop_serving(state):
- return
- else:
- rpc_state, callbacks = event.tag(event)
- for callback in callbacks:
- callable_util.call_logging_exceptions(
- callback, 'Exception calling callback!')
- if rpc_state is not None:
- with state.lock:
- state.rpc_states.remove(rpc_state)
- if _stop_serving(state):
- return
+ should_continue = False
+ return should_continue
+
+
+def _serve(state):
+ while True:
+ timeout = time.time() + _DEALLOCATED_SERVER_CHECK_PERIOD_S
+ event = state.completion_queue.poll(timeout)
+ if state.server_deallocated:
+ _begin_shutdown_once(state)
+ if event.completion_type != cygrpc.CompletionType.queue_timeout:
+ if not _process_event_and_continue(state, event):
+ return
# We want to force the deletion of the previous event
# ~before~ we poll again; if the event has a reference
# to a shutdown Call object, this can induce spinlock.
event = None
+def _begin_shutdown_once(state):
+ with state.lock:
+ if state.stage is _ServerStage.STARTED:
+ state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG)
+ state.stage = _ServerStage.GRACE
+ state.shutdown_events = []
+ state.due.add(_SHUTDOWN_TAG)
+
+
def _stop(state, grace):
with state.lock:
if state.stage is _ServerStage.STOPPED:
@@ -765,11 +789,7 @@ def _stop(state, grace):
shutdown_event.set()
return shutdown_event
else:
- if state.stage is _ServerStage.STARTED:
- state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG)
- state.stage = _ServerStage.GRACE
- state.shutdown_events = []
- state.due.add(_SHUTDOWN_TAG)
+ _begin_shutdown_once(state)
shutdown_event = threading.Event()
state.shutdown_events.append(shutdown_event)
if grace is None:
@@ -840,7 +860,9 @@ class _Server(grpc.Server):
return _stop(self._state, grace)
def __del__(self):
- _stop(self._state, None)
+        # We cannot grab a lock in __del__(), so set a flag to signal the
+        # serving daemon thread (if it exists) to initiate shutdown.
+ self._state.server_deallocated = True
def create_server(thread_pool, generic_rpc_handlers, interceptors, options,
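The `__del__`/`_serve` changes above form one pattern: the destructor may not block or take locks, so it only sets `server_deallocated`, while the serving thread polls the completion queue with a periodic timeout and is therefore guaranteed to observe the flag within `_DEALLOCATED_SERVER_CHECK_PERIOD_S` even when no events arrive. A runnable, simplified sketch of that pattern (all names hypothetical; the real code polls a gRPC completion queue, not a `queue.Queue`):

```
# Hypothetical sketch of the flag-plus-periodic-poll shutdown pattern.
import queue
import threading

CHECK_PERIOD_S = 1.0


class _State(object):
    def __init__(self):
        self.events = queue.Queue()
        self.server_deallocated = False  # "volatile" flag written from __del__
        self.stopped = threading.Event()


def _serve(state):
    while True:
        try:
            event = state.events.get(timeout=CHECK_PERIOD_S)
        except queue.Empty:
            event = None  # timeout: no event, but we still check the flag
        if state.server_deallocated:
            state.stopped.set()
            return
        # ... a real server would process `event` here ...


class MiniServer(object):
    def __init__(self):
        # The thread holds only the state object, not the server wrapper,
        # so dropping the last reference to MiniServer still runs __del__.
        self._state = _State()
        thread = threading.Thread(target=_serve, args=(self._state,))
        thread.daemon = True
        thread.start()

    def __del__(self):
        # No locks may be taken here; just signal the serving thread.
        self._state.server_deallocated = True


state = MiniServer()._state  # the temporary wrapper is collected right away
assert state.stopped.wait(5 * CHECK_PERIOD_S)
```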
diff --git a/src/python/grpcio/grpc/_utilities.py b/src/python/grpcio/grpc/_utilities.py
index d90b34bcbd..2938a38b44 100644
--- a/src/python/grpcio/grpc/_utilities.py
+++ b/src/python/grpcio/grpc/_utilities.py
@@ -132,15 +132,12 @@ class _ChannelReadyFuture(grpc.Future):
def result(self, timeout=None):
self._block(timeout)
- return None
def exception(self, timeout=None):
self._block(timeout)
- return None
def traceback(self, timeout=None):
self._block(timeout)
- return None
def add_done_callback(self, fn):
with self._condition:
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index c6ca970bee..6a1fd676ca 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -326,6 +326,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/parse_address.cc',
'src/core/ext/filters/client_channel/proxy_mapper.cc',
'src/core/ext/filters/client_channel/proxy_mapper_registry.cc',
+ 'src/core/ext/filters/client_channel/request_routing.cc',
'src/core/ext/filters/client_channel/resolver.cc',
'src/core/ext/filters/client_channel/resolver_registry.cc',
'src/core/ext/filters/client_channel/resolver_result_parsing.cc',
diff --git a/src/python/grpcio_status/grpc_status/rpc_status.py b/src/python/grpcio_status/grpc_status/rpc_status.py
index e23a20968e..87618fa541 100644
--- a/src/python/grpcio_status/grpc_status/rpc_status.py
+++ b/src/python/grpcio_status/grpc_status/rpc_status.py
@@ -24,7 +24,7 @@ import grpc
import google.protobuf # pylint: disable=unused-import
from google.rpc import status_pb2
-_CODE_TO_GRPC_CODE_MAPPING = dict([(x.value[0], x) for x in grpc.StatusCode])
+_CODE_TO_GRPC_CODE_MAPPING = {x.value[0]: x for x in grpc.StatusCode}
_GRPC_DETAILS_METADATA_KEY = 'grpc-status-details-bin'
diff --git a/src/python/grpcio_testing/grpc_testing/_server/_handler.py b/src/python/grpcio_testing/grpc_testing/_server/_handler.py
index 0e3404b0d0..100d8195f6 100644
--- a/src/python/grpcio_testing/grpc_testing/_server/_handler.py
+++ b/src/python/grpcio_testing/grpc_testing/_server/_handler.py
@@ -185,7 +185,7 @@ class _Handler(Handler):
elif self._code is None:
self._condition.wait()
else:
- return self._trailing_metadata, self._code, self._details,
+ return self._trailing_metadata, self._code, self._details
def expire(self):
with self._condition:
diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py
index 18413abab0..496bcfbcbf 100644
--- a/src/python/grpcio_tests/commands.py
+++ b/src/python/grpcio_tests/commands.py
@@ -22,7 +22,6 @@ import re
import shutil
import subprocess
import sys
-import traceback
import setuptools
from setuptools.command import build_ext
diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py
index f9cb9d0cec..800b865da6 100644
--- a/src/python/grpcio_tests/setup.py
+++ b/src/python/grpcio_tests/setup.py
@@ -37,13 +37,19 @@ PACKAGE_DIRECTORIES = {
}
INSTALL_REQUIRES = (
- 'coverage>=4.0', 'enum34>=1.0.4',
+ 'coverage>=4.0',
+ 'enum34>=1.0.4',
'grpcio>={version}'.format(version=grpc_version.VERSION),
- 'grpcio-channelz>={version}'.format(version=grpc_version.VERSION),
+ # TODO(https://github.com/pypa/warehouse/issues/5196)
+    # Re-enable it once we get the name back
+ # 'grpcio-channelz>={version}'.format(version=grpc_version.VERSION),
'grpcio-status>={version}'.format(version=grpc_version.VERSION),
'grpcio-tools>={version}'.format(version=grpc_version.VERSION),
'grpcio-health-checking>={version}'.format(version=grpc_version.VERSION),
- 'oauth2client>=1.4.7', 'protobuf>=3.6.0', 'six>=1.10', 'google-auth>=1.0.0',
+ 'oauth2client>=1.4.7',
+ 'protobuf>=3.6.0',
+ 'six>=1.10',
+ 'google-auth>=1.0.0',
'requests>=2.14.2')
if not PY3:
diff --git a/src/python/grpcio_tests/tests/_runner.py b/src/python/grpcio_tests/tests/_runner.py
index eaaa027e61..9ef0f17684 100644
--- a/src/python/grpcio_tests/tests/_runner.py
+++ b/src/python/grpcio_tests/tests/_runner.py
@@ -203,7 +203,7 @@ class Runner(object):
check_kill_self()
time.sleep(0)
case_thread.join()
- except:
+ except: # pylint: disable=try-except-raise
# re-raise the exception after forcing the with-block to end
raise
result.set_output(augmented_case.case, stdout_pipe.output(),
diff --git a/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py
index 8ca5189522..c63ff5cd84 100644
--- a/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py
+++ b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py
@@ -88,11 +88,10 @@ def _generate_channel_server_pairs(n):
def _close_channel_server_pairs(pairs):
for pair in pairs:
pair.server.stop(None)
- # TODO(ericgribkoff) This del should not be required
- del pair.server
pair.channel.close()
+@unittest.skip('https://github.com/pypa/warehouse/issues/5196')
class ChannelzServicerTest(unittest.TestCase):
def _send_successful_unary_unary(self, idx):
diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py
index e21ea0010a..2b735526cb 100644
--- a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py
+++ b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py
@@ -144,7 +144,7 @@ class _ProtoBeforeGrpcProtocStyle(object):
absolute_proto_file_names)
pb2_grpc_protoc_exit_code = _protoc(
proto_path, None, 'grpc_2_0', python_out, absolute_proto_file_names)
- return pb2_protoc_exit_code, pb2_grpc_protoc_exit_code,
+ return pb2_protoc_exit_code, pb2_grpc_protoc_exit_code
class _GrpcBeforeProtoProtocStyle(object):
@@ -160,7 +160,7 @@ class _GrpcBeforeProtoProtocStyle(object):
proto_path, None, 'grpc_2_0', python_out, absolute_proto_file_names)
pb2_protoc_exit_code = _protoc(proto_path, python_out, None, None,
absolute_proto_file_names)
- return pb2_grpc_protoc_exit_code, pb2_protoc_exit_code,
+ return pb2_grpc_protoc_exit_code, pb2_protoc_exit_code
_PROTOC_STYLES = (
@@ -243,9 +243,9 @@ class _Test(six.with_metaclass(abc.ABCMeta, unittest.TestCase)):
def _services_modules(self):
if self.PROTOC_STYLE.grpc_in_pb2_expected():
- return self._services_pb2, self._services_pb2_grpc,
+ return self._services_pb2, self._services_pb2_grpc
else:
- return self._services_pb2_grpc,
+ return (self._services_pb2_grpc,)
def test_imported_attributes(self):
self._protoc()
diff --git a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py
index b46e53315e..43c90af6a7 100644
--- a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py
+++ b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py
@@ -223,7 +223,7 @@ def _CreateService(payload_pb2, responses_pb2, service_pb2):
server.start()
channel = implementations.insecure_channel('localhost', port)
stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel)
- yield servicer_methods, stub,
+ yield servicer_methods, stub
server.stop(0)
diff --git a/src/python/grpcio_tests/tests/qps/benchmark_client.py b/src/python/grpcio_tests/tests/qps/benchmark_client.py
index 0488450740..fac0e44e5a 100644
--- a/src/python/grpcio_tests/tests/qps/benchmark_client.py
+++ b/src/python/grpcio_tests/tests/qps/benchmark_client.py
@@ -180,7 +180,7 @@ class StreamingSyncBenchmarkClient(BenchmarkClient):
self._streams = [
_SyncStream(self._stub, self._generic, self._request,
self._handle_response)
- for _ in xrange(config.outstanding_rpcs_per_channel)
+ for _ in range(config.outstanding_rpcs_per_channel)
]
self._curr_stream = 0
diff --git a/src/python/grpcio_tests/tests/qps/client_runner.py b/src/python/grpcio_tests/tests/qps/client_runner.py
index e79abab3c7..a57524c74e 100644
--- a/src/python/grpcio_tests/tests/qps/client_runner.py
+++ b/src/python/grpcio_tests/tests/qps/client_runner.py
@@ -77,7 +77,7 @@ class ClosedLoopClientRunner(ClientRunner):
def start(self):
self._is_running = True
self._client.start()
- for _ in xrange(self._request_count):
+ for _ in range(self._request_count):
self._client.send_request()
def stop(self):
diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py
index 337a94b546..a03367ec63 100644
--- a/src/python/grpcio_tests/tests/qps/worker_server.py
+++ b/src/python/grpcio_tests/tests/qps/worker_server.py
@@ -109,7 +109,7 @@ class WorkerServer(worker_service_pb2_grpc.WorkerServiceServicer):
start_time = time.time()
# Create a client for each channel
- for i in xrange(config.client_channels):
+ for i in range(config.client_channels):
server = config.server_targets[i % len(config.server_targets)]
runner = self._create_client_runner(server, config, qps_data)
client_runners.append(runner)
diff --git a/src/python/grpcio_tests/tests/stress/client.py b/src/python/grpcio_tests/tests/stress/client.py
index 41f2e1b6c2..a318b308e6 100644
--- a/src/python/grpcio_tests/tests/stress/client.py
+++ b/src/python/grpcio_tests/tests/stress/client.py
@@ -132,9 +132,9 @@ def run_test(args):
server.start()
for test_server_target in test_server_targets:
- for _ in xrange(args.num_channels_per_server):
+ for _ in range(args.num_channels_per_server):
channel = _get_channel(test_server_target, args)
- for _ in xrange(args.num_stubs_per_channel):
+ for _ in range(args.num_stubs_per_channel):
stub = test_pb2_grpc.TestServiceStub(channel)
runner = test_runner.TestRunner(stub, test_cases, hist,
exception_queue, stop_event)
diff --git a/src/python/grpcio_tests/tests/testing/_client_application.py b/src/python/grpcio_tests/tests/testing/_client_application.py
index 3ddeba2373..4d42df0389 100644
--- a/src/python/grpcio_tests/tests/testing/_client_application.py
+++ b/src/python/grpcio_tests/tests/testing/_client_application.py
@@ -130,9 +130,9 @@ def _run_stream_stream(stub):
request_pipe = _Pipe()
response_iterator = stub.StreStre(iter(request_pipe))
request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
- first_responses = next(response_iterator), next(response_iterator),
+ first_responses = next(response_iterator), next(response_iterator)
request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
- second_responses = next(response_iterator), next(response_iterator),
+ second_responses = next(response_iterator), next(response_iterator)
request_pipe.close()
try:
next(response_iterator)
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index b27e6f2693..f202a3f932 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -57,6 +57,7 @@
"unit._reconnect_test.ReconnectTest",
"unit._resource_exhausted_test.ResourceExhaustedTest",
"unit._rpc_test.RPCTest",
+ "unit._server_shutdown_test.ServerShutdown",
"unit._server_ssl_cert_config_test.ServerSSLCertConfigFetcherParamsChecks",
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestCertConfigReuse",
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithClientAuth",
diff --git a/src/python/grpcio_tests/tests/unit/BUILD.bazel b/src/python/grpcio_tests/tests/unit/BUILD.bazel
index 4f850220f8..1b462ec67a 100644
--- a/src/python/grpcio_tests/tests/unit/BUILD.bazel
+++ b/src/python/grpcio_tests/tests/unit/BUILD.bazel
@@ -28,6 +28,7 @@ GRPCIO_TESTS_UNIT = [
# TODO(ghostwriternr): To be added later.
# "_server_ssl_cert_config_test.py",
"_server_test.py",
+ "_server_shutdown_test.py",
"_session_cache_test.py",
]
@@ -50,6 +51,11 @@ py_library(
)
py_library(
+ name = "_server_shutdown_scenarios",
+ srcs = ["_server_shutdown_scenarios.py"],
+)
+
+py_library(
name = "_thread_pool",
srcs = ["_thread_pool.py"],
)
@@ -70,6 +76,7 @@ py_library(
":resources",
":test_common",
":_exit_scenarios",
+ ":_server_shutdown_scenarios",
":_thread_pool",
":_from_grpc_import_star",
"//src/python/grpcio_tests/tests/unit/framework/common",
diff --git a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py
index ad847ae03e..1ada25382d 100644
--- a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py
+++ b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py
@@ -14,7 +14,7 @@
_BEFORE_IMPORT = tuple(globals())
-from grpc import * # pylint: disable=wildcard-import
+from grpc import * # pylint: disable=wildcard-import,unused-wildcard-import
_AFTER_IMPORT = tuple(globals())
diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py
new file mode 100644
index 0000000000..1d1fdba11e
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py
@@ -0,0 +1,97 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Defines a number of module-scope gRPC scenarios to test server shutdown."""
+
+import argparse
+import os
+import threading
+import time
+import logging
+
+import grpc
+from tests.unit import test_common
+
+from concurrent import futures
+from six.moves import queue
+
+WAIT_TIME = 1000
+
+REQUEST = b'request'
+RESPONSE = b'response'
+
+SERVER_RAISES_EXCEPTION = 'server_raises_exception'
+SERVER_DEALLOCATED = 'server_deallocated'
+SERVER_FORK_CAN_EXIT = 'server_fork_can_exit'
+
+FORK_EXIT = '/test/ForkExit'
+
+
+def fork_and_exit(request, servicer_context):
+ pid = os.fork()
+ if pid == 0:
+ os._exit(0)
+ return RESPONSE
+
+
+class GenericHandler(grpc.GenericRpcHandler):
+
+ def service(self, handler_call_details):
+ if handler_call_details.method == FORK_EXIT:
+ return grpc.unary_unary_rpc_method_handler(fork_and_exit)
+ else:
+ return None
+
+
+def run_server(port_queue):
+ server = test_common.test_server()
+ port = server.add_insecure_port('[::]:0')
+ port_queue.put(port)
+ server.add_generic_rpc_handlers((GenericHandler(),))
+ server.start()
+ # threading.Event.wait() does not exhibit the bug identified in
+    # https://github.com/grpc/grpc/issues/17093, so use time.sleep() instead
+ time.sleep(WAIT_TIME)
+
+
+def run_test(args):
+ if args.scenario == SERVER_RAISES_EXCEPTION:
+ server = test_common.test_server()
+ server.start()
+ raise Exception()
+ elif args.scenario == SERVER_DEALLOCATED:
+ server = test_common.test_server()
+ server.start()
+ server.__del__()
+ while server._state.stage != grpc._server._ServerStage.STOPPED:
+ pass
+ elif args.scenario == SERVER_FORK_CAN_EXIT:
+ port_queue = queue.Queue()
+ thread = threading.Thread(target=run_server, args=(port_queue,))
+ thread.daemon = True
+ thread.start()
+ port = port_queue.get()
+ channel = grpc.insecure_channel('localhost:%d' % port)
+ multi_callable = channel.unary_unary(FORK_EXIT)
+ result, call = multi_callable.with_call(REQUEST, wait_for_ready=True)
+ os.wait()
+ else:
+ raise ValueError('unknown test scenario')
+
+
+if __name__ == '__main__':
+ logging.basicConfig()
+ parser = argparse.ArgumentParser()
+ parser.add_argument('scenario', type=str)
+ args = parser.parse_args()
+ run_test(args)
diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py
new file mode 100644
index 0000000000..47446d65a5
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py
@@ -0,0 +1,90 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Tests clean shutdown of server on various interpreter exit conditions.
+
+The tests in this module spawn a subprocess for each test case; a test
+is considered successful if it does not hang or time out.
+"""
+
+import atexit
+import os
+import subprocess
+import sys
+import threading
+import unittest
+import logging
+
+from tests.unit import _server_shutdown_scenarios
+
+SCENARIO_FILE = os.path.abspath(
+ os.path.join(
+ os.path.dirname(os.path.realpath(__file__)),
+ '_server_shutdown_scenarios.py'))
+INTERPRETER = sys.executable
+BASE_COMMAND = [INTERPRETER, SCENARIO_FILE]
+
+processes = []
+process_lock = threading.Lock()
+
+
+# Make sure we attempt to clean up any
+# processes we may have left running
+def cleanup_processes():
+ with process_lock:
+ for process in processes:
+ try:
+ process.kill()
+ except Exception: # pylint: disable=broad-except
+ pass
+
+
+atexit.register(cleanup_processes)
+
+
+def wait(process):
+ with process_lock:
+ processes.append(process)
+ process.wait()
+
+
+class ServerShutdown(unittest.TestCase):
+
+ # Currently we shut down a server (if possible) after the Python server
+ # instance is garbage collected. This behavior may change in the future.
+ def test_deallocated_server_stops(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_server_shutdown_scenarios.SERVER_DEALLOCATED],
+ stdout=sys.stdout,
+ stderr=sys.stderr)
+ wait(process)
+
+ def test_server_exception_exits(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_server_shutdown_scenarios.SERVER_RAISES_EXCEPTION],
+ stdout=sys.stdout,
+ stderr=sys.stderr)
+ wait(process)
+
+ @unittest.skipIf(os.name == 'nt', 'fork not supported on windows')
+ def test_server_fork_can_exit(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_server_shutdown_scenarios.SERVER_FORK_CAN_EXIT],
+ stdout=sys.stdout,
+ stderr=sys.stderr)
+ wait(process)
+
+
+if __name__ == '__main__':
+ logging.basicConfig()
+ unittest.main(verbosity=2)
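As the docstring notes, these tests pass by not hanging, so `wait(process)` relies on the enclosing test runner to enforce a deadline. When running a scenario outside that runner, a bounded wait gives the same pass/fail signal. A small sketch using Python 3's `subprocess` timeout support (hypothetical helper, not part of the test suite):

```
# Hypothetical helper for manual runs; requires Python 3's subprocess API.
import subprocess
import sys

def run_scenario(command, timeout_s=30.0):
    """Bounded replacement for wait(): kill the child instead of hanging."""
    process = subprocess.Popen(command)
    try:
        return process.wait(timeout=timeout_s)
    except subprocess.TimeoutExpired:
        process.kill()
        raise

# e.g. run_scenario(BASE_COMMAND + [_server_shutdown_scenarios.SERVER_DEALLOCATED])
```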