diff options
Diffstat (limited to 'src/core/ext')
15 files changed, 507 insertions, 243 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 58496dc246..aced9adf9f 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -210,6 +210,14 @@ typedef struct client_channel_channel_data { char* info_service_config_json; } channel_data; +typedef struct { + channel_data* chand; + /** used as an identifier, don't dereference it because the LB policy may be + * non-existing when the callback is run */ + grpc_lb_policy* lb_policy; + grpc_closure closure; +} reresolution_request_args; + /** We create one watcher for each new lb_policy that is returned from a resolver, to watch for state changes from the lb_policy. When a state change is seen, we update the channel, and create a new watcher. */ @@ -258,21 +266,13 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx* exec_ctx, static void on_lb_policy_state_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { lb_policy_connectivity_watcher* w = (lb_policy_connectivity_watcher*)arg; - grpc_connectivity_state publish_state = w->state; /* check if the notification is for the latest policy */ if (w->lb_policy == w->chand->lb_policy) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand, w->lb_policy, grpc_connectivity_state_name(w->state)); } - if (publish_state == GRPC_CHANNEL_SHUTDOWN && - w->chand->resolver != nullptr) { - publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver); - GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel"); - w->chand->lb_policy = nullptr; - } - set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state, + set_channel_connectivity_state_locked(exec_ctx, w->chand, w->state, GRPC_ERROR_REF(error), "lb_changed"); if (w->state != GRPC_CHANNEL_SHUTDOWN) { watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state); @@ -369,6 +369,27 @@ static void parse_retry_throttle_params(const grpc_json* field, void* arg) { } } +static void request_reresolution_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + reresolution_request_args* args = (reresolution_request_args*)arg; + channel_data* chand = args->chand; + // If this invocation is for a stale LB policy, treat it as an LB shutdown + // signal. + if (args->lb_policy != chand->lb_policy || error != GRPC_ERROR_NONE || + chand->resolver == nullptr) { + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "re-resolution"); + gpr_free(args); + return; + } + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, "chand=%p: started name re-resolving", chand); + } + grpc_resolver_channel_saw_error_locked(exec_ctx, chand->resolver); + // Give back the closure to the LB policy. + grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, chand->lb_policy, + &args->closure); +} + static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { channel_data* chand = (channel_data*)arg; @@ -385,100 +406,114 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx, grpc_server_retry_throttle_data* retry_throttle_data = nullptr; grpc_slice_hash_table* method_params_table = nullptr; if (chand->resolver_result != nullptr) { - // Find LB policy name. - const char* lb_policy_name = nullptr; - const grpc_arg* channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME); - if (channel_arg != nullptr) { - GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); - lb_policy_name = channel_arg->value.string; - } - // Special case: If at least one balancer address is present, we use - // the grpclb policy, regardless of what the resolver actually specified. - channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); - if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) { - grpc_lb_addresses* addresses = - (grpc_lb_addresses*)channel_arg->value.pointer.p; - bool found_balancer_address = false; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) { - found_balancer_address = true; - break; + if (chand->resolver != nullptr) { + // Find LB policy name. + const char* lb_policy_name = nullptr; + const grpc_arg* channel_arg = grpc_channel_args_find( + chand->resolver_result, GRPC_ARG_LB_POLICY_NAME); + if (channel_arg != nullptr) { + GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); + lb_policy_name = channel_arg->value.string; + } + // Special case: If at least one balancer address is present, we use + // the grpclb policy, regardless of what the resolver actually specified. + channel_arg = + grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); + if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) { + grpc_lb_addresses* addresses = + (grpc_lb_addresses*)channel_arg->value.pointer.p; + bool found_balancer_address = false; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (addresses->addresses[i].is_balancer) { + found_balancer_address = true; + break; + } + } + if (found_balancer_address) { + if (lb_policy_name != nullptr && + strcmp(lb_policy_name, "grpclb") != 0) { + gpr_log(GPR_INFO, + "resolver requested LB policy %s but provided at least one " + "balancer address -- forcing use of grpclb LB policy", + lb_policy_name); + } + lb_policy_name = "grpclb"; } } - if (found_balancer_address) { - if (lb_policy_name != nullptr && - strcmp(lb_policy_name, "grpclb") != 0) { - gpr_log(GPR_INFO, - "resolver requested LB policy %s but provided at least one " - "balancer address -- forcing use of grpclb LB policy", + // Use pick_first if nothing was specified and we didn't select grpclb + // above. + if (lb_policy_name == nullptr) lb_policy_name = "pick_first"; + grpc_lb_policy_args lb_policy_args; + lb_policy_args.args = chand->resolver_result; + lb_policy_args.client_channel_factory = chand->client_channel_factory; + lb_policy_args.combiner = chand->combiner; + // Check to see if we're already using the right LB policy. + // Note: It's safe to use chand->info_lb_policy_name here without + // taking a lock on chand->info_mu, because this function is the + // only thing that modifies its value, and it can only be invoked + // once at any given time. + lb_policy_name_changed = + chand->info_lb_policy_name == nullptr || + gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0; + if (chand->lb_policy != nullptr && !lb_policy_name_changed) { + // Continue using the same LB policy. Update with new addresses. + lb_policy_updated = true; + grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, + &lb_policy_args); + } else { + // Instantiate new LB policy. + new_lb_policy = + grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args); + if (new_lb_policy == nullptr) { + gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); + } else { + reresolution_request_args* args = + (reresolution_request_args*)gpr_zalloc(sizeof(*args)); + args->chand = chand; + args->lb_policy = new_lb_policy; + GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args, + grpc_combiner_scheduler(chand->combiner)); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution"); + grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, new_lb_policy, + &args->closure); } - lb_policy_name = "grpclb"; - } - } - // Use pick_first if nothing was specified and we didn't select grpclb - // above. - if (lb_policy_name == nullptr) lb_policy_name = "pick_first"; - grpc_lb_policy_args lb_policy_args; - lb_policy_args.args = chand->resolver_result; - lb_policy_args.client_channel_factory = chand->client_channel_factory; - lb_policy_args.combiner = chand->combiner; - // Check to see if we're already using the right LB policy. - // Note: It's safe to use chand->info_lb_policy_name here without - // taking a lock on chand->info_mu, because this function is the - // only thing that modifies its value, and it can only be invoked - // once at any given time. - lb_policy_name_changed = - chand->info_lb_policy_name == nullptr || - gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0; - if (chand->lb_policy != nullptr && !lb_policy_name_changed) { - // Continue using the same LB policy. Update with new addresses. - lb_policy_updated = true; - grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args); - } else { - // Instantiate new LB policy. - new_lb_policy = - grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args); - if (new_lb_policy == nullptr) { - gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); } - } - // Find service config. - channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG); - if (channel_arg != nullptr) { - GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); - service_config_json = gpr_strdup(channel_arg->value.string); - grpc_service_config* service_config = - grpc_service_config_create(service_config_json); - if (service_config != nullptr) { - channel_arg = - grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); - GPR_ASSERT(channel_arg != nullptr); + // Find service config. + channel_arg = grpc_channel_args_find(chand->resolver_result, + GRPC_ARG_SERVICE_CONFIG); + if (channel_arg != nullptr) { GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); - grpc_uri* uri = - grpc_uri_parse(exec_ctx, channel_arg->value.string, true); - GPR_ASSERT(uri->path[0] != '\0'); - service_config_parsing_state parsing_state; - memset(&parsing_state, 0, sizeof(parsing_state)); - parsing_state.server_name = - uri->path[0] == '/' ? uri->path + 1 : uri->path; - grpc_service_config_parse_global_params( - service_config, parse_retry_throttle_params, &parsing_state); - grpc_uri_destroy(uri); - retry_throttle_data = parsing_state.retry_throttle_data; - method_params_table = grpc_service_config_create_method_config_table( - exec_ctx, service_config, method_parameters_create_from_json, - method_parameters_ref_wrapper, method_parameters_unref_wrapper); - grpc_service_config_destroy(service_config); + service_config_json = gpr_strdup(channel_arg->value.string); + grpc_service_config* service_config = + grpc_service_config_create(service_config_json); + if (service_config != nullptr) { + channel_arg = grpc_channel_args_find(chand->resolver_result, + GRPC_ARG_SERVER_URI); + GPR_ASSERT(channel_arg != nullptr); + GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); + grpc_uri* uri = + grpc_uri_parse(exec_ctx, channel_arg->value.string, true); + GPR_ASSERT(uri->path[0] != '\0'); + service_config_parsing_state parsing_state; + memset(&parsing_state, 0, sizeof(parsing_state)); + parsing_state.server_name = + uri->path[0] == '/' ? uri->path + 1 : uri->path; + grpc_service_config_parse_global_params( + service_config, parse_retry_throttle_params, &parsing_state); + grpc_uri_destroy(uri); + retry_throttle_data = parsing_state.retry_throttle_data; + method_params_table = grpc_service_config_create_method_config_table( + exec_ctx, service_config, method_parameters_create_from_json, + method_parameters_ref_wrapper, method_parameters_unref_wrapper); + grpc_service_config_destroy(service_config); + } } + // Before we clean up, save a copy of lb_policy_name, since it might + // be pointing to data inside chand->resolver_result. + // The copy will be saved in chand->lb_policy_name below. + lb_policy_name_dup = gpr_strdup(lb_policy_name); } - // Before we clean up, save a copy of lb_policy_name, since it might - // be pointing to data inside chand->resolver_result. - // The copy will be saved in chand->lb_policy_name below. - lb_policy_name_dup = gpr_strdup(lb_policy_name); grpc_channel_args_destroy(exec_ctx, chand->resolver_result); chand->resolver_result = nullptr; } @@ -515,11 +550,11 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx, } chand->method_params_table = method_params_table; // If we have a new LB policy or are shutting down (in which case - // new_lb_policy will be NULL), swap out the LB policy, unreffing the - // old one and removing its fds from chand->interested_parties. - // Note that we do NOT do this if either (a) we updated the existing - // LB policy above or (b) we failed to create the new LB policy (in - // which case we want to continue using the most recent one we had). + // new_lb_policy will be NULL), swap out the LB policy, unreffing the old one + // and removing its fds from chand->interested_parties. Note that we do NOT do + // this if either (a) we updated the existing LB policy above or (b) we failed + // to create the new LB policy (in which case we want to continue using the + // most recent one we had). if (new_lb_policy != nullptr || error != GRPC_ERROR_NONE || chand->resolver == nullptr) { if (chand->lb_policy != nullptr) { diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index 6276c3e952..db566f1b56 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -161,3 +161,30 @@ void grpc_lb_policy_update_locked(grpc_exec_ctx* exec_ctx, const grpc_lb_policy_args* lb_policy_args) { policy->vtable->update_locked(exec_ctx, policy, lb_policy_args); } + +void grpc_lb_policy_set_reresolve_closure_locked( + grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, + grpc_closure* request_reresolution) { + policy->vtable->set_reresolve_closure_locked(exec_ctx, policy, + request_reresolution); +} + +void grpc_lb_policy_try_reresolve(grpc_exec_ctx* exec_ctx, + grpc_lb_policy* policy, + grpc_core::TraceFlag* grpc_lb_trace, + grpc_error* error) { + if (policy->request_reresolution != nullptr) { + GRPC_CLOSURE_SCHED(exec_ctx, policy->request_reresolution, error); + policy->request_reresolution = nullptr; + if (grpc_lb_trace->enabled()) { + gpr_log(GPR_DEBUG, + "%s %p: scheduling re-resolution closure with error=%s.", + grpc_lb_trace->name(), policy, grpc_error_string(error)); + } + } else { + if (grpc_lb_trace->enabled() && error == GRPC_ERROR_NONE) { + gpr_log(GPR_DEBUG, "%s %p: re-resolution already in progress.", + grpc_lb_trace->name(), policy); + } + } +} diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 72d027995a..d3159eebf3 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -38,6 +38,8 @@ struct grpc_lb_policy { grpc_pollset_set* interested_parties; /* combiner under which lb_policy actions take place */ grpc_combiner* combiner; + /* callback to force a re-resolution */ + grpc_closure* request_reresolution; }; /** Extra arguments for an LB pick */ @@ -96,6 +98,11 @@ struct grpc_lb_policy_vtable { void (*update_locked)(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, const grpc_lb_policy_args* args); + + /** \see grpc_lb_policy_set_reresolve_closure */ + void (*set_reresolve_closure_locked)(grpc_exec_ctx* exec_ctx, + grpc_lb_policy* policy, + grpc_closure* request_reresolution); }; #ifndef NDEBUG @@ -202,4 +209,16 @@ void grpc_lb_policy_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, const grpc_lb_policy_args* lb_policy_args); +/** Set the re-resolution closure to \a request_reresolution. */ +void grpc_lb_policy_set_reresolve_closure_locked( + grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, + grpc_closure* request_reresolution); + +/** Try to request a re-resolution. It's NOT a public API; it's only for use by + the LB policy implementations. */ +void grpc_lb_policy_try_reresolve(grpc_exec_ctx* exec_ctx, + grpc_lb_policy* policy, + grpc_core::TraceFlag* grpc_lb_trace, + grpc_error* error); + #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 2f8e0c93b2..db06fc20b6 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -637,7 +637,7 @@ static void update_lb_connectivity_status_locked( /* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return * immediately (ignoring its completion callback), we need to perform the - * cleanups this callback would otherwise be resposible for. + * cleanups this callback would otherwise be responsible for. * If \a force_async is true, then we will manually schedule the * completion callback even if the pick is available immediately. */ static bool pick_from_internal_rr_locked( @@ -766,6 +766,9 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy, glb_policy->rr_policy); return; } + grpc_lb_policy_set_reresolve_closure_locked( + exec_ctx, new_rr_policy, glb_policy->base.request_reresolution); + glb_policy->base.request_reresolution = nullptr; glb_policy->rr_policy = new_rr_policy; grpc_error* rr_state_error = nullptr; const grpc_connectivity_state rr_state = @@ -991,6 +994,7 @@ static void glb_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); glb_policy->shutting_down = true; /* We need a copy of the lb_call pointer because we can't cancell the call @@ -1021,6 +1025,9 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { glb_policy->pending_pings = nullptr; if (glb_policy->rr_policy != nullptr) { GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); + } else { + grpc_lb_policy_try_reresolve(exec_ctx, pol, &grpc_lb_glb_trace, + GRPC_ERROR_CANCELLED); } // We destroy the LB channel here because // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy @@ -1030,28 +1037,27 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { grpc_channel_destroy(glb_policy->lb_channel); glb_policy->lb_channel = nullptr; } - grpc_connectivity_state_set( - exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown"); + grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, + GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), + "glb_shutdown"); while (pp != nullptr) { pending_pick* next = pp->next; *pp->target = nullptr; - GRPC_CLOSURE_SCHED( - exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); + GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_ERROR_REF(error)); gpr_free(pp); pp = next; } while (pping != nullptr) { pending_ping* next = pping->next; - GRPC_CLOSURE_SCHED( - exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); + GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, + GRPC_ERROR_REF(error)); gpr_free(pping); pping = next; } + GRPC_ERROR_UNREF(error); } // Cancel a specific pending pick. @@ -1754,8 +1760,8 @@ static void fallback_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses); glb_policy->fallback_backend_addresses = extract_backend_addresses_locked(exec_ctx, addresses); - if (glb_policy->started_picking && glb_policy->lb_fallback_timeout_ms > 0 && - !glb_policy->fallback_timer_active) { + if (glb_policy->lb_fallback_timeout_ms > 0 && + glb_policy->rr_policy != nullptr) { rr_handover_locked(exec_ctx, glb_policy); } } @@ -1853,7 +1859,7 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx* exec_ctx, grpc_call_cancel(glb_policy->lb_call, nullptr); // lb_on_server_status_received() will pick up the cancel and reinit // lb_call. - } else if (glb_policy->started_picking && !glb_policy->shutting_down) { + } else if (glb_policy->started_picking) { if (glb_policy->retry_timer_active) { grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer); glb_policy->retry_timer_active = false; @@ -1870,6 +1876,20 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx* exec_ctx, } } +static void glb_set_reresolve_closure_locked( + grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, + grpc_closure* request_reresolution) { + glb_lb_policy* glb_policy = (glb_lb_policy*)policy; + GPR_ASSERT(!glb_policy->shutting_down); + GPR_ASSERT(glb_policy->base.request_reresolution == nullptr); + if (glb_policy->rr_policy != nullptr) { + grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, glb_policy->rr_policy, + request_reresolution); + } else { + glb_policy->base.request_reresolution = request_reresolution; + } +} + /* Code wiring the policy with the rest of the core */ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_destroy, @@ -1881,7 +1901,8 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_exit_idle_locked, glb_check_connectivity_locked, glb_notify_on_state_change_locked, - glb_update_locked}; + glb_update_locked, + glb_set_reresolve_closure_locked}; static grpc_lb_policy* glb_create(grpc_exec_ctx* exec_ctx, grpc_lb_policy_factory* factory, diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 6cfc37e9d1..228a77d9db 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -70,8 +70,9 @@ static void pf_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { } } -static void shutdown_locked(grpc_exec_ctx* exec_ctx, pick_first_lb_policy* p, - grpc_error* error) { +static void pf_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { + pick_first_lb_policy* p = (pick_first_lb_policy*)pol; + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p); } @@ -96,14 +97,11 @@ static void shutdown_locked(grpc_exec_ctx* exec_ctx, pick_first_lb_policy* p, exec_ctx, p->latest_pending_subchannel_list, "pf_shutdown"); p->latest_pending_subchannel_list = nullptr; } + grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_pick_first_trace, + GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); } -static void pf_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { - shutdown_locked(exec_ctx, (pick_first_lb_policy*)pol, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown")); -} - static void pf_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, grpc_connected_subchannel** target, grpc_error* error) { @@ -157,10 +155,15 @@ static void start_picking_locked(grpc_exec_ctx* exec_ctx, if (p->subchannel_list != nullptr && p->subchannel_list->num_subchannels > 0) { p->subchannel_list->checking_subchannel = 0; - grpc_lb_subchannel_list_ref_for_connectivity_watch( - p->subchannel_list, "connectivity_watch+start_picking"); - grpc_lb_subchannel_data_start_connectivity_watch( - exec_ctx, &p->subchannel_list->subchannels[0]); + for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) { + if (p->subchannel_list->subchannels[i].subchannel != nullptr) { + grpc_lb_subchannel_list_ref_for_connectivity_watch( + p->subchannel_list, "connectivity_watch+start_picking"); + grpc_lb_subchannel_data_start_connectivity_watch( + exec_ctx, &p->subchannel_list->subchannels[i]); + break; + } + } } } @@ -404,6 +407,9 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, if (sd->curr_connectivity_state != GRPC_CHANNEL_READY && p->latest_pending_subchannel_list != nullptr) { p->selected = nullptr; + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "selected_not_ready+switch_to_update"); grpc_lb_subchannel_list_shutdown_and_unref( exec_ctx, p->subchannel_list, "selected_not_ready+switch_to_update"); p->subchannel_list = p->latest_pending_subchannel_list; @@ -412,21 +418,35 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update"); } else { - if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - /* if the selected channel goes bad, we're done */ - sd->curr_connectivity_state = GRPC_CHANNEL_SHUTDOWN; + // TODO(juanlishen): we re-resolve when the selected subchannel goes to + // TRANSIENT_FAILURE because we used to shut down in this case before + // re-resolution is introduced. But we need to investigate whether we + // really want to take any action instead of waiting for the selected + // subchannel reconnecting. + if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN || + sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + // If the selected channel goes bad, request a re-resolution. + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, + "selected_changed+reresolve"); + p->started_picking = false; + grpc_lb_policy_try_reresolve( + exec_ctx, &p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE); + } else { + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + sd->curr_connectivity_state, + GRPC_ERROR_REF(error), "selected_changed"); } - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - sd->curr_connectivity_state, - GRPC_ERROR_REF(error), "selected_changed"); if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) { // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); } else { + p->selected = nullptr; grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); grpc_lb_subchannel_list_unref_for_connectivity_watch( exec_ctx, sd->subchannel_list, "pf_selected_shutdown"); - shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error)); + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, + "pf_selected_shutdown"); } } return; @@ -531,24 +551,37 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } while (sd->subchannel == nullptr && sd != original_sd); if (sd == original_sd) { grpc_lb_subchannel_list_unref_for_connectivity_watch( - exec_ctx, sd->subchannel_list, "pf_candidate_shutdown"); - shutdown_locked(exec_ctx, p, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick first exhausted channels", &error, 1)); - break; - } - if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "subchannel_failed"); + exec_ctx, sd->subchannel_list, "pf_exhausted_subchannels"); + if (sd->subchannel_list == p->subchannel_list) { + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, + "exhausted_subchannels+reresolve"); + p->started_picking = false; + grpc_lb_policy_try_reresolve( + exec_ctx, &p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE); + } + } else { + if (sd->subchannel_list == p->subchannel_list) { + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "subchannel_failed"); + } + // Reuses the connectivity refs from the previous watch. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); } - // Reuses the connectivity refs from the previous watch. - grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); - break; } } } +static void pf_set_reresolve_closure_locked( + grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, + grpc_closure* request_reresolution) { + pick_first_lb_policy* p = (pick_first_lb_policy*)policy; + GPR_ASSERT(!p->shutdown); + GPR_ASSERT(policy->request_reresolution == nullptr); + policy->request_reresolution = request_reresolution; +} + static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_destroy, pf_shutdown_locked, @@ -559,7 +592,8 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_exit_idle_locked, pf_check_connectivity_locked, pf_notify_on_state_change_locked, - pf_update_locked}; + pf_update_locked, + pf_set_reresolve_closure_locked}; static void pick_first_factory_ref(grpc_lb_policy_factory* factory) {} diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 23a4cc4a5a..f68daba474 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -20,9 +20,9 @@ * * Before every pick, the \a get_next_ready_subchannel_index_locked function * returns the p->subchannel_list->subchannels index for next subchannel, - * respecting the relative - * order of the addresses provided upon creation or updates. Note however that - * updates will start picking from the beginning of the updated list. */ + * respecting the relative order of the addresses provided upon creation or + * updates. Note however that updates will start picking from the beginning of + * the updated list. */ #include <string.h> @@ -167,8 +167,9 @@ static void rr_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { gpr_free(p); } -static void shutdown_locked(grpc_exec_ctx* exec_ctx, round_robin_lb_policy* p, - grpc_error* error) { +static void rr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { + round_robin_lb_policy* p = (round_robin_lb_policy*)pol; + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p); } @@ -194,15 +195,11 @@ static void shutdown_locked(grpc_exec_ctx* exec_ctx, round_robin_lb_policy* p, "sl_shutdown_pending_rr_shutdown"); p->latest_pending_subchannel_list = nullptr; } + grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_round_robin_trace, + GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); } -static void rr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { - round_robin_lb_policy* p = (round_robin_lb_policy*)pol; - shutdown_locked(exec_ctx, p, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); -} - static void rr_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, grpc_connected_subchannel** target, grpc_error* error) { @@ -255,10 +252,12 @@ static void start_picking_locked(grpc_exec_ctx* exec_ctx, round_robin_lb_policy* p) { p->started_picking = true; for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { - grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list, - "connectivity_watch"); - grpc_lb_subchannel_data_start_connectivity_watch( - exec_ctx, &p->subchannel_list->subchannels[i]); + if (p->subchannel_list->subchannels[i].subchannel != nullptr) { + grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list, + "connectivity_watch"); + grpc_lb_subchannel_data_start_connectivity_watch( + exec_ctx, &p->subchannel_list->subchannels[i]); + } } } @@ -346,71 +345,71 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { } /** Sets the policy's connectivity status based on that of the passed-in \a sd - * (the grpc_lb_subchannel_data associted with the updated subchannel) and the - * subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be - * used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the - * connectivity status set. */ -static grpc_connectivity_state update_lb_connectivity_status_locked( - grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd, grpc_error* error) { + * (the grpc_lb_subchannel_data associated with the updated subchannel) and the + * subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used + * only if the policy transitions to state TRANSIENT_FAILURE. */ +static void update_lb_connectivity_status_locked(grpc_exec_ctx* exec_ctx, + grpc_lb_subchannel_data* sd, + grpc_error* error) { /* In priority order. The first rule to match terminates the search (ie, if we * are on rule n, all previous rules were unfulfilled). * * 1) RULE: ANY subchannel is READY => policy is READY. - * CHECK: At least one subchannel is ready iff p->ready_list is NOT empty. + * CHECK: subchannel_list->num_ready > 0. * * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING. * CHECK: sd->curr_connectivity_state == CONNECTING. * - * 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN. - * CHECK: p->subchannel_list->num_shutdown == - * p->subchannel_list->num_subchannels. + * 3) RULE: ALL subchannels are SHUTDOWN => policy is IDLE (and requests + * re-resolution). + * CHECK: subchannel_list->num_shutdown == + * subchannel_list->num_subchannels. * * 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is - * TRANSIENT_FAILURE. - * CHECK: p->num_transient_failures == p->subchannel_list->num_subchannels. + * TRANSIENT_FAILURE. + * CHECK: subchannel_list->num_transient_failures == + * subchannel_list->num_subchannels. * * 5) RULE: ALL subchannels are IDLE => policy is IDLE. - * CHECK: p->num_idle == p->subchannel_list->num_subchannels. + * CHECK: subchannel_list->num_idle == subchannel_list->num_subchannels. + * (Note that all the subchannels will transition from IDLE to CONNECTING + * in batch when we start trying to connect.) */ - grpc_connectivity_state new_state = sd->curr_connectivity_state; + // TODO(juanlishen): if the subchannel states are mixed by {SHUTDOWN, + // TRANSIENT_FAILURE}, we don't change the state. We may want to improve on + // this. grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; round_robin_lb_policy* p = (round_robin_lb_policy*)subchannel_list->policy; - if (subchannel_list->num_ready > 0) { /* 1) READY */ + if (subchannel_list->num_ready > 0) { + /* 1) READY */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); - new_state = GRPC_CHANNEL_READY; - } else if (sd->curr_connectivity_state == - GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */ + } else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) { + /* 2) CONNECTING */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "rr_connecting"); - new_state = GRPC_CHANNEL_CONNECTING; - } else if (p->subchannel_list->num_shutdown == - p->subchannel_list->num_subchannels) { /* 3) SHUTDOWN */ - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), - "rr_shutdown"); - p->shutdown = true; - new_state = GRPC_CHANNEL_SHUTDOWN; - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_INFO, - "[RR %p] Shutting down: all subchannels have gone into shutdown", - (void*)p); - } + } else if (subchannel_list->num_shutdown == + subchannel_list->num_subchannels) { + /* 3) IDLE and re-resolve */ + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE, + GRPC_ERROR_NONE, + "rr_exhausted_subchannels+reresolve"); + p->started_picking = false; + grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_round_robin_trace, + GRPC_ERROR_NONE); } else if (subchannel_list->num_transient_failures == - p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */ + subchannel_list->num_subchannels) { + /* 4) TRANSIENT_FAILURE */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "rr_transient_failure"); - new_state = GRPC_CHANNEL_TRANSIENT_FAILURE; - } else if (subchannel_list->num_idle == - p->subchannel_list->num_subchannels) { /* 5) IDLE */ + } else if (subchannel_list->num_idle == subchannel_list->num_subchannels) { + /* 5) IDLE */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, "rr_idle"); - new_state = GRPC_CHANNEL_IDLE; } GRPC_ERROR_UNREF(error); - return new_state; } static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, @@ -454,21 +453,16 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; - // Update state counters and determine new overall state. + // Update state counters and new overall state. update_state_counters_locked(sd); - const grpc_connectivity_state new_policy_connectivity_state = - update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error)); - // If the sd's new state is SHUTDOWN, unref the subchannel, and if the new - // policy's state is SHUTDOWN, clean up. + update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error)); + // If the sd's new state is SHUTDOWN, unref the subchannel. if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "rr_connectivity_shutdown"); grpc_lb_subchannel_list_unref_for_connectivity_watch( exec_ctx, sd->subchannel_list, "rr_connectivity_shutdown"); - if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error)); - } } else { // sd not in SHUTDOWN if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { if (sd->connected_subchannel == nullptr) { @@ -504,7 +498,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, } /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in - * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ + * p->pending_picks. This preemptively replicates rr_pick()'s actions. */ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels); grpc_lb_subchannel_data* selected = @@ -642,6 +636,15 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, } } +static void rr_set_reresolve_closure_locked( + grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, + grpc_closure* request_reresolution) { + round_robin_lb_policy* p = (round_robin_lb_policy*)policy; + GPR_ASSERT(!p->shutdown); + GPR_ASSERT(policy->request_reresolution == nullptr); + policy->request_reresolution = request_reresolution; +} + static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { rr_destroy, rr_shutdown_locked, @@ -652,7 +655,8 @@ static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { rr_exit_idle_locked, rr_check_connectivity_locked, rr_notify_on_state_change_locked, - rr_update_locked}; + rr_update_locked, + rr_set_reresolve_closure_locked}; static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {} diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc index 77cc313480..819f66aec3 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc @@ -117,11 +117,35 @@ static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg, } else { grpc_endpoint_delete_from_pollset_set(exec_ctx, args->endpoint, c->args.interested_parties); - c->result->transport = - grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 1); + c->result->transport = grpc_create_chttp2_transport(exec_ctx, args->args, + args->endpoint, true); GPR_ASSERT(c->result->transport); + // TODO(roth): We ideally want to wait until we receive HTTP/2 + // settings from the server before we consider the connection + // established. If that doesn't happen before the connection + // timeout expires, then we should consider the connection attempt a + // failure and feed that information back into the backoff code. + // We could pass a notify_on_receive_settings callback to + // grpc_chttp2_transport_start_reading() to let us know when + // settings are received, but we would need to figure out how to use + // that information here. + // + // Unfortunately, we don't currently have a way to split apart the two + // effects of scheduling c->notify: we start sending RPCs immediately + // (which we want to do) and we consider the connection attempt successful + // (which we don't want to do until we get the notify_on_receive_settings + // callback from the transport). If we could split those things + // apart, then we could start sending RPCs but then wait for our + // timeout before deciding if the connection attempt is successful. + // If the attempt is not successful, then we would tear down the + // transport and feed the failure back into the backoff code. + // + // In addition, even if we did that, we would probably not want to do + // so until after transparent retries is implemented. Otherwise, any + // RPC that we attempt to send on the connection before the timeout + // would fail instead of being retried on a subsequent attempt. grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, - args->read_buffer); + args->read_buffer, nullptr); c->result->channel_args = args->args; } grpc_closure* notify = c->notify; @@ -141,8 +165,9 @@ static void start_handshake_locked(grpc_exec_ctx* exec_ctx, grpc_endpoint_add_to_pollset_set(exec_ctx, c->endpoint, c->args.interested_parties); grpc_handshake_manager_do_handshake( - exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args, - c->args.deadline, nullptr /* acceptor */, on_handshake_done, c); + exec_ctx, c->handshake_mgr, c->args.interested_parties, c->endpoint, + c->args.channel_args, c->args.deadline, nullptr /* acceptor */, + on_handshake_done, c); c->endpoint = nullptr; // Endpoint handed off to handshake manager. } diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc index e748d28964..c6b149d0b1 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc @@ -53,12 +53,12 @@ grpc_channel* grpc_insecure_channel_create_from_fd( &exec_ctx, grpc_fd_create(fd, "client"), args, "fd-client"); grpc_transport* transport = - grpc_create_chttp2_transport(&exec_ctx, final_args, client, 1); + grpc_create_chttp2_transport(&exec_ctx, final_args, client, true); GPR_ASSERT(transport); grpc_channel* channel = grpc_channel_create( &exec_ctx, target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport); grpc_channel_args_destroy(&exec_ctx, final_args); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc index 93be5e4081..49ee677464 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.cc +++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc @@ -21,6 +21,7 @@ #include <grpc/grpc.h> #include <inttypes.h> +#include <limits.h> #include <string.h> #include <grpc/support/alloc.h> @@ -31,6 +32,7 @@ #include "src/core/ext/filters/http/server/http_server_filter.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" #include "src/core/lib/channel/handshaker_registry.h" @@ -53,12 +55,52 @@ typedef struct { } server_state; typedef struct { + gpr_refcount refs; server_state* svr_state; grpc_pollset* accepting_pollset; grpc_tcp_server_acceptor* acceptor; grpc_handshake_manager* handshake_mgr; + // State for enforcing handshake timeout on receiving HTTP/2 settings. + grpc_chttp2_transport* transport; + grpc_millis deadline; + grpc_timer timer; + grpc_closure on_timeout; + grpc_closure on_receive_settings; } server_connection_state; +static void server_connection_state_unref( + grpc_exec_ctx* exec_ctx, server_connection_state* connection_state) { + if (gpr_unref(&connection_state->refs)) { + if (connection_state->transport != nullptr) { + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, connection_state->transport, + "receive settings timeout"); + } + gpr_free(connection_state); + } +} + +static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + server_connection_state* connection_state = (server_connection_state*)arg; + // Note that we may be called with GRPC_ERROR_NONE when the timer fires + // or with an error indicating that the timer system is being shut down. + if (error != GRPC_ERROR_CANCELLED) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Did not receive HTTP/2 settings before handshake timeout"); + grpc_transport_perform_op(exec_ctx, &connection_state->transport->base, op); + } + server_connection_state_unref(exec_ctx, connection_state); +} + +static void on_receive_settings(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + server_connection_state* connection_state = (server_connection_state*)arg; + if (error == GRPC_ERROR_NONE) { + grpc_timer_cancel(exec_ctx, &connection_state->timer); + } + server_connection_state_unref(exec_ctx, connection_state); +} + static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_handshaker_args* args = (grpc_handshaker_args*)arg; @@ -68,7 +110,6 @@ static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg, if (error != GRPC_ERROR_NONE || connection_state->svr_state->shutdown) { const char* error_str = grpc_error_string(error); gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str); - if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) { // We were shut down after handshaking completed successfully, so // destroy the endpoint here. @@ -87,14 +128,30 @@ static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg, // handshaker may have handed off the connection to some external // code, so we can just clean up here without creating a transport. if (args->endpoint != nullptr) { - grpc_transport* transport = - grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0); + grpc_transport* transport = grpc_create_chttp2_transport( + exec_ctx, args->args, args->endpoint, false); grpc_server_setup_transport( exec_ctx, connection_state->svr_state->server, transport, connection_state->accepting_pollset, args->args); - grpc_chttp2_transport_start_reading(exec_ctx, transport, - args->read_buffer); + // Use notify_on_receive_settings callback to enforce the + // handshake deadline. + connection_state->transport = (grpc_chttp2_transport*)transport; + gpr_ref(&connection_state->refs); + GRPC_CLOSURE_INIT(&connection_state->on_receive_settings, + on_receive_settings, connection_state, + grpc_schedule_on_exec_ctx); + grpc_chttp2_transport_start_reading( + exec_ctx, transport, args->read_buffer, + &connection_state->on_receive_settings); grpc_channel_args_destroy(exec_ctx, args->args); + gpr_ref(&connection_state->refs); + GRPC_CHTTP2_REF_TRANSPORT((grpc_chttp2_transport*)transport, + "receive settings timeout"); + GRPC_CLOSURE_INIT(&connection_state->on_timeout, on_timeout, + connection_state, grpc_schedule_on_exec_ctx); + grpc_timer_init(exec_ctx, &connection_state->timer, + connection_state->deadline, + &connection_state->on_timeout); } } grpc_handshake_manager_pending_list_remove( @@ -102,9 +159,9 @@ static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg, connection_state->handshake_mgr); gpr_mu_unlock(&connection_state->svr_state->mu); grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); - grpc_tcp_server_unref(exec_ctx, connection_state->svr_state->tcp_server); gpr_free(connection_state->acceptor); - gpr_free(connection_state); + grpc_tcp_server_unref(exec_ctx, connection_state->svr_state->tcp_server); + server_connection_state_unref(exec_ctx, connection_state); } static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* tcp, @@ -125,20 +182,25 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* tcp, gpr_mu_unlock(&state->mu); grpc_tcp_server_ref(state->tcp_server); server_connection_state* connection_state = - (server_connection_state*)gpr_malloc(sizeof(*connection_state)); + (server_connection_state*)gpr_zalloc(sizeof(*connection_state)); + gpr_ref_init(&connection_state->refs, 1); connection_state->svr_state = state; connection_state->accepting_pollset = accepting_pollset; connection_state->acceptor = acceptor; connection_state->handshake_mgr = handshake_mgr; grpc_handshakers_add(exec_ctx, HANDSHAKER_SERVER, state->args, connection_state->handshake_mgr); - // TODO(roth): We should really get this timeout value from channel - // args instead of hard-coding it. - const grpc_millis deadline = - grpc_exec_ctx_now(exec_ctx) + 120 * GPR_MS_PER_SEC; + const grpc_arg* timeout_arg = + grpc_channel_args_find(state->args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS); + connection_state->deadline = + grpc_exec_ctx_now(exec_ctx) + + grpc_channel_arg_get_integer(timeout_arg, + {120 * GPR_MS_PER_SEC, 1, INT_MAX}); grpc_handshake_manager_do_handshake(exec_ctx, connection_state->handshake_mgr, - tcp, state->args, deadline, acceptor, - on_handshake_done, connection_state); + nullptr /* interested_parties */, tcp, + state->args, connection_state->deadline, + acceptor, on_handshake_done, + connection_state); } /* Server callback: start listening on our ports */ diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc index 007d1be50c..3fe05ce4ef 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc @@ -50,7 +50,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server, const grpc_channel_args* server_args = grpc_server_get_channel_args(server); grpc_transport* transport = grpc_create_chttp2_transport( - &exec_ctx, server_args, server_endpoint, 0 /* is_client */); + &exec_ctx, server_args, server_endpoint, false /* is_client */); grpc_pollset** pollsets; size_t num_pollsets = 0; @@ -62,7 +62,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server, grpc_server_setup_transport(&exec_ctx, server, transport, nullptr, server_args); - grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 5a7830b0c0..63ac65ac78 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -652,6 +652,11 @@ static void close_transport_locked(grpc_exec_ctx* exec_ctx, GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE); grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error)); } + if (t->notify_on_receive_settings != nullptr) { + GRPC_CLOSURE_SCHED(exec_ctx, t->notify_on_receive_settings, + GRPC_ERROR_CANCELLED); + t->notify_on_receive_settings = nullptr; + } GRPC_ERROR_UNREF(error); } @@ -1791,7 +1796,6 @@ static void perform_transport_op_locked(grpc_exec_ctx* exec_ctx, grpc_transport_op* op = (grpc_transport_op*)stream_op; grpc_chttp2_transport* t = (grpc_chttp2_transport*)op->handler_private.extra_arg; - grpc_error* close_transport = op->disconnect_with_error; if (op->goaway_error) { send_goaway(exec_ctx, t, op->goaway_error); @@ -1823,8 +1827,8 @@ static void perform_transport_op_locked(grpc_exec_ctx* exec_ctx, op->on_connectivity_state_change); } - if (close_transport != GRPC_ERROR_NONE) { - close_transport_locked(exec_ctx, t, close_transport); + if (op->disconnect_with_error != GRPC_ERROR_NONE) { + close_transport_locked(exec_ctx, t, op->disconnect_with_error); } GRPC_CLOSURE_RUN(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); @@ -3232,16 +3236,16 @@ static const grpc_transport_vtable* get_vtable(void) { return &vtable; } grpc_transport* grpc_create_chttp2_transport( grpc_exec_ctx* exec_ctx, const grpc_channel_args* channel_args, - grpc_endpoint* ep, int is_client) { + grpc_endpoint* ep, bool is_client) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)gpr_zalloc(sizeof(grpc_chttp2_transport)); - init_transport(exec_ctx, t, channel_args, ep, is_client != 0); + init_transport(exec_ctx, t, channel_args, ep, is_client); return &t->base; } -void grpc_chttp2_transport_start_reading(grpc_exec_ctx* exec_ctx, - grpc_transport* transport, - grpc_slice_buffer* read_buffer) { +void grpc_chttp2_transport_start_reading( + grpc_exec_ctx* exec_ctx, grpc_transport* transport, + grpc_slice_buffer* read_buffer, grpc_closure* notify_on_receive_settings) { grpc_chttp2_transport* t = (grpc_chttp2_transport*)transport; GRPC_CHTTP2_REF_TRANSPORT( t, "reading_action"); /* matches unref inside reading_action */ @@ -3249,5 +3253,6 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx* exec_ctx, grpc_slice_buffer_move_into(read_buffer, &t->read_buffer); gpr_free(read_buffer); } + t->notify_on_receive_settings = notify_on_receive_settings; GRPC_CLOSURE_SCHED(exec_ctx, &t->read_action_locked, GRPC_ERROR_NONE); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.h b/src/core/ext/transport/chttp2/transport/chttp2_transport.h index 369dc34228..bd72e07bab 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.h +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.h @@ -29,12 +29,14 @@ extern grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount; grpc_transport* grpc_create_chttp2_transport( grpc_exec_ctx* exec_ctx, const grpc_channel_args* channel_args, - grpc_endpoint* ep, int is_client); + grpc_endpoint* ep, bool is_client); /// Takes ownership of \a read_buffer, which (if non-NULL) contains /// leftover bytes previously read from the endpoint (e.g., by handshakers). -void grpc_chttp2_transport_start_reading(grpc_exec_ctx* exec_ctx, - grpc_transport* transport, - grpc_slice_buffer* read_buffer); +/// If non-null, \a notify_on_receive_settings will be scheduled when +/// HTTP/2 settings are received from the peer. +void grpc_chttp2_transport_start_reading( + grpc_exec_ctx* exec_ctx, grpc_transport* transport, + grpc_slice_buffer* read_buffer, grpc_closure* notify_on_receive_settings); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_TRANSPORT_H */ diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.cc b/src/core/ext/transport/chttp2/transport/frame_settings.cc index 75bb78db4c..de4340fea5 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.cc +++ b/src/core/ext/transport/chttp2/transport/frame_settings.cc @@ -131,6 +131,11 @@ grpc_error* grpc_chttp2_settings_parser_parse(grpc_exec_ctx* exec_ctx, void* p, memcpy(parser->target_settings, parser->incoming_settings, GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t)); grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create()); + if (t->notify_on_receive_settings != nullptr) { + GRPC_CLOSURE_SCHED(exec_ctx, t->notify_on_receive_settings, + GRPC_ERROR_NONE); + t->notify_on_receive_settings = nullptr; + } } return GRPC_ERROR_NONE; } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 9404213e5c..f6fd6795d0 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -241,6 +241,8 @@ struct grpc_chttp2_transport { grpc_combiner* combiner; + grpc_closure* notify_on_receive_settings; + /** write execution state of the transport */ grpc_chttp2_write_state write_state; /** is this the first write in a series of writes? diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 2579060811..d8d753e459 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -458,6 +458,14 @@ static void fail_helper_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s, } else { err = GRPC_ERROR_REF(error); } + if (s->recv_initial_md_op->payload->recv_initial_metadata + .trailing_metadata_available != nullptr) { + // Set to true unconditionally, because we're failing the call, so even + // if we haven't actually seen the send_trailing_metadata op from the + // other side, we're going to return trailing metadata anyway. + *s->recv_initial_md_op->payload->recv_initial_metadata + .trailing_metadata_available = true; + } INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling initial-metadata-ready %p %p", s, error, err); @@ -670,6 +678,12 @@ static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, nullptr); s->recv_initial_md_op->payload->recv_initial_metadata .recv_initial_metadata->deadline = s->deadline; + if (s->recv_initial_md_op->payload->recv_initial_metadata + .trailing_metadata_available != nullptr) { + *s->recv_initial_md_op->payload->recv_initial_metadata + .trailing_metadata_available = + (other != nullptr && other->send_trailing_md_op != nullptr); + } grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md); s->to_read_initial_md_filled = false; INPROC_LOG(GPR_DEBUG, @@ -995,6 +1009,15 @@ static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, if (error != GRPC_ERROR_NONE) { // Schedule op's closures that we didn't push to op state machine if (op->recv_initial_metadata) { + if (op->payload->recv_initial_metadata.trailing_metadata_available != + nullptr) { + // Set to true unconditionally, because we're failing the call, so + // even if we haven't actually seen the send_trailing_metadata op + // from the other side, we're going to return trailing metadata + // anyway. + *op->payload->recv_initial_metadata.trailing_metadata_available = + true; + } INPROC_LOG( GPR_DEBUG, "perform_stream_op error %p scheduling initial-metadata-ready %p", |