diff options
Diffstat (limited to 'src/core/ext')
-rw-r--r-- | src/core/ext/census/census_log.h | 2 | ||||
-rw-r--r-- | src/core/ext/census/mlog.h | 2 | ||||
-rw-r--r-- | src/core/ext/client_channel/client_channel.c | 124 | ||||
-rw-r--r-- | src/core/ext/client_channel/lb_policy_registry.c | 8 | ||||
-rw-r--r-- | src/core/ext/client_channel/subchannel.c | 144 | ||||
-rw-r--r-- | src/core/ext/client_channel/subchannel.h | 4 | ||||
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 240 | ||||
-rw-r--r-- | src/core/ext/lb_policy/pick_first/pick_first.c | 2 | ||||
-rw-r--r-- | src/core/ext/lb_policy/round_robin/round_robin.c | 313 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 35 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/parsing.c | 3 | ||||
-rw-r--r-- | src/core/ext/transport/cronet/transport/cronet_transport.c | 60 |
12 files changed, 651 insertions, 286 deletions
diff --git a/src/core/ext/census/census_log.h b/src/core/ext/census/census_log.h index 534ecc5705..1b185a53b9 100644 --- a/src/core/ext/census/census_log.h +++ b/src/core/ext/census/census_log.h @@ -84,7 +84,7 @@ const void *census_log_read_next(size_t *bytes_available); */ size_t census_log_remaining_space(void); -/* Returns the number of times gprc_stats_log_start_write() failed due to +/* Returns the number of times grpc_stats_log_start_write() failed due to out-of-space. */ int census_log_out_of_space_count(void); diff --git a/src/core/ext/census/mlog.h b/src/core/ext/census/mlog.h index a256426f91..18805ad994 100644 --- a/src/core/ext/census/mlog.h +++ b/src/core/ext/census/mlog.h @@ -88,7 +88,7 @@ const void* census_log_read_next(size_t* bytes_available); */ size_t census_log_remaining_space(void); -/* Returns the number of times gprc_stats_log_start_write() failed due to +/* Returns the number of times grpc_stats_log_start_write() failed due to out-of-space. */ int64_t census_log_out_of_space_count(void); diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 647e5becc4..17ffc0f455 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -56,7 +56,7 @@ #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata.h" #include "src/core/lib/transport/metadata_batch.h" -#include "src/core/lib/transport/method_config.h" +#include "src/core/lib/transport/service_config.h" #include "src/core/lib/transport/static_metadata.h" /* Client channel implementation */ @@ -82,30 +82,61 @@ static void *method_parameters_copy(void *value) { return new_value; } -static int method_parameters_cmp(void *value1, void *value2) { - const method_parameters *v1 = value1; - const method_parameters *v2 = value2; - const int retval = gpr_time_cmp(v1->timeout, v2->timeout); - if (retval != 0) return retval; - if (v1->wait_for_ready > v2->wait_for_ready) return 1; - if (v1->wait_for_ready < v2->wait_for_ready) return -1; - return 0; -} - static const grpc_mdstr_hash_table_vtable method_parameters_vtable = { - gpr_free, method_parameters_copy, method_parameters_cmp}; - -static void *method_config_convert_value( - const grpc_method_config *method_config) { + gpr_free, method_parameters_copy}; + +static void *method_parameters_create_from_json(const grpc_json *json) { + wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET; + gpr_timespec timeout = {0, 0, GPR_TIMESPAN}; + for (grpc_json *field = json->child; field != NULL; field = field->next) { + if (field->key == NULL) continue; + if (strcmp(field->key, "waitForReady") == 0) { + if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate. + if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) { + return NULL; + } + wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE + : WAIT_FOR_READY_FALSE; + } else if (strcmp(field->key, "timeout") == 0) { + if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL; // Duplicate. + if (field->type != GRPC_JSON_STRING) return NULL; + size_t len = strlen(field->value); + if (field->value[len - 1] != 's') return NULL; + char *buf = gpr_strdup(field->value); + buf[len - 1] = '\0'; // Remove trailing 's'. + char *decimal_point = strchr(buf, '.'); + if (decimal_point != NULL) { + *decimal_point = '\0'; + timeout.tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1); + if (timeout.tv_nsec == -1) { + gpr_free(buf); + return NULL; + } + // There should always be exactly 3, 6, or 9 fractional digits. + int multiplier = 1; + switch (strlen(decimal_point + 1)) { + case 9: + break; + case 6: + multiplier *= 1000; + break; + case 3: + multiplier *= 1000000; + break; + default: // Unsupported number of digits. + gpr_free(buf); + return NULL; + } + timeout.tv_nsec *= multiplier; + } + timeout.tv_sec = gpr_parse_nonnegative_int(buf); + if (timeout.tv_sec == -1) return NULL; + gpr_free(buf); + } + } method_parameters *value = gpr_malloc(sizeof(method_parameters)); - const gpr_timespec *timeout = grpc_method_config_get_timeout(method_config); - value->timeout = timeout != NULL ? *timeout : gpr_time_0(GPR_TIMESPAN); - const bool *wait_for_ready = - grpc_method_config_get_wait_for_ready(method_config); - value->wait_for_ready = - wait_for_ready == NULL - ? WAIT_FOR_READY_UNSET - : (wait_for_ready ? WAIT_FOR_READY_TRUE : WAIT_FOR_READY_FALSE); + value->timeout = timeout; + value->wait_for_ready = wait_for_ready; return value; } @@ -126,6 +157,8 @@ typedef struct client_channel_channel_data { /** currently active load balancer */ char *lb_policy_name; grpc_lb_policy *lb_policy; + /** service config in JSON form */ + char *service_config_json; /** maps method names to method_parameters structs */ grpc_mdstr_hash_table *method_params_table; /** incoming resolver result - set by resolver.next() */ @@ -232,15 +265,12 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; bool exit_idle = false; grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy"); + char *service_config_json = NULL; if (chand->resolver_result != NULL) { - grpc_lb_policy_args lb_policy_args; - lb_policy_args.args = chand->resolver_result; - lb_policy_args.client_channel_factory = chand->client_channel_factory; - // Find LB policy name. const grpc_arg *channel_arg = - grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_LB_POLICY_NAME); + grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME); if (channel_arg != NULL) { GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); lb_policy_name = channel_arg->value.string; @@ -249,7 +279,7 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, // assume that we should use the grpclb policy, regardless of what the // resolver actually specified. channel_arg = - grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_LB_ADDRESSES); + grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); if (channel_arg != NULL) { GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER); grpc_lb_addresses *addresses = channel_arg->value.pointer.p; @@ -274,7 +304,10 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, // Use pick_first if nothing was specified and we didn't select grpclb // above. if (lb_policy_name == NULL) lb_policy_name = "pick_first"; - + // Instantiate LB policy. + grpc_lb_policy_args lb_policy_args; + lb_policy_args.args = chand->resolver_result; + lb_policy_args.client_channel_factory = chand->client_channel_factory; lb_policy = grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args); if (lb_policy != NULL) { @@ -283,13 +316,20 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error); } + // Find service config. channel_arg = - grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_SERVICE_CONFIG); + grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG); if (channel_arg != NULL) { - GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER); - method_params_table = grpc_method_config_table_convert( - (grpc_method_config_table *)channel_arg->value.pointer.p, - method_config_convert_value, &method_parameters_vtable); + GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); + service_config_json = gpr_strdup(channel_arg->value.string); + grpc_service_config *service_config = + grpc_service_config_create(service_config_json); + if (service_config != NULL) { + method_params_table = grpc_service_config_create_method_config_table( + service_config, method_parameters_create_from_json, + &method_parameters_vtable); + grpc_service_config_destroy(service_config); + } } // Before we clean up, save a copy of lb_policy_name, since it might // be pointing to data inside chand->resolver_result. @@ -311,6 +351,10 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, } old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; + if (service_config_json != NULL) { + gpr_free(chand->service_config_json); + chand->service_config_json = service_config_json; + } if (chand->method_params_table != NULL) { grpc_mdstr_hash_table_unref(chand->method_params_table); } @@ -446,6 +490,11 @@ static void cc_get_channel_info(grpc_exec_ctx *exec_ctx, ? NULL : gpr_strdup(chand->lb_policy_name); } + if (info->service_config_json != NULL) { + *info->service_config_json = chand->service_config_json == NULL + ? NULL + : gpr_strdup(chand->service_config_json); + } gpr_mu_unlock(&chand->mu); } @@ -489,6 +538,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } gpr_free(chand->lb_policy_name); + gpr_free(chand->service_config_json); if (chand->method_params_table != NULL) { grpc_mdstr_hash_table_unref(chand->method_params_table); } @@ -641,7 +691,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, grpc_subchannel_call *subchannel_call = NULL; grpc_error *new_error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, - calld->deadline, &subchannel_call); + calld->call_start_time, calld->deadline, &subchannel_call); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); subchannel_call = CANCELLED_CALL; @@ -894,7 +944,7 @@ retry: grpc_subchannel_call *subchannel_call = NULL; grpc_error *error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, - calld->deadline, &subchannel_call); + calld->call_start_time, calld->deadline, &subchannel_call); if (error != GRPC_ERROR_NONE) { subchannel_call = CANCELLED_CALL; fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); diff --git a/src/core/ext/client_channel/lb_policy_registry.c b/src/core/ext/client_channel/lb_policy_registry.c index f46a721f9d..90c149d947 100644 --- a/src/core/ext/client_channel/lb_policy_registry.c +++ b/src/core/ext/client_channel/lb_policy_registry.c @@ -35,6 +35,8 @@ #include <string.h> +#include "src/core/lib/support/string.h" + #define MAX_POLICIES 10 static grpc_lb_policy_factory *g_all_of_the_lb_policies[MAX_POLICIES]; @@ -52,8 +54,8 @@ void grpc_lb_policy_registry_shutdown(void) { void grpc_register_lb_policy(grpc_lb_policy_factory *factory) { int i; for (i = 0; i < g_number_of_lb_policies; i++) { - GPR_ASSERT(0 != strcmp(factory->vtable->name, - g_all_of_the_lb_policies[i]->vtable->name)); + GPR_ASSERT(0 != gpr_stricmp(factory->vtable->name, + g_all_of_the_lb_policies[i]->vtable->name)); } GPR_ASSERT(g_number_of_lb_policies != MAX_POLICIES); grpc_lb_policy_factory_ref(factory); @@ -66,7 +68,7 @@ static grpc_lb_policy_factory *lookup_factory(const char *name) { if (name == NULL) return NULL; for (i = 0; i < g_number_of_lb_policies; i++) { - if (0 == strcmp(name, g_all_of_the_lb_policies[i]->vtable->name)) { + if (0 == gpr_stricmp(name, g_all_of_the_lb_policies[i]->vtable->name)) { return g_all_of_the_lb_policies[i]; } } diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index a5d7c0df45..b83c6882dc 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -119,9 +119,9 @@ struct grpc_subchannel { gpr_mu mu; /** have we seen a disconnection? */ - int disconnected; + bool disconnected; /** are we connecting */ - int connecting; + bool connecting; /** connectivity state tracking */ grpc_connectivity_state_tracker state_tracker; @@ -132,7 +132,9 @@ struct grpc_subchannel { /** backoff state */ gpr_backoff backoff_state; /** do we have an active alarm? */ - int have_alarm; + bool have_alarm; + /** have we started the backoff loop */ + bool backoff_begun; /** our alarm */ grpc_timer alarm; }; @@ -264,7 +266,7 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_subchannel_index_unregister(exec_ctx, c->key, c); gpr_mu_lock(&c->mu); GPR_ASSERT(!c->disconnected); - c->disconnected = 1; + c->disconnected = true; grpc_connector_shutdown(exec_ctx, c->connector); con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); if (con != NULL) { @@ -370,7 +372,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, return grpc_subchannel_index_register(exec_ctx, key, c); } -static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { +static void continue_connect_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel *c) { grpc_connect_in_args args; args.interested_parties = c->pollset_set; @@ -386,12 +389,6 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { &c->connected); } -static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { - c->next_attempt = - gpr_backoff_begin(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); - continue_connect(exec_ctx, c); -} - grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c, grpc_error **error) { grpc_connectivity_state state; @@ -418,6 +415,73 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, follow_up->cb(exec_ctx, follow_up->cb_arg, error); } +static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + grpc_subchannel *c = arg; + gpr_mu_lock(&c->mu); + c->have_alarm = false; + if (c->disconnected) { + error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1); + } else { + GRPC_ERROR_REF(error); + } + if (error == GRPC_ERROR_NONE) { + gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); + c->next_attempt = + gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); + continue_connect_locked(exec_ctx, c); + gpr_mu_unlock(&c->mu); + } else { + gpr_mu_unlock(&c->mu); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); + } + GRPC_ERROR_UNREF(error); +} + +static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel *c) { + if (c->disconnected) { + /* Don't try to connect if we're already disconnected */ + return; + } + + if (c->connecting) { + /* Already connecting: don't restart */ + return; + } + + if (GET_CONNECTED_SUBCHANNEL(c, no_barrier) != NULL) { + /* Already connected: don't restart */ + return; + } + + if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) { + /* Nobody is interested in connecting: so don't just yet */ + return; + } + + c->connecting = true; + GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); + + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + if (!c->backoff_begun) { + c->backoff_begun = true; + c->next_attempt = gpr_backoff_begin(&c->backoff_state, now); + continue_connect_locked(exec_ctx, c); + } else { + GPR_ASSERT(!c->have_alarm); + c->have_alarm = true; + gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now); + if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <= + 0) { + gpr_log(GPR_INFO, "Retry immediately"); + } else { + gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds", + time_til_next.tv_sec, time_til_next.tv_nsec); + } + grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); + } +} + void grpc_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset_set *interested_parties, grpc_connectivity_state *state, @@ -449,13 +513,9 @@ void grpc_subchannel_notify_on_state_change( w->next = &c->root_external_state_watcher; w->prev = w->next->prev; w->next->prev = w->prev->next = w; - if (grpc_connectivity_state_notify_on_state_change( - exec_ctx, &c->state_tracker, state, &w->closure)) { - c->connecting = 1; - /* released by connection */ - GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); - start_connect(exec_ctx, c); - } + grpc_connectivity_state_notify_on_state_change(exec_ctx, &c->state_tracker, + state, &w->closure); + maybe_start_connecting_locked(exec_ctx, c); gpr_mu_unlock(&c->mu); } } @@ -575,7 +635,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, Re-evaluate if we really need this. */ gpr_atm_full_barrier(); GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con)); - c->connecting = 0; /* setup subchannel watching connected subchannel for changes; subchannel ref @@ -592,28 +651,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE, "connected"); } -static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_subchannel *c = arg; - gpr_mu_lock(&c->mu); - c->have_alarm = 0; - if (c->disconnected) { - error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1); - } else { - GRPC_ERROR_REF(error); - } - if (error == GRPC_ERROR_NONE) { - gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); - c->next_attempt = - gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); - continue_connect(exec_ctx, c); - gpr_mu_unlock(&c->mu); - } else { - gpr_mu_unlock(&c->mu); - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); - } - GRPC_ERROR_UNREF(error); -} - static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_subchannel *c = arg; @@ -621,35 +658,28 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, GRPC_SUBCHANNEL_WEAK_REF(c, "connected"); gpr_mu_lock(&c->mu); + c->connecting = false; if (c->connecting_result.transport != NULL) { publish_transport_locked(exec_ctx, c); } else if (c->disconnected) { GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } else { - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - GPR_ASSERT(!c->have_alarm); - c->have_alarm = 1; grpc_connectivity_state_set( exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_set_int( GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), "connect_failed"); - gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now); + const char *errmsg = grpc_error_string(error); gpr_log(GPR_INFO, "Connect failed: %s", errmsg); - if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <= - 0) { - gpr_log(GPR_INFO, "Retry immediately"); - } else { - gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds", - time_til_next.tv_sec, time_til_next.tv_nsec); - } - grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); grpc_error_free_string(errmsg); + + maybe_start_connecting_locked(exec_ctx, c); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } gpr_mu_unlock(&c->mu); - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connected"); grpc_channel_args_destroy(delete_channel_args); } @@ -702,15 +732,15 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec deadline, - grpc_subchannel_call **call) { + grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec start_time, + gpr_timespec deadline, grpc_subchannel_call **call) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); (*call)->connection = con; // Ref is added below. grpc_error *error = grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call, - NULL, NULL, path, deadline, callstk); + NULL, NULL, path, start_time, deadline, callstk); if (error != GRPC_ERROR_NONE) { const char *error_string = grpc_error_string(error); gpr_log(GPR_ERROR, "error: %s", error_string); diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h index 93bd72d20d..10bae620df 100644 --- a/src/core/ext/client_channel/subchannel.h +++ b/src/core/ext/client_channel/subchannel.h @@ -111,8 +111,8 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, /** construct a subchannel call */ grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel, - grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec deadline, - grpc_subchannel_call **subchannel_call); + grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec start_time, + gpr_timespec deadline, grpc_subchannel_call **subchannel_call); /** process a transport level op */ void grpc_connected_subchannel_process_transport_op( diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 41e177c495..4262d2b9a4 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -182,17 +182,24 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, NULL); if (wc_arg->rr_policy != NULL) { - /* if target is NULL, no pick has been made by the RR policy (eg, all + /* if *target is NULL, no pick has been made by the RR policy (eg, all * addresses failed to connect). There won't be any user_data/token * available */ - if (wc_arg->target != NULL) { - initial_metadata_add_lb_token(wc_arg->initial_metadata, - wc_arg->lb_token_mdelem_storage, - GRPC_MDELEM_REF(wc_arg->lb_token)); + if (*wc_arg->target != NULL) { + if (wc_arg->lb_token != NULL) { + initial_metadata_add_lb_token(wc_arg->initial_metadata, + wc_arg->lb_token_mdelem_storage, + GRPC_MDELEM_REF(wc_arg->lb_token)); + } else { + gpr_log(GPR_ERROR, + "No LB token for connected subchannel pick %p (from RR " + "instance %p).", + (void *)*wc_arg->target, (void *)wc_arg->rr_policy); + abort(); + } } if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", - (intptr_t)wc_arg->rr_policy); + gpr_log(GPR_INFO, "Unreffing RR %p", (void *)wc_arg->rr_policy); } GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure"); } @@ -410,7 +417,7 @@ static void parse_server(const grpc_grpclb_server *server, } /* Returns addresses extracted from \a serverlist. */ -static grpc_lb_addresses *process_serverlist( +static grpc_lb_addresses *process_serverlist_locked( const grpc_grpclb_serverlist *serverlist) { size_t num_valid = 0; /* first pass: count how many are valid in order to allocate the necessary @@ -450,10 +457,12 @@ static grpc_lb_addresses *process_serverlist( user_data = grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr); } else { - gpr_log(GPR_ERROR, + char *uri = grpc_sockaddr_to_uri(&addr); + gpr_log(GPR_INFO, "Missing LB token for backend address '%s'. The empty token will " "be used instead", - grpc_sockaddr_to_uri(&addr)); + uri); + gpr_free(uri); user_data = GRPC_MDELEM_LB_TOKEN_EMPTY; } @@ -466,6 +475,68 @@ static grpc_lb_addresses *process_serverlist( return lb_addresses; } +/* returns true if the new RR policy should replace the current one, if any */ +static bool update_lb_connectivity_status_locked( + grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, + grpc_connectivity_state new_rr_state, grpc_error *new_rr_state_error) { + grpc_error *curr_state_error; + const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check( + &glb_policy->state_tracker, &curr_state_error); + + /* The new connectivity status is a function of the previous one and the new + * input coming from the status of the RR policy. + * + * current state (grpclb's) + * | + * v || I | C | R | TF | SD | <- new state (RR's) + * ===++====+=====+=====+======+======+ + * I || I | C | R | [I] | [I] | + * ---++----+-----+-----+------+------+ + * C || I | C | R | [C] | [C] | + * ---++----+-----+-----+------+------+ + * R || I | C | R | [R] | [R] | + * ---++----+-----+-----+------+------+ + * TF || I | C | R | [TF] | [TF] | + * ---++----+-----+-----+------+------+ + * SD || NA | NA | NA | NA | NA | (*) + * ---++----+-----+-----+------+------+ + * + * A [STATE] indicates that the old RR policy is kept. In those cases, STATE + * is the current state of grpclb, which is left untouched. + * + * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to + * the previous RR instance. + * + * Note that the status is never updated to SHUTDOWN as a result of calling + * this function. Only glb_shutdown() has the power to set that state. + * + * (*) This function mustn't be called during shutting down. */ + GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN); + + switch (new_rr_state) { + case GRPC_CHANNEL_TRANSIENT_FAILURE: + case GRPC_CHANNEL_SHUTDOWN: + GPR_ASSERT(new_rr_state_error != GRPC_ERROR_NONE); + return false; /* don't replace the RR policy */ + case GRPC_CHANNEL_INIT: + case GRPC_CHANNEL_IDLE: + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_READY: + GPR_ASSERT(new_rr_state_error == GRPC_ERROR_NONE); + } + + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, + "Setting grpclb's state to %s from new RR policy %p state.", + grpc_connectivity_state_name(new_rr_state), + (void *)glb_policy->rr_policy); + } + grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, + new_rr_state, GRPC_ERROR_REF(new_rr_state_error), + "update_lb_connectivity_status_locked"); + return true; +} + /* perform a pick over \a rr_policy. Given that a pick can return immediately * (ignoring its completion callback) we need to perform the cleanups this * callback would be otherwise resposible for */ @@ -507,7 +578,7 @@ static grpc_lb_policy *create_rr_locked( grpc_lb_policy_args args; memset(&args, 0, sizeof(args)); args.client_channel_factory = glb_policy->cc_factory; - grpc_lb_addresses *addresses = process_serverlist(serverlist); + grpc_lb_addresses *addresses = process_serverlist_locked(serverlist); // Replace the LB addresses in the channel args that we pass down to // the subchannel. @@ -528,49 +599,84 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); /* glb_policy->rr_policy may be NULL (initial handover) */ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy, grpc_error *error) { + glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->serverlist != NULL && glb_policy->serverlist->num_servers > 0); + if (glb_policy->shutting_down) return; + + grpc_lb_policy *new_rr_policy = + create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy); + if (new_rr_policy == NULL) { + gpr_log(GPR_ERROR, + "Failure creating a RoundRobin policy for serverlist update with " + "%lu entries. The previous RR instance (%p), if any, will continue " + "to be used. Future updates from the LB will attempt to create new " + "instances.", + (unsigned long)glb_policy->serverlist->num_servers, + (void *)glb_policy->rr_policy); + return; + } + + grpc_error *new_rr_state_error = NULL; + const grpc_connectivity_state new_rr_state = + grpc_lb_policy_check_connectivity(exec_ctx, new_rr_policy, + &new_rr_state_error); + /* Connectivity state is a function of the new RR policy just created */ + const bool replace_old_rr = update_lb_connectivity_status_locked( + exec_ctx, glb_policy, new_rr_state, new_rr_state_error); + + if (!replace_old_rr) { + /* dispose of the new RR policy that won't be used after all */ + GRPC_LB_POLICY_UNREF(exec_ctx, new_rr_policy, "rr_handover_no_replace"); + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, + "Keeping old RR policy (%p) despite new serverlist: new RR " + "policy was in %s connectivity state.", + (void *)glb_policy->rr_policy, + grpc_connectivity_state_name(new_rr_state)); + } + return; + } + if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "RR handover. Old RR: %p", (void *)glb_policy->rr_policy); + gpr_log(GPR_INFO, "Created RR policy (%p) to replace old RR (%p)", + (void *)new_rr_policy, (void *)glb_policy->rr_policy); } + if (glb_policy->rr_policy != NULL) { /* if we are phasing out an existing RR instance, unref it. */ GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover"); } - glb_policy->rr_policy = - create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy); - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "Created RR policy (%p)", (void *)glb_policy->rr_policy); - } + /* Finally update the RR policy to the newly created one */ + glb_policy->rr_policy = new_rr_policy; - GPR_ASSERT(glb_policy->rr_policy != NULL); + /* Add the gRPC LB's interested_parties pollset_set to that of the newly + * created RR policy. This will make the RR policy progress upon activity on + * gRPC LB, which in turn is tied to the application's call */ grpc_pollset_set_add_pollset_set(exec_ctx, glb_policy->rr_policy->interested_parties, glb_policy->base.interested_parties); + /* Allocate the data for the tracking of the new RR policy's connectivity. + * It'll be deallocated in glb_rr_connectivity_changed() */ rr_connectivity_data *rr_connectivity = gpr_malloc(sizeof(rr_connectivity_data)); memset(rr_connectivity, 0, sizeof(rr_connectivity_data)); grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed, rr_connectivity); rr_connectivity->glb_policy = glb_policy; - rr_connectivity->state = grpc_lb_policy_check_connectivity( - exec_ctx, glb_policy->rr_policy, &error); + rr_connectivity->state = new_rr_state; - grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - rr_connectivity->state, GRPC_ERROR_REF(error), - "rr_handover"); - /* subscribe */ + /* Subscribe to changes to the connectivity of the new RR */ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb"); grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, &rr_connectivity->state, &rr_connectivity->on_change); grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy); - /* flush pending ops */ + /* Update picks and pings in wait */ pending_pick *pp; while ((pp = glb_policy->pending_picks)) { glb_policy->pending_picks = pp->next; @@ -601,28 +707,36 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - /* If shutdown or error free the arg. Rely on the rest of the code to set the - * right grpclb status. */ - rr_connectivity_data *rr_conn_data = arg; - glb_lb_policy *glb_policy = rr_conn_data->glb_policy; - - if (rr_conn_data->state != GRPC_CHANNEL_SHUTDOWN && - !glb_policy->shutting_down) { - gpr_mu_lock(&glb_policy->mu); - /* RR not shutting down. Mimic the RR's policy state */ - grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - rr_conn_data->state, GRPC_ERROR_REF(error), - "rr_connectivity_cb"); - /* resubscribe. Reuse the "rr_connectivity_cb" weak ref. */ + rr_connectivity_data *rr_connectivity = arg; + glb_lb_policy *glb_policy = rr_connectivity->glb_policy; + + gpr_mu_lock(&glb_policy->mu); + const bool shutting_down = glb_policy->shutting_down; + bool unref_needed = false; + GRPC_ERROR_REF(error); + + if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN || shutting_down) { + /* RR policy shutting down. Don't renew subscription and free the arg of + * this callback. In addition we need to stash away the current policy to + * be UNREF'd after releasing the lock. Otherwise, if the UNREF is the last + * one, the policy would be destroyed, alongside the lock, which would + * result in a use-after-free */ + unref_needed = true; + gpr_free(rr_connectivity); + } else { /* rr state != SHUTDOWN && !shutting down: biz as usual */ + update_lb_connectivity_status_locked(exec_ctx, glb_policy, + rr_connectivity->state, error); + /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */ grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, - &rr_conn_data->state, - &rr_conn_data->on_change); - gpr_mu_unlock(&glb_policy->mu); - } else { + &rr_connectivity->state, + &rr_connectivity->on_change); + } + gpr_mu_unlock(&glb_policy->mu); + if (unref_needed) { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "rr_connectivity_cb"); - gpr_free(rr_conn_data); } + GRPC_ERROR_UNREF(error); } static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, @@ -760,17 +874,23 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (glb_policy->rr_policy) { GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); } - if (glb_policy->started_picking) { - if (glb_policy->lb_call != NULL) { - grpc_call_cancel(glb_policy->lb_call, NULL); - /* lb_on_server_status_received will pick up the cancel and clean up */ - } - } grpc_connectivity_state_set( exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE("Channel Shutdown"), "glb_shutdown"); + /* We need a copy of the lb_call pointer because we can't cancell the call + * while holding glb_policy->mu: lb_on_server_status_received, invoked due to + * the cancel, needs to acquire that same lock */ + grpc_call *lb_call = glb_policy->lb_call; gpr_mu_unlock(&glb_policy->mu); + /* glb_policy->lb_call and this local lb_call must be consistent at this point + * because glb_policy->lb_call is only assigned in lb_call_init_locked as part + * of query_for_backends_locked, which can only be invoked while + * glb_policy->shutting_down is false. */ + if (lb_call != NULL) { + grpc_call_cancel(lb_call, NULL); + /* lb_on_server_status_received will pick up the cancel and clean up */ + } while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; @@ -954,9 +1074,10 @@ static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void lb_call_init(glb_lb_policy *glb_policy) { +static void lb_call_init_locked(glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->server_name != NULL); GPR_ASSERT(glb_policy->server_name[0] != '\0'); + GPR_ASSERT(!glb_policy->shutting_down); /* Note the following LB call progresses every time there's activity in \a * glb_policy->base.interested_parties, which is comprised of the polling @@ -1009,7 +1130,9 @@ static void lb_call_destroy_locked(glb_lb_policy *glb_policy) { static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->lb_channel != NULL); - lb_call_init(glb_policy); + if (glb_policy->shutting_down) return; + + lb_call_init_locked(glb_policy); if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)", @@ -1081,6 +1204,7 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, grpc_op ops[2]; memset(ops, 0, sizeof(ops)); grpc_op *op = ops; + gpr_mu_lock(&glb_policy->mu); if (glb_policy->lb_response_payload != NULL) { gpr_backoff_reset(&glb_policy->lb_call_backoff_state); /* Received data from the LB server. Look inside @@ -1109,23 +1233,24 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, /* update serverlist */ if (serverlist->num_servers > 0) { - gpr_mu_lock(&glb_policy->mu); if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) { if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Incoming server list identical to current, ignoring."); } + grpc_grpclb_destroy_serverlist(serverlist); } else { /* new serverlist */ if (glb_policy->serverlist != NULL) { /* dispose of the old serverlist */ grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } - /* and update the copy in the glb_lb_policy instance */ + /* and update the copy in the glb_lb_policy instance. This serverlist + * instance will be destroyed either upon the next update or in + * glb_destroy() */ glb_policy->serverlist = serverlist; - rr_handover_locked(exec_ctx, glb_policy, error); + rr_handover_locked(exec_ctx, glb_policy); } - gpr_mu_unlock(&glb_policy->mu); } else { if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, @@ -1153,9 +1278,11 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, &glb_policy->lb_on_response_received); /* loop */ GPR_ASSERT(GRPC_CALL_OK == call_error); } + gpr_mu_unlock(&glb_policy->mu); } else { /* empty payload: call cancelled. */ /* dispose of the "lb_on_response_received" weak ref taken in * query_for_backends_locked() and reused in every reception loop */ + gpr_mu_unlock(&glb_policy->mu); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "lb_on_response_received_empty_payload"); } @@ -1175,7 +1302,6 @@ static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, query_for_backends_locked(exec_ctx, glb_policy); } gpr_mu_unlock(&glb_policy->mu); - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_on_retry_timer"); } diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index ac3c6a305a..c69f773e78 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -292,6 +292,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } else { loop: switch (p->checking_connectivity) { + case GRPC_CHANNEL_INIT: + GPR_UNREACHABLE_CODE(return ); case GRPC_CHANNEL_READY: grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 0fd3abe099..59f84054c4 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -116,8 +116,13 @@ typedef struct { grpc_closure connectivity_changed_closure; /** this subchannels current position in subchannel->ready_list */ ready_list *ready_list_node; - /** last observed connectivity */ - grpc_connectivity_state connectivity_state; + /** last observed connectivity. Not updated by + * \a grpc_subchannel_notify_on_state_change. Used to determine the previous + * state while processing the new state in \a rr_connectivity_changed */ + grpc_connectivity_state prev_connectivity_state; + /** current connectivity state. Updated by \a + * grpc_subchannel_notify_on_state_change */ + grpc_connectivity_state curr_connectivity_state; /** the subchannel's target user data */ void *user_data; /** vtable to operate over \a user_data */ @@ -127,6 +132,7 @@ typedef struct { struct round_robin_lb_policy { /** base policy: must be first */ grpc_lb_policy base; + gpr_mu mu; /** total number of addresses received at creation time */ size_t num_addresses; @@ -135,8 +141,11 @@ struct round_robin_lb_policy { size_t num_subchannels; subchannel_data **subchannels; - /** mutex protecting remaining members */ - gpr_mu mu; + /** how many subchannels are in TRANSIENT_FAILURE */ + size_t num_transient_failures; + /** how many subchannels are IDLE */ + size_t num_idle; + /** have we started picking? */ int started_picking; /** are we shutting down? */ @@ -258,6 +267,10 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, gpr_free(node); } +static bool is_ready_list_empty(round_robin_lb_policy *p) { + return p->ready_list.prev == NULL; +} + static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; ready_list *elem; @@ -268,7 +281,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { for (size_t i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin_destroy"); + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy"); if (sd->user_data != NULL) { GPR_ASSERT(sd->user_data_vtable != NULL); sd->user_data_vtable->destroy(sd->user_data); @@ -381,18 +394,18 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { size_t i; p->started_picking = 1; - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, (void *)p, - p->num_subchannels); - } - for (i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; - sd->connectivity_state = GRPC_CHANNEL_IDLE; + /* use some sentinel value outside of the range of grpc_connectivity_state + * to signal an undefined previous state. We won't be referring to this + * value again and it'll be overwritten after the first call to + * rr_connectivity_changed */ + sd->prev_connectivity_state = GRPC_CHANNEL_INIT; + sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; + GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); grpc_subchannel_notify_on_state_change( exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->connectivity_state, &sd->connectivity_changed_closure); - GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity"); + &sd->curr_connectivity_state, &sd->connectivity_changed_closure); } } @@ -422,7 +435,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, /* readily available, report right away */ *target = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected->subchannel), - "picked"); + "rr_picked"); if (user_data != NULL) { *user_data = selected->user_data; @@ -453,125 +466,184 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } } +static void update_state_counters(subchannel_data *sd) { + round_robin_lb_policy *p = sd->policy; + + /* update p->num_transient_failures (resp. p->num_idle): if the previous + * state was TRANSIENT_FAILURE (resp. IDLE), decrement + * p->num_transient_failures (resp. p->num_idle). */ + if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GPR_ASSERT(p->num_transient_failures > 0); + --p->num_transient_failures; + } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { + GPR_ASSERT(p->num_idle > 0); + --p->num_idle; + } +} + +/* sd is the subchannel_data associted with the updated subchannel. + * shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE + * or SHUTDOWN */ +static grpc_connectivity_state update_lb_connectivity_status( + grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) { + /* In priority order. The first rule to match terminates the search (ie, if we + * are on rule n, all previous rules were unfulfilled). + * + * 1) RULE: ANY subchannel is READY => policy is READY. + * CHECK: At least one subchannel is ready iff p->ready_list is NOT empty. + * + * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING. + * CHECK: sd->curr_connectivity_state == CONNECTING. + * + * 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN. + * CHECK: p->num_subchannels = 0. + * + * 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is + * TRANSIENT_FAILURE. + * CHECK: p->num_transient_failures == p->num_subchannels. + * + * 5) RULE: ALL subchannels are IDLE => policy is IDLE. + * CHECK: p->num_idle == p->num_subchannels. + */ + round_robin_lb_policy *p = sd->policy; + if (!is_ready_list_empty(p)) { /* 1) READY */ + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, + GRPC_ERROR_NONE, "rr_ready"); + return GRPC_CHANNEL_READY; + } else if (sd->curr_connectivity_state == + GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */ + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, + "rr_connecting"); + return GRPC_CHANNEL_CONNECTING; + } else if (p->num_subchannels == 0) { /* 3) SHUTDOWN */ + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), + "rr_shutdown"); + return GRPC_CHANNEL_SHUTDOWN; + } else if (p->num_transient_failures == + p->num_subchannels) { /* 4) TRANSIENT_FAILURE */ + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "rr_transient_failure"); + return GRPC_CHANNEL_TRANSIENT_FAILURE; + } else if (p->num_idle == p->num_subchannels) { /* 5) IDLE */ + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE, + GRPC_ERROR_NONE, "rr_idle"); + return GRPC_CHANNEL_IDLE; + } + /* no change */ + return sd->curr_connectivity_state; +} + static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { subchannel_data *sd = arg; round_robin_lb_policy *p = sd->policy; pending_pick *pp; - int unref = 0; - GRPC_ERROR_REF(error); gpr_mu_lock(&p->mu); if (p->shutdown) { - unref = 1; - } else { - switch (sd->connectivity_state) { - case GRPC_CHANNEL_READY: - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_READY, GRPC_ERROR_REF(error), - "connecting_ready"); - /* add the newly connected subchannel to the list of connected ones. - * Note that it goes to the "end of the line". */ - sd->ready_list_node = add_connected_sc_locked(p, sd); - /* at this point we know there's at least one suitable subchannel. Go - * ahead and pick one and notify the pending suitors in - * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ - ready_list *selected = peek_next_connected_locked(p); - GPR_ASSERT(selected != NULL); - if (p->pending_picks != NULL) { - /* if the selected subchannel is going to be used for the pending - * picks, update the last picked pointer */ - advance_last_picked_locked(p); + gpr_mu_unlock(&p->mu); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); + GRPC_ERROR_UNREF(error); + return; + } + switch (sd->curr_connectivity_state) { + case GRPC_CHANNEL_INIT: + GPR_UNREACHABLE_CODE(return ); + case GRPC_CHANNEL_READY: + /* add the newly connected subchannel to the list of connected ones. + * Note that it goes to the "end of the line". */ + sd->ready_list_node = add_connected_sc_locked(p, sd); + /* at this point we know there's at least one suitable subchannel. Go + * ahead and pick one and notify the pending suitors in + * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ + ready_list *selected = peek_next_connected_locked(p); + GPR_ASSERT(selected != NULL); + if (p->pending_picks != NULL) { + /* if the selected subchannel is going to be used for the pending + * picks, update the last picked pointer */ + advance_last_picked_locked(p); + } + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(selected->subchannel), + "rr_picked"); + if (pp->user_data != NULL) { + *pp->user_data = selected->user_data; } - + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, + "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", + (void *)selected->subchannel, (void *)selected); + } + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); + gpr_free(pp); + } + update_lb_connectivity_status(exec_ctx, sd, error); + sd->prev_connectivity_state = sd->curr_connectivity_state; + /* renew notification: reuses the "rr_connectivity" weak ref */ + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->curr_connectivity_state, &sd->connectivity_changed_closure); + break; + case GRPC_CHANNEL_IDLE: + ++p->num_idle; + /* fallthrough */ + case GRPC_CHANNEL_CONNECTING: + update_state_counters(sd); + update_lb_connectivity_status(exec_ctx, sd, error); + sd->prev_connectivity_state = sd->curr_connectivity_state; + /* renew notification: reuses the "rr_connectivity" weak ref */ + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->curr_connectivity_state, &sd->connectivity_changed_closure); + break; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + ++p->num_transient_failures; + /* remove from ready list if still present */ + if (sd->ready_list_node != NULL) { + remove_disconnected_sc_locked(p, sd->ready_list_node); + sd->ready_list_node = NULL; + } + update_lb_connectivity_status(exec_ctx, sd, error); + sd->prev_connectivity_state = sd->curr_connectivity_state; + /* renew notification: reuses the "rr_connectivity" weak ref */ + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->curr_connectivity_state, &sd->connectivity_changed_closure); + break; + case GRPC_CHANNEL_SHUTDOWN: + update_state_counters(sd); + if (sd->ready_list_node != NULL) { + remove_disconnected_sc_locked(p, sd->ready_list_node); + sd->ready_list_node = NULL; + } + --p->num_subchannels; + GPR_SWAP(subchannel_data *, p->subchannels[sd->index], + p->subchannels[p->num_subchannels]); + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown"); + p->subchannels[sd->index]->index = sd->index; + if (update_lb_connectivity_status(exec_ctx, sd, error) == + GRPC_CHANNEL_SHUTDOWN) { + /* the policy is shutting down. Flush all the pending picks... */ while ((pp = p->pending_picks)) { p->pending_picks = pp->next; - - *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(selected->subchannel), - "picked"); - if (pp->user_data != NULL) { - *pp->user_data = selected->user_data; - } - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, - "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", - (void *)selected->subchannel, (void *)selected); - } + *pp->target = NULL; grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); } - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE: - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, sd->connectivity_state, - GRPC_ERROR_REF(error), "connecting_changed"); - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - /* renew state notification */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->connectivity_state, &sd->connectivity_changed_closure); - - /* remove from ready list if still present */ - if (sd->ready_list_node != NULL) { - remove_disconnected_sc_locked(p, sd->ready_list_node); - sd->ready_list_node = NULL; - } - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "connecting_transient_failure"); - break; - case GRPC_CHANNEL_SHUTDOWN: - if (sd->ready_list_node != NULL) { - remove_disconnected_sc_locked(p, sd->ready_list_node); - sd->ready_list_node = NULL; - } - - p->num_subchannels--; - GPR_SWAP(subchannel_data *, p->subchannels[sd->index], - p->subchannels[p->num_subchannels]); - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); - p->subchannels[sd->index]->index = sd->index; - gpr_free(sd); - - unref = 1; - if (p->num_subchannels == 0) { - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_REFERENCING("Round Robin Channels Exhausted", - &error, 1), - "no_more_channels"); - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, - NULL); - gpr_free(pp); - } - } else { - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "subchannel_failed"); - } - } /* switch */ - } /* !unref */ - - gpr_mu_unlock(&p->mu); - - if (unref) { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "round_robin_connectivity"); + } + gpr_free(sd); + /* unref the "rr_connectivity" weak ref from start_picking */ + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); + break; } - + gpr_mu_unlock(&p->mu); GRPC_ERROR_UNREF(error); } @@ -607,9 +679,9 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_unlock(&p->mu); target = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected->subchannel), - "picked"); + "rr_picked"); grpc_connected_subchannel_ping(exec_ctx, target, closure); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "picked"); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked"); } else { gpr_mu_unlock(&p->mu); grpc_exec_ctx_sched(exec_ctx, closure, @@ -705,6 +777,11 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); + + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels", + (void *)p, (unsigned long)p->num_subchannels); + } gpr_mu_init(&p->mu); return &p->base; } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 1ffa9165b2..4e3c7ff681 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -954,6 +954,16 @@ static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs, static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} +static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id, + bool is_client, bool is_initial) { + for (grpc_linked_mdelem *md = md_batch->list.head; md != md_batch->list.tail; + md = md->next) { + gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL", + is_client ? "CLI" : "SVR", grpc_mdstr_as_c_string(md->md->key), + grpc_mdstr_as_c_string(md->md->value)); + } +} + static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_error *error_ignored) { GPR_TIMER_BEGIN("perform_stream_op_locked", 0); @@ -967,6 +977,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s; on_complete = %p", str, op->on_complete); gpr_free(str); + if (op->send_initial_metadata) { + log_metadata(op->send_initial_metadata, s->id, t->is_client, true); + } + if (op->send_trailing_metadata) { + log_metadata(op->send_trailing_metadata, s->id, t->is_client, false); + } } grpc_closure *on_complete = op->on_complete; @@ -1037,7 +1053,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, "op.send_initial_metadata"); } } else { - s->send_trailing_metadata = NULL; + s->send_initial_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_CREATE( @@ -1523,13 +1539,17 @@ static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_error *error) { error = removal_error(error, s, "Pending writes failed due to stream closure"); - s->fetching_send_message = NULL; + s->send_initial_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_REF(error), "send_initial_metadata_finished"); + + s->send_trailing_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->send_trailing_metadata_finished, GRPC_ERROR_REF(error), "send_trailing_metadata_finished"); + + s->fetching_send_message = NULL; grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error), "fetching_send_message_finished"); @@ -2294,6 +2314,14 @@ static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) { return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string); } +/******************************************************************************* + * MONITORING + */ +static grpc_endpoint *chttp2_get_endpoint(grpc_exec_ctx *exec_ctx, + grpc_transport *t) { + return ((grpc_chttp2_transport *)t)->ep; +} + static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), "chttp2", init_stream, @@ -2303,7 +2331,8 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), perform_transport_op, destroy_stream, destroy_transport, - chttp2_get_peer}; + chttp2_get_peer, + chttp2_get_endpoint}; grpc_transport *grpc_create_chttp2_transport( grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args, diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index b9c405158f..5efb49751c 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -471,7 +471,8 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp, grpc_mdstr_as_c_string(md->value)); *cached_timeout = gpr_inf_future(GPR_TIMESPAN); } - grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); + cached_timeout = + grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); } grpc_chttp2_incoming_metadata_buffer_set_deadline( &s->metadata_buffer[0], diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 1e5377c2b4..a4c110101e 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -42,6 +42,7 @@ #include <grpc/support/useful.h> #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h" +#include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/channel.h" @@ -610,6 +611,16 @@ static int parse_grpc_header(const uint8_t *data) { return length; } +static bool header_has_authority(grpc_linked_mdelem *head) { + while (head != NULL) { + if (head->md->key == GRPC_MDSTR_AUTHORITY) { + return true; + } + head = head->next; + } + return false; +} + /* Op Execution: Decide if one of the actions contained in the stream op can be executed. This is the heart of the state machine. @@ -981,11 +992,18 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } else if (stream_op->on_complete && op_can_be_run(stream_op, stream_state, &oas->state, OP_ON_COMPLETE)) { - /* All actions in this stream_op are complete. Call the on_complete callback - */ CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas); - grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE, - NULL); + if (stream_state->state_op_done[OP_CANCEL_ERROR] || + stream_state->state_callback_received[OP_FAILED]) { + grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, + GRPC_ERROR_CANCELLED, NULL); + } else { + /* All actions in this stream_op are complete. Call the on_complete + * callback + */ + grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE, + NULL); + } oas->state.state_op_done[OP_ON_COMPLETE] = true; oas->done = true; /* reset any send message state, only if this ON_COMPLETE is about a send. @@ -1042,7 +1060,31 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->curr_gs = gs; memcpy(&s->curr_ct, gt, sizeof(grpc_cronet_transport)); add_to_storage(s, op); - execute_from_storage(s); + if (op->send_initial_metadata && + header_has_authority(op->send_initial_metadata->list.head)) { + /* Cronet does not support :authority header field. We cancel the call when + this field is present in metadata */ + cronet_bidirectional_stream_header_array header_array; + cronet_bidirectional_stream_header *header; + cronet_bidirectional_stream cbs; + CRONET_LOG(GPR_DEBUG, + ":authority header is provided but not supported;" + " cancel operations"); + /* Notify application that operation is cancelled by forging trailers */ + header_array.count = 1; + header_array.capacity = 1; + header_array.headers = + gpr_malloc(sizeof(cronet_bidirectional_stream_header)); + header = (cronet_bidirectional_stream_header *)header_array.headers; + header->key = "grpc-status"; + header->value = "1"; /* Return status GRPC_STATUS_CANCELLED */ + cbs.annotation = (void *)s; + s->state.state_op_done[OP_CANCEL_ERROR] = true; + on_response_trailers_received(&cbs, &header_array); + gpr_free(header_array.headers); + } else { + execute_from_storage(s); + } } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, @@ -1054,6 +1096,11 @@ static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { return NULL; } +static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, + grpc_transport *gt) { + return NULL; +} + static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_transport_op *op) {} @@ -1066,4 +1113,5 @@ const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj), perform_op, destroy_stream, destroy_transport, - get_peer}; + get_peer, + get_endpoint}; |