Diffstat (limited to 'src/core/ext')
-rw-r--r--  src/core/ext/filters/client_channel/client_channel.cc                      235
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy.cc                            27
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy.h                             19
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc              49
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc      98
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc   128
-rw-r--r--  src/core/ext/transport/chttp2/client/chttp2_connector.cc                    35
-rw-r--r--  src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc        4
-rw-r--r--  src/core/ext/transport/chttp2/server/chttp2_server.cc                       90
-rw-r--r--  src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc         4
-rw-r--r--  src/core/ext/transport/chttp2/transport/chttp2_transport.cc                 21
-rw-r--r--  src/core/ext/transport/chttp2/transport/chttp2_transport.h                  10
-rw-r--r--  src/core/ext/transport/chttp2/transport/frame_settings.cc                    5
-rw-r--r--  src/core/ext/transport/chttp2/transport/internal.h                            2
-rw-r--r--  src/core/ext/transport/inproc/inproc_transport.cc                            23
15 files changed, 507 insertions, 243 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 58496dc246..aced9adf9f 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -210,6 +210,14 @@ typedef struct client_channel_channel_data {
char* info_service_config_json;
} channel_data;
+typedef struct {
+ channel_data* chand;
+ /** used only as an identifier; do not dereference it, because the LB policy
+ * may no longer exist by the time the callback is run */
+ grpc_lb_policy* lb_policy;
+ grpc_closure closure;
+} reresolution_request_args;
+
/** We create one watcher for each new lb_policy that is returned from a
resolver, to watch for state changes from the lb_policy. When a state
change is seen, we update the channel, and create a new watcher. */
@@ -258,21 +266,13 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx* exec_ctx,
static void on_lb_policy_state_changed_locked(grpc_exec_ctx* exec_ctx,
void* arg, grpc_error* error) {
lb_policy_connectivity_watcher* w = (lb_policy_connectivity_watcher*)arg;
- grpc_connectivity_state publish_state = w->state;
/* check if the notification is for the latest policy */
if (w->lb_policy == w->chand->lb_policy) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand,
w->lb_policy, grpc_connectivity_state_name(w->state));
}
- if (publish_state == GRPC_CHANNEL_SHUTDOWN &&
- w->chand->resolver != nullptr) {
- publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
- grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver);
- GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
- w->chand->lb_policy = nullptr;
- }
- set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
+ set_channel_connectivity_state_locked(exec_ctx, w->chand, w->state,
GRPC_ERROR_REF(error), "lb_changed");
if (w->state != GRPC_CHANNEL_SHUTDOWN) {
watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state);
@@ -369,6 +369,27 @@ static void parse_retry_throttle_params(const grpc_json* field, void* arg) {
}
}
+static void request_reresolution_locked(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ reresolution_request_args* args = (reresolution_request_args*)arg;
+ channel_data* chand = args->chand;
+ // If this invocation is for a stale LB policy, treat it as an LB shutdown
+ // signal.
+ if (args->lb_policy != chand->lb_policy || error != GRPC_ERROR_NONE ||
+ chand->resolver == nullptr) {
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "re-resolution");
+ gpr_free(args);
+ return;
+ }
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "chand=%p: started name re-resolving", chand);
+ }
+ grpc_resolver_channel_saw_error_locked(exec_ctx, chand->resolver);
+ // Give back the closure to the LB policy.
+ grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, chand->lb_policy,
+ &args->closure);
+}
+
static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx,
void* arg, grpc_error* error) {
channel_data* chand = (channel_data*)arg;
@@ -385,100 +406,114 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx,
grpc_server_retry_throttle_data* retry_throttle_data = nullptr;
grpc_slice_hash_table* method_params_table = nullptr;
if (chand->resolver_result != nullptr) {
- // Find LB policy name.
- const char* lb_policy_name = nullptr;
- const grpc_arg* channel_arg =
- grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
- if (channel_arg != nullptr) {
- GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
- lb_policy_name = channel_arg->value.string;
- }
- // Special case: If at least one balancer address is present, we use
- // the grpclb policy, regardless of what the resolver actually specified.
- channel_arg =
- grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
- if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
- grpc_lb_addresses* addresses =
- (grpc_lb_addresses*)channel_arg->value.pointer.p;
- bool found_balancer_address = false;
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
- if (addresses->addresses[i].is_balancer) {
- found_balancer_address = true;
- break;
+ if (chand->resolver != nullptr) {
+ // Find LB policy name.
+ const char* lb_policy_name = nullptr;
+ const grpc_arg* channel_arg = grpc_channel_args_find(
+ chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
+ if (channel_arg != nullptr) {
+ GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
+ lb_policy_name = channel_arg->value.string;
+ }
+ // Special case: If at least one balancer address is present, we use
+ // the grpclb policy, regardless of what the resolver actually specified.
+ channel_arg =
+ grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
+ if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
+ grpc_lb_addresses* addresses =
+ (grpc_lb_addresses*)channel_arg->value.pointer.p;
+ bool found_balancer_address = false;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) {
+ found_balancer_address = true;
+ break;
+ }
+ }
+ if (found_balancer_address) {
+ if (lb_policy_name != nullptr &&
+ strcmp(lb_policy_name, "grpclb") != 0) {
+ gpr_log(GPR_INFO,
+ "resolver requested LB policy %s but provided at least one "
+ "balancer address -- forcing use of grpclb LB policy",
+ lb_policy_name);
+ }
+ lb_policy_name = "grpclb";
}
}
- if (found_balancer_address) {
- if (lb_policy_name != nullptr &&
- strcmp(lb_policy_name, "grpclb") != 0) {
- gpr_log(GPR_INFO,
- "resolver requested LB policy %s but provided at least one "
- "balancer address -- forcing use of grpclb LB policy",
+ // Use pick_first if nothing was specified and we didn't select grpclb
+ // above.
+ if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
+ grpc_lb_policy_args lb_policy_args;
+ lb_policy_args.args = chand->resolver_result;
+ lb_policy_args.client_channel_factory = chand->client_channel_factory;
+ lb_policy_args.combiner = chand->combiner;
+ // Check to see if we're already using the right LB policy.
+ // Note: It's safe to use chand->info_lb_policy_name here without
+ // taking a lock on chand->info_mu, because this function is the
+ // only thing that modifies its value, and it can only be invoked
+ // once at any given time.
+ lb_policy_name_changed =
+ chand->info_lb_policy_name == nullptr ||
+ gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0;
+ if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
+ // Continue using the same LB policy. Update with new addresses.
+ lb_policy_updated = true;
+ grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy,
+ &lb_policy_args);
+ } else {
+ // Instantiate new LB policy.
+ new_lb_policy =
+ grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
+ if (new_lb_policy == nullptr) {
+ gpr_log(GPR_ERROR, "could not create LB policy \"%s\"",
lb_policy_name);
+ } else {
+ reresolution_request_args* args =
+ (reresolution_request_args*)gpr_zalloc(sizeof(*args));
+ args->chand = chand;
+ args->lb_policy = new_lb_policy;
+ GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
+ grpc_combiner_scheduler(chand->combiner));
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
+ grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, new_lb_policy,
+ &args->closure);
}
- lb_policy_name = "grpclb";
- }
- }
- // Use pick_first if nothing was specified and we didn't select grpclb
- // above.
- if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
- grpc_lb_policy_args lb_policy_args;
- lb_policy_args.args = chand->resolver_result;
- lb_policy_args.client_channel_factory = chand->client_channel_factory;
- lb_policy_args.combiner = chand->combiner;
- // Check to see if we're already using the right LB policy.
- // Note: It's safe to use chand->info_lb_policy_name here without
- // taking a lock on chand->info_mu, because this function is the
- // only thing that modifies its value, and it can only be invoked
- // once at any given time.
- lb_policy_name_changed =
- chand->info_lb_policy_name == nullptr ||
- gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0;
- if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
- // Continue using the same LB policy. Update with new addresses.
- lb_policy_updated = true;
- grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args);
- } else {
- // Instantiate new LB policy.
- new_lb_policy =
- grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
- if (new_lb_policy == nullptr) {
- gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
}
- }
- // Find service config.
- channel_arg =
- grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
- if (channel_arg != nullptr) {
- GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
- service_config_json = gpr_strdup(channel_arg->value.string);
- grpc_service_config* service_config =
- grpc_service_config_create(service_config_json);
- if (service_config != nullptr) {
- channel_arg =
- grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
- GPR_ASSERT(channel_arg != nullptr);
+ // Find service config.
+ channel_arg = grpc_channel_args_find(chand->resolver_result,
+ GRPC_ARG_SERVICE_CONFIG);
+ if (channel_arg != nullptr) {
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
- grpc_uri* uri =
- grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
- GPR_ASSERT(uri->path[0] != '\0');
- service_config_parsing_state parsing_state;
- memset(&parsing_state, 0, sizeof(parsing_state));
- parsing_state.server_name =
- uri->path[0] == '/' ? uri->path + 1 : uri->path;
- grpc_service_config_parse_global_params(
- service_config, parse_retry_throttle_params, &parsing_state);
- grpc_uri_destroy(uri);
- retry_throttle_data = parsing_state.retry_throttle_data;
- method_params_table = grpc_service_config_create_method_config_table(
- exec_ctx, service_config, method_parameters_create_from_json,
- method_parameters_ref_wrapper, method_parameters_unref_wrapper);
- grpc_service_config_destroy(service_config);
+ service_config_json = gpr_strdup(channel_arg->value.string);
+ grpc_service_config* service_config =
+ grpc_service_config_create(service_config_json);
+ if (service_config != nullptr) {
+ channel_arg = grpc_channel_args_find(chand->resolver_result,
+ GRPC_ARG_SERVER_URI);
+ GPR_ASSERT(channel_arg != nullptr);
+ GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
+ grpc_uri* uri =
+ grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
+ GPR_ASSERT(uri->path[0] != '\0');
+ service_config_parsing_state parsing_state;
+ memset(&parsing_state, 0, sizeof(parsing_state));
+ parsing_state.server_name =
+ uri->path[0] == '/' ? uri->path + 1 : uri->path;
+ grpc_service_config_parse_global_params(
+ service_config, parse_retry_throttle_params, &parsing_state);
+ grpc_uri_destroy(uri);
+ retry_throttle_data = parsing_state.retry_throttle_data;
+ method_params_table = grpc_service_config_create_method_config_table(
+ exec_ctx, service_config, method_parameters_create_from_json,
+ method_parameters_ref_wrapper, method_parameters_unref_wrapper);
+ grpc_service_config_destroy(service_config);
+ }
}
+ // Before we clean up, save a copy of lb_policy_name, since it might
+ // be pointing to data inside chand->resolver_result.
+ // The copy will be saved in chand->lb_policy_name below.
+ lb_policy_name_dup = gpr_strdup(lb_policy_name);
}
- // Before we clean up, save a copy of lb_policy_name, since it might
- // be pointing to data inside chand->resolver_result.
- // The copy will be saved in chand->lb_policy_name below.
- lb_policy_name_dup = gpr_strdup(lb_policy_name);
grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
chand->resolver_result = nullptr;
}
@@ -515,11 +550,11 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx,
}
chand->method_params_table = method_params_table;
// If we have a new LB policy or are shutting down (in which case
- // new_lb_policy will be NULL), swap out the LB policy, unreffing the
- // old one and removing its fds from chand->interested_parties.
- // Note that we do NOT do this if either (a) we updated the existing
- // LB policy above or (b) we failed to create the new LB policy (in
- // which case we want to continue using the most recent one we had).
+ // new_lb_policy will be NULL), swap out the LB policy, unreffing the old one
+ // and removing its fds from chand->interested_parties. Note that we do NOT do
+ // this if either (a) we updated the existing LB policy above or (b) we failed
+ // to create the new LB policy (in which case we want to continue using the
+ // most recent one we had).
if (new_lb_policy != nullptr || error != GRPC_ERROR_NONE ||
chand->resolver == nullptr) {
if (chand->lb_policy != nullptr) {
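
Taken together, the client_channel changes above set up a closure-based re-resolution handshake between the channel and whatever LB policy it instantiates. The sketch below restates the channel-side setup as a standalone helper; setup_reresolution_for_new_policy is a hypothetical name used only for illustration, and all types and macros are the ones already used in this diff.

static void setup_reresolution_for_new_policy(grpc_exec_ctx* exec_ctx,
                                              channel_data* chand,
                                              grpc_lb_policy* new_lb_policy) {
  reresolution_request_args* args =
      (reresolution_request_args*)gpr_zalloc(sizeof(*args));
  args->chand = chand;
  // Stored only for comparison against chand->lb_policy later; never
  // dereferenced, since the policy may be gone when the closure runs.
  args->lb_policy = new_lb_policy;
  GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
                    grpc_combiner_scheduler(chand->combiner));
  // Keep the channel stack alive until the request is either honored by
  // request_reresolution_locked or dropped because the policy went stale.
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
  grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, new_lb_policy,
                                              &args->closure);
}

When the policy later schedules the closure, request_reresolution_locked either pokes the resolver and re-arms the closure on the same policy, or, for a stale policy, an error, or a missing resolver, releases the stack ref and frees the args.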
diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc
index 6276c3e952..db566f1b56 100644
--- a/src/core/ext/filters/client_channel/lb_policy.cc
+++ b/src/core/ext/filters/client_channel/lb_policy.cc
@@ -161,3 +161,30 @@ void grpc_lb_policy_update_locked(grpc_exec_ctx* exec_ctx,
const grpc_lb_policy_args* lb_policy_args) {
policy->vtable->update_locked(exec_ctx, policy, lb_policy_args);
}
+
+void grpc_lb_policy_set_reresolve_closure_locked(
+ grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
+ grpc_closure* request_reresolution) {
+ policy->vtable->set_reresolve_closure_locked(exec_ctx, policy,
+ request_reresolution);
+}
+
+void grpc_lb_policy_try_reresolve(grpc_exec_ctx* exec_ctx,
+ grpc_lb_policy* policy,
+ grpc_core::TraceFlag* grpc_lb_trace,
+ grpc_error* error) {
+ if (policy->request_reresolution != nullptr) {
+ GRPC_CLOSURE_SCHED(exec_ctx, policy->request_reresolution, error);
+ policy->request_reresolution = nullptr;
+ if (grpc_lb_trace->enabled()) {
+ gpr_log(GPR_DEBUG,
+ "%s %p: scheduling re-resolution closure with error=%s.",
+ grpc_lb_trace->name(), policy, grpc_error_string(error));
+ }
+ } else {
+ if (grpc_lb_trace->enabled() && error == GRPC_ERROR_NONE) {
+ gpr_log(GPR_DEBUG, "%s %p: re-resolution already in progress.",
+ grpc_lb_trace->name(), policy);
+ }
+ }
+}
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 72d027995a..d3159eebf3 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -38,6 +38,8 @@ struct grpc_lb_policy {
grpc_pollset_set* interested_parties;
/* combiner under which lb_policy actions take place */
grpc_combiner* combiner;
+ /* callback to force a re-resolution */
+ grpc_closure* request_reresolution;
};
/** Extra arguments for an LB pick */
@@ -96,6 +98,11 @@ struct grpc_lb_policy_vtable {
void (*update_locked)(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
const grpc_lb_policy_args* args);
+
+ /** \see grpc_lb_policy_set_reresolve_closure */
+ void (*set_reresolve_closure_locked)(grpc_exec_ctx* exec_ctx,
+ grpc_lb_policy* policy,
+ grpc_closure* request_reresolution);
};
#ifndef NDEBUG
@@ -202,4 +209,16 @@ void grpc_lb_policy_update_locked(grpc_exec_ctx* exec_ctx,
grpc_lb_policy* policy,
const grpc_lb_policy_args* lb_policy_args);
+/** Set the re-resolution closure to \a request_reresolution. */
+void grpc_lb_policy_set_reresolve_closure_locked(
+ grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
+ grpc_closure* request_reresolution);
+
+/** Try to request a re-resolution. It's NOT a public API; it's only for use by
+ the LB policy implementations. */
+void grpc_lb_policy_try_reresolve(grpc_exec_ctx* exec_ctx,
+ grpc_lb_policy* policy,
+ grpc_core::TraceFlag* grpc_lb_trace,
+ grpc_error* error);
+
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H */
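
The error passed to grpc_lb_policy_try_reresolve doubles as a signal: GRPC_ERROR_NONE means the policy genuinely wants a new resolver pass (for example, it has run out of usable subchannels), while GRPC_ERROR_CANCELLED is used at shutdown merely to release the closure back to the channel. A minimal sketch of how an implementation might wire the two new entry points; "my_policy" and its trace flag are hypothetical names, not part of this diff.

static grpc_core::TraceFlag grpc_lb_my_policy_trace(false, "my_policy");

static void my_policy_set_reresolve_closure_locked(
    grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
    grpc_closure* request_reresolution) {
  // The channel hands over at most one closure at a time.
  GPR_ASSERT(policy->request_reresolution == nullptr);
  policy->request_reresolution = request_reresolution;
}

static void my_policy_on_subchannels_exhausted(grpc_exec_ctx* exec_ctx,
                                               grpc_lb_policy* policy) {
  // GRPC_ERROR_NONE marks a genuine re-resolution request.
  grpc_lb_policy_try_reresolve(exec_ctx, policy, &grpc_lb_my_policy_trace,
                               GRPC_ERROR_NONE);
}

static void my_policy_shutdown_locked(grpc_exec_ctx* exec_ctx,
                                      grpc_lb_policy* policy) {
  // GRPC_ERROR_CANCELLED just gives the closure back; the channel will not
  // treat it as a request for a new resolver pass.
  grpc_lb_policy_try_reresolve(exec_ctx, policy, &grpc_lb_my_policy_trace,
                               GRPC_ERROR_CANCELLED);
}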
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 2f8e0c93b2..db06fc20b6 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -637,7 +637,7 @@ static void update_lb_connectivity_status_locked(
/* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return
* immediately (ignoring its completion callback), we need to perform the
- * cleanups this callback would otherwise be resposible for.
+ * cleanups this callback would otherwise be responsible for.
* If \a force_async is true, then we will manually schedule the
* completion callback even if the pick is available immediately. */
static bool pick_from_internal_rr_locked(
@@ -766,6 +766,9 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
glb_policy->rr_policy);
return;
}
+ grpc_lb_policy_set_reresolve_closure_locked(
+ exec_ctx, new_rr_policy, glb_policy->base.request_reresolution);
+ glb_policy->base.request_reresolution = nullptr;
glb_policy->rr_policy = new_rr_policy;
grpc_error* rr_state_error = nullptr;
const grpc_connectivity_state rr_state =
@@ -991,6 +994,7 @@ static void glb_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
+ grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
glb_policy->shutting_down = true;
/* We need a copy of the lb_call pointer because we can't cancel the call
@@ -1021,6 +1025,9 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
glb_policy->pending_pings = nullptr;
if (glb_policy->rr_policy != nullptr) {
GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
+ } else {
+ grpc_lb_policy_try_reresolve(exec_ctx, pol, &grpc_lb_glb_trace,
+ GRPC_ERROR_CANCELLED);
}
// We destroy the LB channel here because
// glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy
@@ -1030,28 +1037,27 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
grpc_channel_destroy(glb_policy->lb_channel);
glb_policy->lb_channel = nullptr;
}
- grpc_connectivity_state_set(
- exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown");
+ grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
+ GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
+ "glb_shutdown");
while (pp != nullptr) {
pending_pick* next = pp->next;
*pp->target = nullptr;
- GRPC_CLOSURE_SCHED(
- exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
+ GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_ERROR_REF(error));
gpr_free(pp);
pp = next;
}
while (pping != nullptr) {
pending_ping* next = pping->next;
- GRPC_CLOSURE_SCHED(
- exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
+ GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
+ GRPC_ERROR_REF(error));
gpr_free(pping);
pping = next;
}
+ GRPC_ERROR_UNREF(error);
}
// Cancel a specific pending pick.
@@ -1754,8 +1760,8 @@ static void fallback_update_locked(grpc_exec_ctx* exec_ctx,
grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses);
glb_policy->fallback_backend_addresses =
extract_backend_addresses_locked(exec_ctx, addresses);
- if (glb_policy->started_picking && glb_policy->lb_fallback_timeout_ms > 0 &&
- !glb_policy->fallback_timer_active) {
+ if (glb_policy->lb_fallback_timeout_ms > 0 &&
+ glb_policy->rr_policy != nullptr) {
rr_handover_locked(exec_ctx, glb_policy);
}
}
@@ -1853,7 +1859,7 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx* exec_ctx,
grpc_call_cancel(glb_policy->lb_call, nullptr);
// lb_on_server_status_received() will pick up the cancel and reinit
// lb_call.
- } else if (glb_policy->started_picking && !glb_policy->shutting_down) {
+ } else if (glb_policy->started_picking) {
if (glb_policy->retry_timer_active) {
grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
glb_policy->retry_timer_active = false;
@@ -1870,6 +1876,20 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx* exec_ctx,
}
}
+static void glb_set_reresolve_closure_locked(
+ grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
+ grpc_closure* request_reresolution) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)policy;
+ GPR_ASSERT(!glb_policy->shutting_down);
+ GPR_ASSERT(glb_policy->base.request_reresolution == nullptr);
+ if (glb_policy->rr_policy != nullptr) {
+ grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, glb_policy->rr_policy,
+ request_reresolution);
+ } else {
+ glb_policy->base.request_reresolution = request_reresolution;
+ }
+}
+
/* Code wiring the policy with the rest of the core */
static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_destroy,
@@ -1881,7 +1901,8 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_exit_idle_locked,
glb_check_connectivity_locked,
glb_notify_on_state_change_locked,
- glb_update_locked};
+ glb_update_locked,
+ glb_set_reresolve_closure_locked};
static grpc_lb_policy* glb_create(grpc_exec_ctx* exec_ctx,
grpc_lb_policy_factory* factory,
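
grpclb never acts on the re-resolution closure itself; it either forwards it to its round_robin child (if one exists) or parks it in base.request_reresolution until create_rr_locked hands it down. A condensed sketch of that handoff, using only fields introduced in this diff (the helper name is hypothetical):

static void glb_forward_reresolution_closure(grpc_exec_ctx* exec_ctx,
                                             glb_lb_policy* glb_policy,
                                             grpc_lb_policy* rr_policy) {
  if (glb_policy->base.request_reresolution != nullptr) {
    // Ownership of the closure moves to the child; the child is the one that
    // actually observes subchannel failures and can decide to re-resolve.
    grpc_lb_policy_set_reresolve_closure_locked(
        exec_ctx, rr_policy, glb_policy->base.request_reresolution);
    glb_policy->base.request_reresolution = nullptr;
  }
}

If the glb policy shuts down while no round_robin child exists, glb_shutdown_locked instead returns the closure to the channel via grpc_lb_policy_try_reresolve(..., GRPC_ERROR_CANCELLED), as shown in the hunk above.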
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 6cfc37e9d1..228a77d9db 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -70,8 +70,9 @@ static void pf_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
}
}
-static void shutdown_locked(grpc_exec_ctx* exec_ctx, pick_first_lb_policy* p,
- grpc_error* error) {
+static void pf_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+ pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
+ grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p);
}
@@ -96,14 +97,11 @@ static void shutdown_locked(grpc_exec_ctx* exec_ctx, pick_first_lb_policy* p,
exec_ctx, p->latest_pending_subchannel_list, "pf_shutdown");
p->latest_pending_subchannel_list = nullptr;
}
+ grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_pick_first_trace,
+ GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
-static void pf_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
- shutdown_locked(exec_ctx, (pick_first_lb_policy*)pol,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"));
-}
-
static void pf_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
grpc_connected_subchannel** target,
grpc_error* error) {
@@ -157,10 +155,15 @@ static void start_picking_locked(grpc_exec_ctx* exec_ctx,
if (p->subchannel_list != nullptr &&
p->subchannel_list->num_subchannels > 0) {
p->subchannel_list->checking_subchannel = 0;
- grpc_lb_subchannel_list_ref_for_connectivity_watch(
- p->subchannel_list, "connectivity_watch+start_picking");
- grpc_lb_subchannel_data_start_connectivity_watch(
- exec_ctx, &p->subchannel_list->subchannels[0]);
+ for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) {
+ if (p->subchannel_list->subchannels[i].subchannel != nullptr) {
+ grpc_lb_subchannel_list_ref_for_connectivity_watch(
+ p->subchannel_list, "connectivity_watch+start_picking");
+ grpc_lb_subchannel_data_start_connectivity_watch(
+ exec_ctx, &p->subchannel_list->subchannels[i]);
+ break;
+ }
+ }
}
}
@@ -404,6 +407,9 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
if (sd->curr_connectivity_state != GRPC_CHANNEL_READY &&
p->latest_pending_subchannel_list != nullptr) {
p->selected = nullptr;
+ grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(
+ exec_ctx, sd->subchannel_list, "selected_not_ready+switch_to_update");
grpc_lb_subchannel_list_shutdown_and_unref(
exec_ctx, p->subchannel_list, "selected_not_ready+switch_to_update");
p->subchannel_list = p->latest_pending_subchannel_list;
@@ -412,21 +418,35 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update");
} else {
- if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- /* if the selected channel goes bad, we're done */
- sd->curr_connectivity_state = GRPC_CHANNEL_SHUTDOWN;
+ // TODO(juanlishen): we re-resolve when the selected subchannel goes to
+ // TRANSIENT_FAILURE because we used to shut down in this case before
+ // re-resolution was introduced. We need to investigate whether we really
+ // want to take any action here instead of just waiting for the selected
+ // subchannel to reconnect.
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN ||
+ sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ // If the selected channel goes bad, request a re-resolution.
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE,
+ "selected_changed+reresolve");
+ p->started_picking = false;
+ grpc_lb_policy_try_reresolve(
+ exec_ctx, &p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
+ } else {
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ sd->curr_connectivity_state,
+ GRPC_ERROR_REF(error), "selected_changed");
}
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- sd->curr_connectivity_state,
- GRPC_ERROR_REF(error), "selected_changed");
if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
} else {
+ p->selected = nullptr;
grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
grpc_lb_subchannel_list_unref_for_connectivity_watch(
exec_ctx, sd->subchannel_list, "pf_selected_shutdown");
- shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error));
+ grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
+ "pf_selected_shutdown");
}
}
return;
@@ -531,24 +551,37 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
} while (sd->subchannel == nullptr && sd != original_sd);
if (sd == original_sd) {
grpc_lb_subchannel_list_unref_for_connectivity_watch(
- exec_ctx, sd->subchannel_list, "pf_candidate_shutdown");
- shutdown_locked(exec_ctx, p,
- GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Pick first exhausted channels", &error, 1));
- break;
- }
- if (sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "subchannel_failed");
+ exec_ctx, sd->subchannel_list, "pf_exhausted_subchannels");
+ if (sd->subchannel_list == p->subchannel_list) {
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE,
+ "exhausted_subchannels+reresolve");
+ p->started_picking = false;
+ grpc_lb_policy_try_reresolve(
+ exec_ctx, &p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
+ }
+ } else {
+ if (sd->subchannel_list == p->subchannel_list) {
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "subchannel_failed");
+ }
+ // Reuses the connectivity refs from the previous watch.
+ grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
}
- // Reuses the connectivity refs from the previous watch.
- grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
- break;
}
}
}
+static void pf_set_reresolve_closure_locked(
+ grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
+ grpc_closure* request_reresolution) {
+ pick_first_lb_policy* p = (pick_first_lb_policy*)policy;
+ GPR_ASSERT(!p->shutdown);
+ GPR_ASSERT(policy->request_reresolution == nullptr);
+ policy->request_reresolution = request_reresolution;
+}
+
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy,
pf_shutdown_locked,
@@ -559,7 +592,8 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_exit_idle_locked,
pf_check_connectivity_locked,
pf_notify_on_state_change_locked,
- pf_update_locked};
+ pf_update_locked,
+ pf_set_reresolve_closure_locked};
static void pick_first_factory_ref(grpc_lb_policy_factory* factory) {}
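
The net behavioral change for pick_first: where the old code published SHUTDOWN and tore the policy down when the selected subchannel failed or the address list was exhausted, the new code drops to IDLE and asks the channel for a fresh resolution. A compact sketch of that reaction (hypothetical helper name, fields as in this diff):

static void pf_go_idle_and_reresolve(grpc_exec_ctx* exec_ctx,
                                     pick_first_lb_policy* p,
                                     const char* reason) {
  // Report IDLE instead of SHUTDOWN so the channel stays usable...
  grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
                              GRPC_ERROR_NONE, reason);
  p->started_picking = false;
  // ...and request a new resolver pass so an updated address list can arrive.
  grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_pick_first_trace,
                               GRPC_ERROR_NONE);
}

The same pattern appears twice in pf_connectivity_changed_locked above: once when the selected subchannel reports SHUTDOWN or TRANSIENT_FAILURE, and once when every subchannel in the list has been tried without success.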
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 23a4cc4a5a..f68daba474 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -20,9 +20,9 @@
*
* Before every pick, the \a get_next_ready_subchannel_index_locked function
* returns the p->subchannel_list->subchannels index for next subchannel,
- * respecting the relative
- * order of the addresses provided upon creation or updates. Note however that
- * updates will start picking from the beginning of the updated list. */
+ * respecting the relative order of the addresses provided upon creation or
+ * updates. Note however that updates will start picking from the beginning of
+ * the updated list. */
#include <string.h>
@@ -167,8 +167,9 @@ static void rr_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
gpr_free(p);
}
-static void shutdown_locked(grpc_exec_ctx* exec_ctx, round_robin_lb_policy* p,
- grpc_error* error) {
+static void rr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+ round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
+ grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p);
}
@@ -194,15 +195,11 @@ static void shutdown_locked(grpc_exec_ctx* exec_ctx, round_robin_lb_policy* p,
"sl_shutdown_pending_rr_shutdown");
p->latest_pending_subchannel_list = nullptr;
}
+ grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_round_robin_trace,
+ GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
-static void rr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
- round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
- shutdown_locked(exec_ctx, p,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
-}
-
static void rr_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
grpc_connected_subchannel** target,
grpc_error* error) {
@@ -255,10 +252,12 @@ static void start_picking_locked(grpc_exec_ctx* exec_ctx,
round_robin_lb_policy* p) {
p->started_picking = true;
for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) {
- grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list,
- "connectivity_watch");
- grpc_lb_subchannel_data_start_connectivity_watch(
- exec_ctx, &p->subchannel_list->subchannels[i]);
+ if (p->subchannel_list->subchannels[i].subchannel != nullptr) {
+ grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list,
+ "connectivity_watch");
+ grpc_lb_subchannel_data_start_connectivity_watch(
+ exec_ctx, &p->subchannel_list->subchannels[i]);
+ }
}
}
@@ -346,71 +345,71 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
}
/** Sets the policy's connectivity status based on that of the passed-in \a sd
- * (the grpc_lb_subchannel_data associted with the updated subchannel) and the
- * subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be
- * used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the
- * connectivity status set. */
-static grpc_connectivity_state update_lb_connectivity_status_locked(
- grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd, grpc_error* error) {
+ * (the grpc_lb_subchannel_data associated with the updated subchannel) and the
+ * subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used
+ * only if the policy transitions to state TRANSIENT_FAILURE. */
+static void update_lb_connectivity_status_locked(grpc_exec_ctx* exec_ctx,
+ grpc_lb_subchannel_data* sd,
+ grpc_error* error) {
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled).
*
* 1) RULE: ANY subchannel is READY => policy is READY.
- * CHECK: At least one subchannel is ready iff p->ready_list is NOT empty.
+ * CHECK: subchannel_list->num_ready > 0.
*
* 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING.
* CHECK: sd->curr_connectivity_state == CONNECTING.
*
- * 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN.
- * CHECK: p->subchannel_list->num_shutdown ==
- * p->subchannel_list->num_subchannels.
+ * 3) RULE: ALL subchannels are SHUTDOWN => policy is IDLE (and requests
+ * re-resolution).
+ * CHECK: subchannel_list->num_shutdown ==
+ * subchannel_list->num_subchannels.
*
* 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
- * TRANSIENT_FAILURE.
- * CHECK: p->num_transient_failures == p->subchannel_list->num_subchannels.
+ * TRANSIENT_FAILURE.
+ * CHECK: subchannel_list->num_transient_failures ==
+ * subchannel_list->num_subchannels.
*
* 5) RULE: ALL subchannels are IDLE => policy is IDLE.
- * CHECK: p->num_idle == p->subchannel_list->num_subchannels.
+ * CHECK: subchannel_list->num_idle == subchannel_list->num_subchannels.
+ * (Note that all the subchannels will transition from IDLE to CONNECTING
+ * in batch when we start trying to connect.)
*/
- grpc_connectivity_state new_state = sd->curr_connectivity_state;
+ // TODO(juanlishen): if the subchannel states are a mix of SHUTDOWN and
+ // TRANSIENT_FAILURE, we don't change the state. We may want to improve on
+ // this.
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
round_robin_lb_policy* p = (round_robin_lb_policy*)subchannel_list->policy;
- if (subchannel_list->num_ready > 0) { /* 1) READY */
+ if (subchannel_list->num_ready > 0) {
+ /* 1) READY */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
- new_state = GRPC_CHANNEL_READY;
- } else if (sd->curr_connectivity_state ==
- GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */
+ } else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) {
+ /* 2) CONNECTING */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
"rr_connecting");
- new_state = GRPC_CHANNEL_CONNECTING;
- } else if (p->subchannel_list->num_shutdown ==
- p->subchannel_list->num_subchannels) { /* 3) SHUTDOWN */
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
- "rr_shutdown");
- p->shutdown = true;
- new_state = GRPC_CHANNEL_SHUTDOWN;
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[RR %p] Shutting down: all subchannels have gone into shutdown",
- (void*)p);
- }
+ } else if (subchannel_list->num_shutdown ==
+ subchannel_list->num_subchannels) {
+ /* 3) IDLE and re-resolve */
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
+ GRPC_ERROR_NONE,
+ "rr_exhausted_subchannels+reresolve");
+ p->started_picking = false;
+ grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_round_robin_trace,
+ GRPC_ERROR_NONE);
} else if (subchannel_list->num_transient_failures ==
- p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
+ subchannel_list->num_subchannels) {
+ /* 4) TRANSIENT_FAILURE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "rr_transient_failure");
- new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
- } else if (subchannel_list->num_idle ==
- p->subchannel_list->num_subchannels) { /* 5) IDLE */
+ } else if (subchannel_list->num_idle == subchannel_list->num_subchannels) {
+ /* 5) IDLE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE, "rr_idle");
- new_state = GRPC_CHANNEL_IDLE;
}
GRPC_ERROR_UNREF(error);
- return new_state;
}
static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
@@ -454,21 +453,16 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
- // Update state counters and determine new overall state.
+ // Update state counters and the policy's overall state.
update_state_counters_locked(sd);
- const grpc_connectivity_state new_policy_connectivity_state =
- update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
- // If the sd's new state is SHUTDOWN, unref the subchannel, and if the new
- // policy's state is SHUTDOWN, clean up.
+ update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
+ // If the sd's new state is SHUTDOWN, unref the subchannel.
if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
"rr_connectivity_shutdown");
grpc_lb_subchannel_list_unref_for_connectivity_watch(
exec_ctx, sd->subchannel_list, "rr_connectivity_shutdown");
- if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error));
- }
} else { // sd not in SHUTDOWN
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
if (sd->connected_subchannel == nullptr) {
@@ -504,7 +498,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
}
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
- * p->pending_picks. This preemtively replicates rr_pick()'s actions. */
+ * p->pending_picks. This preemptively replicates rr_pick()'s actions. */
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels);
grpc_lb_subchannel_data* selected =
@@ -642,6 +636,15 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
}
}
+static void rr_set_reresolve_closure_locked(
+ grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
+ grpc_closure* request_reresolution) {
+ round_robin_lb_policy* p = (round_robin_lb_policy*)policy;
+ GPR_ASSERT(!p->shutdown);
+ GPR_ASSERT(policy->request_reresolution == nullptr);
+ policy->request_reresolution = request_reresolution;
+}
+
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_destroy,
rr_shutdown_locked,
@@ -652,7 +655,8 @@ static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_exit_idle_locked,
rr_check_connectivity_locked,
rr_notify_on_state_change_locked,
- rr_update_locked};
+ rr_update_locked,
+ rr_set_reresolve_closure_locked};
static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {}
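
The five prioritized rules that update_lb_connectivity_status_locked documents boil down to counter comparisons on the subchannel list. Restated as a pure function for readability (a sketch only; rr_aggregate_state is a hypothetical name, and the real code also performs the state-tracker update and the re-resolution request for rule 3):

static grpc_connectivity_state rr_aggregate_state(
    const grpc_lb_subchannel_list* list,
    grpc_connectivity_state changed_subchannel_state,
    grpc_connectivity_state current_policy_state) {
  if (list->num_ready > 0) return GRPC_CHANNEL_READY;               // rule 1
  if (changed_subchannel_state == GRPC_CHANNEL_CONNECTING)
    return GRPC_CHANNEL_CONNECTING;                                  // rule 2
  if (list->num_shutdown == list->num_subchannels)
    return GRPC_CHANNEL_IDLE;  // rule 3: all SHUTDOWN -> IDLE plus re-resolve
  if (list->num_transient_failures == list->num_subchannels)
    return GRPC_CHANNEL_TRANSIENT_FAILURE;                           // rule 4
  if (list->num_idle == list->num_subchannels)
    return GRPC_CHANNEL_IDLE;                                        // rule 5
  // Mixed SHUTDOWN/TRANSIENT_FAILURE: no rule matches, keep the current state
  // (the TODO above notes this may be worth improving).
  return current_policy_state;
}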
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc
index 77cc313480..819f66aec3 100644
--- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc
@@ -117,11 +117,35 @@ static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg,
} else {
grpc_endpoint_delete_from_pollset_set(exec_ctx, args->endpoint,
c->args.interested_parties);
- c->result->transport =
- grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 1);
+ c->result->transport = grpc_create_chttp2_transport(exec_ctx, args->args,
+ args->endpoint, true);
GPR_ASSERT(c->result->transport);
+ // TODO(roth): We ideally want to wait until we receive HTTP/2
+ // settings from the server before we consider the connection
+ // established. If that doesn't happen before the connection
+ // timeout expires, then we should consider the connection attempt a
+ // failure and feed that information back into the backoff code.
+ // We could pass a notify_on_receive_settings callback to
+ // grpc_chttp2_transport_start_reading() to let us know when
+ // settings are received, but we would need to figure out how to use
+ // that information here.
+ //
+ // Unfortunately, we don't currently have a way to split apart the two
+ // effects of scheduling c->notify: we start sending RPCs immediately
+ // (which we want to do) and we consider the connection attempt successful
+ // (which we don't want to do until we get the notify_on_receive_settings
+ // callback from the transport). If we could split those things
+ // apart, then we could start sending RPCs but then wait for our
+ // timeout before deciding if the connection attempt is successful.
+ // If the attempt is not successful, then we would tear down the
+ // transport and feed the failure back into the backoff code.
+ //
+ // In addition, even if we did that, we would probably not want to do
+ // so until after transparent retries is implemented. Otherwise, any
+ // RPC that we attempt to send on the connection before the timeout
+ // would fail instead of being retried on a subsequent attempt.
grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport,
- args->read_buffer);
+ args->read_buffer, nullptr);
c->result->channel_args = args->args;
}
grpc_closure* notify = c->notify;
@@ -141,8 +165,9 @@ static void start_handshake_locked(grpc_exec_ctx* exec_ctx,
grpc_endpoint_add_to_pollset_set(exec_ctx, c->endpoint,
c->args.interested_parties);
grpc_handshake_manager_do_handshake(
- exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args,
- c->args.deadline, nullptr /* acceptor */, on_handshake_done, c);
+ exec_ctx, c->handshake_mgr, c->args.interested_parties, c->endpoint,
+ c->args.channel_args, c->args.deadline, nullptr /* acceptor */,
+ on_handshake_done, c);
c->endpoint = nullptr; // Endpoint handed off to handshake manager.
}
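
grpc_handshake_manager_do_handshake now takes the pollset_set of interested parties as an explicit argument so handshake I/O can be polled on the connector's pollsets; the server acceptor simply passes nullptr. A sketch of a call site under the new argument order; apart from the gRPC symbols already shown in this diff, every name below is a placeholder.

static void on_my_handshake_done(grpc_exec_ctx* exec_ctx, void* arg,
                                 grpc_error* error) {
  // Placeholder: the real callback receives a grpc_handshaker_args* via arg.
}

static void start_my_handshake(grpc_exec_ctx* exec_ctx,
                               grpc_handshake_manager* handshake_mgr,
                               grpc_pollset_set* interested_parties,
                               grpc_endpoint* endpoint,
                               const grpc_channel_args* channel_args,
                               grpc_millis deadline, void* user_data) {
  grpc_handshake_manager_do_handshake(
      exec_ctx, handshake_mgr, interested_parties /* may be nullptr */,
      endpoint, channel_args, deadline, nullptr /* acceptor */,
      on_my_handshake_done, user_data);
}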
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
index e748d28964..c6b149d0b1 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
@@ -53,12 +53,12 @@ grpc_channel* grpc_insecure_channel_create_from_fd(
&exec_ctx, grpc_fd_create(fd, "client"), args, "fd-client");
grpc_transport* transport =
- grpc_create_chttp2_transport(&exec_ctx, final_args, client, 1);
+ grpc_create_chttp2_transport(&exec_ctx, final_args, client, true);
GPR_ASSERT(transport);
grpc_channel* channel = grpc_channel_create(
&exec_ctx, target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
grpc_channel_args_destroy(&exec_ctx, final_args);
- grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc
index 93be5e4081..49ee677464 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.cc
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc
@@ -21,6 +21,7 @@
#include <grpc/grpc.h>
#include <inttypes.h>
+#include <limits.h>
#include <string.h>
#include <grpc/support/alloc.h>
@@ -31,6 +32,7 @@
#include "src/core/ext/filters/http/server/http_server_filter.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/channel/handshaker_registry.h"
@@ -53,12 +55,52 @@ typedef struct {
} server_state;
typedef struct {
+ gpr_refcount refs;
server_state* svr_state;
grpc_pollset* accepting_pollset;
grpc_tcp_server_acceptor* acceptor;
grpc_handshake_manager* handshake_mgr;
+ // State for enforcing handshake timeout on receiving HTTP/2 settings.
+ grpc_chttp2_transport* transport;
+ grpc_millis deadline;
+ grpc_timer timer;
+ grpc_closure on_timeout;
+ grpc_closure on_receive_settings;
} server_connection_state;
+static void server_connection_state_unref(
+ grpc_exec_ctx* exec_ctx, server_connection_state* connection_state) {
+ if (gpr_unref(&connection_state->refs)) {
+ if (connection_state->transport != nullptr) {
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, connection_state->transport,
+ "receive settings timeout");
+ }
+ gpr_free(connection_state);
+ }
+}
+
+static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ server_connection_state* connection_state = (server_connection_state*)arg;
+ // Note that we may be called with GRPC_ERROR_NONE when the timer fires
+ // or with an error indicating that the timer system is being shut down.
+ if (error != GRPC_ERROR_CANCELLED) {
+ grpc_transport_op* op = grpc_make_transport_op(nullptr);
+ op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Did not receive HTTP/2 settings before handshake timeout");
+ grpc_transport_perform_op(exec_ctx, &connection_state->transport->base, op);
+ }
+ server_connection_state_unref(exec_ctx, connection_state);
+}
+
+static void on_receive_settings(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ server_connection_state* connection_state = (server_connection_state*)arg;
+ if (error == GRPC_ERROR_NONE) {
+ grpc_timer_cancel(exec_ctx, &connection_state->timer);
+ }
+ server_connection_state_unref(exec_ctx, connection_state);
+}
+
static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
grpc_handshaker_args* args = (grpc_handshaker_args*)arg;
@@ -68,7 +110,6 @@ static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg,
if (error != GRPC_ERROR_NONE || connection_state->svr_state->shutdown) {
const char* error_str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str);
-
if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
// We were shut down after handshaking completed successfully, so
// destroy the endpoint here.
@@ -87,14 +128,30 @@ static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg,
// handshaker may have handed off the connection to some external
// code, so we can just clean up here without creating a transport.
if (args->endpoint != nullptr) {
- grpc_transport* transport =
- grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0);
+ grpc_transport* transport = grpc_create_chttp2_transport(
+ exec_ctx, args->args, args->endpoint, false);
grpc_server_setup_transport(
exec_ctx, connection_state->svr_state->server, transport,
connection_state->accepting_pollset, args->args);
- grpc_chttp2_transport_start_reading(exec_ctx, transport,
- args->read_buffer);
+ // Use notify_on_receive_settings callback to enforce the
+ // handshake deadline.
+ connection_state->transport = (grpc_chttp2_transport*)transport;
+ gpr_ref(&connection_state->refs);
+ GRPC_CLOSURE_INIT(&connection_state->on_receive_settings,
+ on_receive_settings, connection_state,
+ grpc_schedule_on_exec_ctx);
+ grpc_chttp2_transport_start_reading(
+ exec_ctx, transport, args->read_buffer,
+ &connection_state->on_receive_settings);
grpc_channel_args_destroy(exec_ctx, args->args);
+ gpr_ref(&connection_state->refs);
+ GRPC_CHTTP2_REF_TRANSPORT((grpc_chttp2_transport*)transport,
+ "receive settings timeout");
+ GRPC_CLOSURE_INIT(&connection_state->on_timeout, on_timeout,
+ connection_state, grpc_schedule_on_exec_ctx);
+ grpc_timer_init(exec_ctx, &connection_state->timer,
+ connection_state->deadline,
+ &connection_state->on_timeout);
}
}
grpc_handshake_manager_pending_list_remove(
@@ -102,9 +159,9 @@ static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg,
connection_state->handshake_mgr);
gpr_mu_unlock(&connection_state->svr_state->mu);
grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr);
- grpc_tcp_server_unref(exec_ctx, connection_state->svr_state->tcp_server);
gpr_free(connection_state->acceptor);
- gpr_free(connection_state);
+ grpc_tcp_server_unref(exec_ctx, connection_state->svr_state->tcp_server);
+ server_connection_state_unref(exec_ctx, connection_state);
}
static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* tcp,
@@ -125,20 +182,25 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* tcp,
gpr_mu_unlock(&state->mu);
grpc_tcp_server_ref(state->tcp_server);
server_connection_state* connection_state =
- (server_connection_state*)gpr_malloc(sizeof(*connection_state));
+ (server_connection_state*)gpr_zalloc(sizeof(*connection_state));
+ gpr_ref_init(&connection_state->refs, 1);
connection_state->svr_state = state;
connection_state->accepting_pollset = accepting_pollset;
connection_state->acceptor = acceptor;
connection_state->handshake_mgr = handshake_mgr;
grpc_handshakers_add(exec_ctx, HANDSHAKER_SERVER, state->args,
connection_state->handshake_mgr);
- // TODO(roth): We should really get this timeout value from channel
- // args instead of hard-coding it.
- const grpc_millis deadline =
- grpc_exec_ctx_now(exec_ctx) + 120 * GPR_MS_PER_SEC;
+ const grpc_arg* timeout_arg =
+ grpc_channel_args_find(state->args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS);
+ connection_state->deadline =
+ grpc_exec_ctx_now(exec_ctx) +
+ grpc_channel_arg_get_integer(timeout_arg,
+ {120 * GPR_MS_PER_SEC, 1, INT_MAX});
grpc_handshake_manager_do_handshake(exec_ctx, connection_state->handshake_mgr,
- tcp, state->args, deadline, acceptor,
- on_handshake_done, connection_state);
+ nullptr /* interested_parties */, tcp,
+ state->args, connection_state->deadline,
+ acceptor, on_handshake_done,
+ connection_state);
}
/* Server callback: start listening on our ports */
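
The hard-coded 120-second handshake deadline is replaced by GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS, defaulting to 120 * GPR_MS_PER_SEC and clamped to [1, INT_MAX]. A hedged example of lowering it from application code, assuming the arg can be supplied as an ordinary integer channel arg when the server is created:

grpc_server* create_server_with_short_handshake_timeout(void) {
  grpc_arg timeout_arg;
  timeout_arg.type = GRPC_ARG_INTEGER;
  timeout_arg.key = (char*)GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS;
  timeout_arg.value.integer = 10000;  // 10 s instead of the 120 s default
  grpc_channel_args server_args = {1, &timeout_arg};
  return grpc_server_create(&server_args, nullptr);
}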
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
index 007d1be50c..3fe05ce4ef 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
@@ -50,7 +50,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server,
const grpc_channel_args* server_args = grpc_server_get_channel_args(server);
grpc_transport* transport = grpc_create_chttp2_transport(
- &exec_ctx, server_args, server_endpoint, 0 /* is_client */);
+ &exec_ctx, server_args, server_endpoint, false /* is_client */);
grpc_pollset** pollsets;
size_t num_pollsets = 0;
@@ -62,7 +62,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server,
grpc_server_setup_transport(&exec_ctx, server, transport, nullptr,
server_args);
- grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 5a7830b0c0..63ac65ac78 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -652,6 +652,11 @@ static void close_transport_locked(grpc_exec_ctx* exec_ctx,
GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error));
}
+ if (t->notify_on_receive_settings != nullptr) {
+ GRPC_CLOSURE_SCHED(exec_ctx, t->notify_on_receive_settings,
+ GRPC_ERROR_CANCELLED);
+ t->notify_on_receive_settings = nullptr;
+ }
GRPC_ERROR_UNREF(error);
}
@@ -1791,7 +1796,6 @@ static void perform_transport_op_locked(grpc_exec_ctx* exec_ctx,
grpc_transport_op* op = (grpc_transport_op*)stream_op;
grpc_chttp2_transport* t =
(grpc_chttp2_transport*)op->handler_private.extra_arg;
- grpc_error* close_transport = op->disconnect_with_error;
if (op->goaway_error) {
send_goaway(exec_ctx, t, op->goaway_error);
@@ -1823,8 +1827,8 @@ static void perform_transport_op_locked(grpc_exec_ctx* exec_ctx,
op->on_connectivity_state_change);
}
- if (close_transport != GRPC_ERROR_NONE) {
- close_transport_locked(exec_ctx, t, close_transport);
+ if (op->disconnect_with_error != GRPC_ERROR_NONE) {
+ close_transport_locked(exec_ctx, t, op->disconnect_with_error);
}
GRPC_CLOSURE_RUN(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
@@ -3232,16 +3236,16 @@ static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
grpc_transport* grpc_create_chttp2_transport(
grpc_exec_ctx* exec_ctx, const grpc_channel_args* channel_args,
- grpc_endpoint* ep, int is_client) {
+ grpc_endpoint* ep, bool is_client) {
grpc_chttp2_transport* t =
(grpc_chttp2_transport*)gpr_zalloc(sizeof(grpc_chttp2_transport));
- init_transport(exec_ctx, t, channel_args, ep, is_client != 0);
+ init_transport(exec_ctx, t, channel_args, ep, is_client);
return &t->base;
}
-void grpc_chttp2_transport_start_reading(grpc_exec_ctx* exec_ctx,
- grpc_transport* transport,
- grpc_slice_buffer* read_buffer) {
+void grpc_chttp2_transport_start_reading(
+ grpc_exec_ctx* exec_ctx, grpc_transport* transport,
+ grpc_slice_buffer* read_buffer, grpc_closure* notify_on_receive_settings) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)transport;
GRPC_CHTTP2_REF_TRANSPORT(
t, "reading_action"); /* matches unref inside reading_action */
@@ -3249,5 +3253,6 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx* exec_ctx,
grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
gpr_free(read_buffer);
}
+ t->notify_on_receive_settings = notify_on_receive_settings;
GRPC_CLOSURE_SCHED(exec_ctx, &t->read_action_locked, GRPC_ERROR_NONE);
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.h b/src/core/ext/transport/chttp2/transport/chttp2_transport.h
index 369dc34228..bd72e07bab 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.h
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.h
@@ -29,12 +29,14 @@ extern grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount;
grpc_transport* grpc_create_chttp2_transport(
grpc_exec_ctx* exec_ctx, const grpc_channel_args* channel_args,
- grpc_endpoint* ep, int is_client);
+ grpc_endpoint* ep, bool is_client);
/// Takes ownership of \a read_buffer, which (if non-NULL) contains
/// leftover bytes previously read from the endpoint (e.g., by handshakers).
-void grpc_chttp2_transport_start_reading(grpc_exec_ctx* exec_ctx,
- grpc_transport* transport,
- grpc_slice_buffer* read_buffer);
+/// If non-null, \a notify_on_receive_settings will be scheduled when
+/// HTTP/2 settings are received from the peer.
+void grpc_chttp2_transport_start_reading(
+ grpc_exec_ctx* exec_ctx, grpc_transport* transport,
+ grpc_slice_buffer* read_buffer, grpc_closure* notify_on_receive_settings);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_TRANSPORT_H */
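
The notify_on_receive_settings closure is scheduled exactly once: with GRPC_ERROR_NONE by the settings parser when the peer's initial SETTINGS frame arrives, or with GRPC_ERROR_CANCELLED by close_transport_locked if the transport shuts down first. A sketch of a caller that distinguishes the two cases; my_connection and its fields are hypothetical, the gRPC calls are the ones introduced in this diff.

typedef struct {
  grpc_closure on_receive_settings;
  // ...whatever caller state is needed...
} my_connection;

static void on_receive_settings_cb(grpc_exec_ctx* exec_ctx, void* arg,
                                   grpc_error* error) {
  my_connection* conn = (my_connection*)arg;
  if (error == GRPC_ERROR_NONE) {
    // SETTINGS received: the connection attempt can be treated as successful.
  } else {
    // GRPC_ERROR_CANCELLED: the transport closed before SETTINGS arrived.
  }
  (void)conn;
}

static void start_transport_reading(grpc_exec_ctx* exec_ctx,
                                    my_connection* conn,
                                    grpc_transport* transport,
                                    grpc_slice_buffer* read_buffer) {
  GRPC_CLOSURE_INIT(&conn->on_receive_settings, on_receive_settings_cb, conn,
                    grpc_schedule_on_exec_ctx);
  grpc_chttp2_transport_start_reading(exec_ctx, transport, read_buffer,
                                      &conn->on_receive_settings);
}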
diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.cc b/src/core/ext/transport/chttp2/transport/frame_settings.cc
index 75bb78db4c..de4340fea5 100644
--- a/src/core/ext/transport/chttp2/transport/frame_settings.cc
+++ b/src/core/ext/transport/chttp2/transport/frame_settings.cc
@@ -131,6 +131,11 @@ grpc_error* grpc_chttp2_settings_parser_parse(grpc_exec_ctx* exec_ctx, void* p,
memcpy(parser->target_settings, parser->incoming_settings,
GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t));
grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
+ if (t->notify_on_receive_settings != nullptr) {
+ GRPC_CLOSURE_SCHED(exec_ctx, t->notify_on_receive_settings,
+ GRPC_ERROR_NONE);
+ t->notify_on_receive_settings = nullptr;
+ }
}
return GRPC_ERROR_NONE;
}
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 9404213e5c..f6fd6795d0 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -241,6 +241,8 @@ struct grpc_chttp2_transport {
grpc_combiner* combiner;
+ grpc_closure* notify_on_receive_settings;
+
/** write execution state of the transport */
grpc_chttp2_write_state write_state;
/** is this the first write in a series of writes?
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index 2579060811..d8d753e459 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -458,6 +458,14 @@ static void fail_helper_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s,
} else {
err = GRPC_ERROR_REF(error);
}
+ if (s->recv_initial_md_op->payload->recv_initial_metadata
+ .trailing_metadata_available != nullptr) {
+ // Set to true unconditionally, because we're failing the call, so even
+ // if we haven't actually seen the send_trailing_metadata op from the
+ // other side, we're going to return trailing metadata anyway.
+ *s->recv_initial_md_op->payload->recv_initial_metadata
+ .trailing_metadata_available = true;
+ }
INPROC_LOG(GPR_DEBUG,
"fail_helper %p scheduling initial-metadata-ready %p %p", s,
error, err);
@@ -670,6 +678,12 @@ static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg,
nullptr);
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata->deadline = s->deadline;
+ if (s->recv_initial_md_op->payload->recv_initial_metadata
+ .trailing_metadata_available != nullptr) {
+ *s->recv_initial_md_op->payload->recv_initial_metadata
+ .trailing_metadata_available =
+ (other != nullptr && other->send_trailing_md_op != nullptr);
+ }
grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md);
s->to_read_initial_md_filled = false;
INPROC_LOG(GPR_DEBUG,
@@ -995,6 +1009,15 @@ static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
if (error != GRPC_ERROR_NONE) {
// Schedule op's closures that we didn't push to op state machine
if (op->recv_initial_metadata) {
+ if (op->payload->recv_initial_metadata.trailing_metadata_available !=
+ nullptr) {
+ // Set to true unconditionally, because we're failing the call, so
+ // even if we haven't actually seen the send_trailing_metadata op
+ // from the other side, we're going to return trailing metadata
+ // anyway.
+ *op->payload->recv_initial_metadata.trailing_metadata_available =
+ true;
+ }
INPROC_LOG(
GPR_DEBUG,
"perform_stream_op error %p scheduling initial-metadata-ready %p",