aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-11-30 13:30:01 -0800
committerGravatar Craig Tiller <ctiller@google.com>2016-11-30 13:30:01 -0800
commit4b064060f741bde1b67e14cb0144dd4772974ef7 (patch)
tree1e0eaf3350b0ab902d8354debb76d32c6d65ba11 /src/core/ext
parent7d1fc82844cb9390a06b4a5a903a93662ae4d793 (diff)
parent71cf7b39dafdeb376370972551cbf78f2750d02e (diff)
Merge github.com:grpc/grpc into hansel
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/client_channel/client_channel.c120
-rw-r--r--src/core/ext/client_channel/lb_policy_registry.c8
-rw-r--r--src/core/ext/client_channel/subchannel.c138
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c209
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c16
5 files changed, 352 insertions, 139 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index b66fed4b88..1fcff4388a 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -56,7 +56,7 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
-#include "src/core/lib/transport/method_config.h"
+#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h"
/* Client channel implementation */
@@ -82,30 +82,61 @@ static void *method_parameters_copy(void *value) {
return new_value;
}
-static int method_parameters_cmp(void *value1, void *value2) {
- const method_parameters *v1 = value1;
- const method_parameters *v2 = value2;
- const int retval = gpr_time_cmp(v1->timeout, v2->timeout);
- if (retval != 0) return retval;
- if (v1->wait_for_ready > v2->wait_for_ready) return 1;
- if (v1->wait_for_ready < v2->wait_for_ready) return -1;
- return 0;
-}
-
static const grpc_mdstr_hash_table_vtable method_parameters_vtable = {
- gpr_free, method_parameters_copy, method_parameters_cmp};
-
-static void *method_config_convert_value(
- const grpc_method_config *method_config) {
+ gpr_free, method_parameters_copy};
+
+static void *method_parameters_create_from_json(const grpc_json *json) {
+ wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET;
+ gpr_timespec timeout = {0, 0, GPR_TIMESPAN};
+ for (grpc_json *field = json->child; field != NULL; field = field->next) {
+ if (field->key == NULL) continue;
+ if (strcmp(field->key, "waitForReady") == 0) {
+ if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate.
+ if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
+ return NULL;
+ }
+ wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE
+ : WAIT_FOR_READY_FALSE;
+ } else if (strcmp(field->key, "timeout") == 0) {
+ if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL; // Duplicate.
+ if (field->type != GRPC_JSON_STRING) return NULL;
+ size_t len = strlen(field->value);
+ if (field->value[len - 1] != 's') return NULL;
+ char *buf = gpr_strdup(field->value);
+ buf[len - 1] = '\0'; // Remove trailing 's'.
+ char *decimal_point = strchr(buf, '.');
+ if (decimal_point != NULL) {
+ *decimal_point = '\0';
+ timeout.tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1);
+ if (timeout.tv_nsec == -1) {
+ gpr_free(buf);
+ return NULL;
+ }
+ // There should always be exactly 3, 6, or 9 fractional digits.
+ int multiplier = 1;
+ switch (strlen(decimal_point + 1)) {
+ case 9:
+ break;
+ case 6:
+ multiplier *= 1000;
+ break;
+ case 3:
+ multiplier *= 1000000;
+ break;
+ default: // Unsupported number of digits.
+ gpr_free(buf);
+ return NULL;
+ }
+ timeout.tv_nsec *= multiplier;
+ }
+ timeout.tv_sec = gpr_parse_nonnegative_int(buf);
+ if (timeout.tv_sec == -1) return NULL;
+ gpr_free(buf);
+ }
+ }
method_parameters *value = gpr_malloc(sizeof(method_parameters));
- const gpr_timespec *timeout = grpc_method_config_get_timeout(method_config);
- value->timeout = timeout != NULL ? *timeout : gpr_time_0(GPR_TIMESPAN);
- const bool *wait_for_ready =
- grpc_method_config_get_wait_for_ready(method_config);
- value->wait_for_ready =
- wait_for_ready == NULL
- ? WAIT_FOR_READY_UNSET
- : (wait_for_ready ? WAIT_FOR_READY_TRUE : WAIT_FOR_READY_FALSE);
+ value->timeout = timeout;
+ value->wait_for_ready = wait_for_ready;
return value;
}
@@ -126,6 +157,8 @@ typedef struct client_channel_channel_data {
/** currently active load balancer */
char *lb_policy_name;
grpc_lb_policy *lb_policy;
+ /** service config in JSON form */
+ char *service_config_json;
/** maps method names to method_parameters structs */
grpc_mdstr_hash_table *method_params_table;
/** incoming resolver result - set by resolver.next() */
@@ -232,15 +265,12 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
bool exit_idle = false;
grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
+ char *service_config_json = NULL;
if (chand->resolver_result != NULL) {
- grpc_lb_policy_args lb_policy_args;
- lb_policy_args.args = chand->resolver_result;
- lb_policy_args.client_channel_factory = chand->client_channel_factory;
-
// Find LB policy name.
const grpc_arg *channel_arg =
- grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_LB_POLICY_NAME);
+ grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
if (channel_arg != NULL) {
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
lb_policy_name = channel_arg->value.string;
@@ -249,7 +279,7 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
// assume that we should use the grpclb policy, regardless of what the
// resolver actually specified.
channel_arg =
- grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_LB_ADDRESSES);
+ grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
if (channel_arg != NULL) {
GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
grpc_lb_addresses *addresses = channel_arg->value.pointer.p;
@@ -274,7 +304,10 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
// Use pick_first if nothing was specified and we didn't select grpclb
// above.
if (lb_policy_name == NULL) lb_policy_name = "pick_first";
-
+ // Instantiate LB policy.
+ grpc_lb_policy_args lb_policy_args;
+ lb_policy_args.args = chand->resolver_result;
+ lb_policy_args.client_channel_factory = chand->client_channel_factory;
lb_policy =
grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
if (lb_policy != NULL) {
@@ -283,13 +316,20 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
state =
grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
}
+ // Find service config.
channel_arg =
- grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_SERVICE_CONFIG);
+ grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
if (channel_arg != NULL) {
- GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
- method_params_table = grpc_method_config_table_convert(
- (grpc_method_config_table *)channel_arg->value.pointer.p,
- method_config_convert_value, &method_parameters_vtable);
+ GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
+ service_config_json = gpr_strdup(channel_arg->value.string);
+ grpc_service_config *service_config =
+ grpc_service_config_create(service_config_json);
+ if (service_config != NULL) {
+ method_params_table = grpc_service_config_create_method_config_table(
+ service_config, method_parameters_create_from_json,
+ &method_parameters_vtable);
+ grpc_service_config_destroy(service_config);
+ }
}
// Before we clean up, save a copy of lb_policy_name, since it might
// be pointing to data inside chand->resolver_result.
@@ -311,6 +351,10 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
+ if (service_config_json != NULL) {
+ gpr_free(chand->service_config_json);
+ chand->service_config_json = service_config_json;
+ }
if (chand->method_params_table != NULL) {
grpc_mdstr_hash_table_unref(chand->method_params_table);
}
@@ -446,6 +490,11 @@ static void cc_get_channel_info(grpc_exec_ctx *exec_ctx,
? NULL
: gpr_strdup(chand->lb_policy_name);
}
+ if (info->service_config_json != NULL) {
+ *info->service_config_json = chand->service_config_json == NULL
+ ? NULL
+ : gpr_strdup(chand->service_config_json);
+ }
gpr_mu_unlock(&chand->mu);
}
@@ -489,6 +538,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
}
gpr_free(chand->lb_policy_name);
+ gpr_free(chand->service_config_json);
if (chand->method_params_table != NULL) {
grpc_mdstr_hash_table_unref(chand->method_params_table);
}
diff --git a/src/core/ext/client_channel/lb_policy_registry.c b/src/core/ext/client_channel/lb_policy_registry.c
index f46a721f9d..90c149d947 100644
--- a/src/core/ext/client_channel/lb_policy_registry.c
+++ b/src/core/ext/client_channel/lb_policy_registry.c
@@ -35,6 +35,8 @@
#include <string.h>
+#include "src/core/lib/support/string.h"
+
#define MAX_POLICIES 10
static grpc_lb_policy_factory *g_all_of_the_lb_policies[MAX_POLICIES];
@@ -52,8 +54,8 @@ void grpc_lb_policy_registry_shutdown(void) {
void grpc_register_lb_policy(grpc_lb_policy_factory *factory) {
int i;
for (i = 0; i < g_number_of_lb_policies; i++) {
- GPR_ASSERT(0 != strcmp(factory->vtable->name,
- g_all_of_the_lb_policies[i]->vtable->name));
+ GPR_ASSERT(0 != gpr_stricmp(factory->vtable->name,
+ g_all_of_the_lb_policies[i]->vtable->name));
}
GPR_ASSERT(g_number_of_lb_policies != MAX_POLICIES);
grpc_lb_policy_factory_ref(factory);
@@ -66,7 +68,7 @@ static grpc_lb_policy_factory *lookup_factory(const char *name) {
if (name == NULL) return NULL;
for (i = 0; i < g_number_of_lb_policies; i++) {
- if (0 == strcmp(name, g_all_of_the_lb_policies[i]->vtable->name)) {
+ if (0 == gpr_stricmp(name, g_all_of_the_lb_policies[i]->vtable->name)) {
return g_all_of_the_lb_policies[i];
}
}
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index a148b2a0e1..b83c6882dc 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -119,9 +119,9 @@ struct grpc_subchannel {
gpr_mu mu;
/** have we seen a disconnection? */
- int disconnected;
+ bool disconnected;
/** are we connecting */
- int connecting;
+ bool connecting;
/** connectivity state tracking */
grpc_connectivity_state_tracker state_tracker;
@@ -132,7 +132,9 @@ struct grpc_subchannel {
/** backoff state */
gpr_backoff backoff_state;
/** do we have an active alarm? */
- int have_alarm;
+ bool have_alarm;
+ /** have we started the backoff loop */
+ bool backoff_begun;
/** our alarm */
grpc_timer alarm;
};
@@ -264,7 +266,7 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
grpc_subchannel_index_unregister(exec_ctx, c->key, c);
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->disconnected);
- c->disconnected = 1;
+ c->disconnected = true;
grpc_connector_shutdown(exec_ctx, c->connector);
con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
if (con != NULL) {
@@ -370,7 +372,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
return grpc_subchannel_index_register(exec_ctx, key, c);
}
-static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
+static void continue_connect_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c) {
grpc_connect_in_args args;
args.interested_parties = c->pollset_set;
@@ -386,12 +389,6 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
&c->connected);
}
-static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
- c->next_attempt =
- gpr_backoff_begin(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
- continue_connect(exec_ctx, c);
-}
-
grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c,
grpc_error **error) {
grpc_connectivity_state state;
@@ -418,6 +415,73 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
follow_up->cb(exec_ctx, follow_up->cb_arg, error);
}
+static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ grpc_subchannel *c = arg;
+ gpr_mu_lock(&c->mu);
+ c->have_alarm = false;
+ if (c->disconnected) {
+ error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1);
+ } else {
+ GRPC_ERROR_REF(error);
+ }
+ if (error == GRPC_ERROR_NONE) {
+ gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
+ c->next_attempt =
+ gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
+ continue_connect_locked(exec_ctx, c);
+ gpr_mu_unlock(&c->mu);
+ } else {
+ gpr_mu_unlock(&c->mu);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c) {
+ if (c->disconnected) {
+ /* Don't try to connect if we're already disconnected */
+ return;
+ }
+
+ if (c->connecting) {
+ /* Already connecting: don't restart */
+ return;
+ }
+
+ if (GET_CONNECTED_SUBCHANNEL(c, no_barrier) != NULL) {
+ /* Already connected: don't restart */
+ return;
+ }
+
+ if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) {
+ /* Nobody is interested in connecting: so don't just yet */
+ return;
+ }
+
+ c->connecting = true;
+ GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
+
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ if (!c->backoff_begun) {
+ c->backoff_begun = true;
+ c->next_attempt = gpr_backoff_begin(&c->backoff_state, now);
+ continue_connect_locked(exec_ctx, c);
+ } else {
+ GPR_ASSERT(!c->have_alarm);
+ c->have_alarm = true;
+ gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
+ if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <=
+ 0) {
+ gpr_log(GPR_INFO, "Retry immediately");
+ } else {
+ gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds",
+ time_til_next.tv_sec, time_til_next.tv_nsec);
+ }
+ grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
+ }
+}
+
void grpc_subchannel_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
@@ -449,13 +513,9 @@ void grpc_subchannel_notify_on_state_change(
w->next = &c->root_external_state_watcher;
w->prev = w->next->prev;
w->next->prev = w->prev->next = w;
- if (grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &c->state_tracker, state, &w->closure)) {
- c->connecting = 1;
- /* released by connection */
- GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
- start_connect(exec_ctx, c);
- }
+ grpc_connectivity_state_notify_on_state_change(exec_ctx, &c->state_tracker,
+ state, &w->closure);
+ maybe_start_connecting_locked(exec_ctx, c);
gpr_mu_unlock(&c->mu);
}
}
@@ -575,7 +635,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
Re-evaluate if we really need this. */
gpr_atm_full_barrier();
GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
- c->connecting = 0;
/* setup subchannel watching connected subchannel for changes; subchannel
ref
@@ -592,28 +651,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE, "connected");
}
-static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- grpc_subchannel *c = arg;
- gpr_mu_lock(&c->mu);
- c->have_alarm = 0;
- if (c->disconnected) {
- error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1);
- } else {
- GRPC_ERROR_REF(error);
- }
- if (error == GRPC_ERROR_NONE) {
- gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
- c->next_attempt =
- gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
- continue_connect(exec_ctx, c);
- gpr_mu_unlock(&c->mu);
- } else {
- gpr_mu_unlock(&c->mu);
- GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
- }
- GRPC_ERROR_UNREF(error);
-}
-
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_subchannel *c = arg;
@@ -621,35 +658,28 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_SUBCHANNEL_WEAK_REF(c, "connected");
gpr_mu_lock(&c->mu);
+ c->connecting = false;
if (c->connecting_result.transport != NULL) {
publish_transport_locked(exec_ctx, c);
} else if (c->disconnected) {
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
} else {
- gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
- GPR_ASSERT(!c->have_alarm);
- c->have_alarm = 1;
grpc_connectivity_state_set(
exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
"connect_failed");
- gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
+
const char *errmsg = grpc_error_string(error);
gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
- if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <=
- 0) {
- gpr_log(GPR_INFO, "Retry immediately");
- } else {
- gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds",
- time_til_next.tv_sec, time_til_next.tv_nsec);
- }
- grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
grpc_error_free_string(errmsg);
+
+ maybe_start_connecting_locked(exec_ctx, c);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
}
gpr_mu_unlock(&c->mu);
- GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connected");
grpc_channel_args_destroy(delete_channel_args);
}
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index d8ef0c8098..4262d2b9a4 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -182,18 +182,24 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
NULL);
if (wc_arg->rr_policy != NULL) {
- /* if target is NULL, no pick has been made by the RR policy (eg, all
+ /* if *target is NULL, no pick has been made by the RR policy (eg, all
* addresses failed to connect). There won't be any user_data/token
* available */
- if (wc_arg->target != NULL) {
- GPR_ASSERT(wc_arg->lb_token != NULL);
- initial_metadata_add_lb_token(wc_arg->initial_metadata,
- wc_arg->lb_token_mdelem_storage,
- GRPC_MDELEM_REF(wc_arg->lb_token));
+ if (*wc_arg->target != NULL) {
+ if (wc_arg->lb_token != NULL) {
+ initial_metadata_add_lb_token(wc_arg->initial_metadata,
+ wc_arg->lb_token_mdelem_storage,
+ GRPC_MDELEM_REF(wc_arg->lb_token));
+ } else {
+ gpr_log(GPR_ERROR,
+ "No LB token for connected subchannel pick %p (from RR "
+ "instance %p).",
+ (void *)*wc_arg->target, (void *)wc_arg->rr_policy);
+ abort();
+ }
}
if (grpc_lb_glb_trace) {
- gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
- (intptr_t)wc_arg->rr_policy);
+ gpr_log(GPR_INFO, "Unreffing RR %p", (void *)wc_arg->rr_policy);
}
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
}
@@ -411,7 +417,7 @@ static void parse_server(const grpc_grpclb_server *server,
}
/* Returns addresses extracted from \a serverlist. */
-static grpc_lb_addresses *process_serverlist(
+static grpc_lb_addresses *process_serverlist_locked(
const grpc_grpclb_serverlist *serverlist) {
size_t num_valid = 0;
/* first pass: count how many are valid in order to allocate the necessary
@@ -451,10 +457,12 @@ static grpc_lb_addresses *process_serverlist(
user_data = grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LB_TOKEN,
lb_token_mdstr);
} else {
- gpr_log(GPR_ERROR,
+ char *uri = grpc_sockaddr_to_uri(&addr);
+ gpr_log(GPR_INFO,
"Missing LB token for backend address '%s'. The empty token will "
"be used instead",
- grpc_sockaddr_to_uri(&addr));
+ uri);
+ gpr_free(uri);
user_data = GRPC_MDELEM_LB_TOKEN_EMPTY;
}
@@ -467,6 +475,68 @@ static grpc_lb_addresses *process_serverlist(
return lb_addresses;
}
+/* returns true if the new RR policy should replace the current one, if any */
+static bool update_lb_connectivity_status_locked(
+ grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
+ grpc_connectivity_state new_rr_state, grpc_error *new_rr_state_error) {
+ grpc_error *curr_state_error;
+ const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check(
+ &glb_policy->state_tracker, &curr_state_error);
+
+ /* The new connectivity status is a function of the previous one and the new
+ * input coming from the status of the RR policy.
+ *
+ * current state (grpclb's)
+ * |
+ * v || I | C | R | TF | SD | <- new state (RR's)
+ * ===++====+=====+=====+======+======+
+ * I || I | C | R | [I] | [I] |
+ * ---++----+-----+-----+------+------+
+ * C || I | C | R | [C] | [C] |
+ * ---++----+-----+-----+------+------+
+ * R || I | C | R | [R] | [R] |
+ * ---++----+-----+-----+------+------+
+ * TF || I | C | R | [TF] | [TF] |
+ * ---++----+-----+-----+------+------+
+ * SD || NA | NA | NA | NA | NA | (*)
+ * ---++----+-----+-----+------+------+
+ *
+ * A [STATE] indicates that the old RR policy is kept. In those cases, STATE
+ * is the current state of grpclb, which is left untouched.
+ *
+ * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to
+ * the previous RR instance.
+ *
+ * Note that the status is never updated to SHUTDOWN as a result of calling
+ * this function. Only glb_shutdown() has the power to set that state.
+ *
+ * (*) This function mustn't be called during shutting down. */
+ GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
+
+ switch (new_rr_state) {
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ case GRPC_CHANNEL_SHUTDOWN:
+ GPR_ASSERT(new_rr_state_error != GRPC_ERROR_NONE);
+ return false; /* don't replace the RR policy */
+ case GRPC_CHANNEL_INIT:
+ case GRPC_CHANNEL_IDLE:
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_READY:
+ GPR_ASSERT(new_rr_state_error == GRPC_ERROR_NONE);
+ }
+
+ if (grpc_lb_glb_trace) {
+ gpr_log(GPR_INFO,
+ "Setting grpclb's state to %s from new RR policy %p state.",
+ grpc_connectivity_state_name(new_rr_state),
+ (void *)glb_policy->rr_policy);
+ }
+ grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
+ new_rr_state, GRPC_ERROR_REF(new_rr_state_error),
+ "update_lb_connectivity_status_locked");
+ return true;
+}
+
/* perform a pick over \a rr_policy. Given that a pick can return immediately
* (ignoring its completion callback) we need to perform the cleanups this
* callback would be otherwise resposible for */
@@ -508,7 +578,7 @@ static grpc_lb_policy *create_rr_locked(
grpc_lb_policy_args args;
memset(&args, 0, sizeof(args));
args.client_channel_factory = glb_policy->cc_factory;
- grpc_lb_addresses *addresses = process_serverlist(serverlist);
+ grpc_lb_addresses *addresses = process_serverlist_locked(serverlist);
// Replace the LB addresses in the channel args that we pass down to
// the subchannel.
@@ -529,49 +599,84 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
/* glb_policy->rr_policy may be NULL (initial handover) */
static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy, grpc_error *error) {
+ glb_lb_policy *glb_policy) {
GPR_ASSERT(glb_policy->serverlist != NULL &&
glb_policy->serverlist->num_servers > 0);
+ if (glb_policy->shutting_down) return;
+
+ grpc_lb_policy *new_rr_policy =
+ create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy);
+ if (new_rr_policy == NULL) {
+ gpr_log(GPR_ERROR,
+ "Failure creating a RoundRobin policy for serverlist update with "
+ "%lu entries. The previous RR instance (%p), if any, will continue "
+ "to be used. Future updates from the LB will attempt to create new "
+ "instances.",
+ (unsigned long)glb_policy->serverlist->num_servers,
+ (void *)glb_policy->rr_policy);
+ return;
+ }
+
+ grpc_error *new_rr_state_error = NULL;
+ const grpc_connectivity_state new_rr_state =
+ grpc_lb_policy_check_connectivity(exec_ctx, new_rr_policy,
+ &new_rr_state_error);
+ /* Connectivity state is a function of the new RR policy just created */
+ const bool replace_old_rr = update_lb_connectivity_status_locked(
+ exec_ctx, glb_policy, new_rr_state, new_rr_state_error);
+
+ if (!replace_old_rr) {
+ /* dispose of the new RR policy that won't be used after all */
+ GRPC_LB_POLICY_UNREF(exec_ctx, new_rr_policy, "rr_handover_no_replace");
+ if (grpc_lb_glb_trace) {
+ gpr_log(GPR_INFO,
+ "Keeping old RR policy (%p) despite new serverlist: new RR "
+ "policy was in %s connectivity state.",
+ (void *)glb_policy->rr_policy,
+ grpc_connectivity_state_name(new_rr_state));
+ }
+ return;
+ }
+
if (grpc_lb_glb_trace) {
- gpr_log(GPR_INFO, "RR handover. Old RR: %p", (void *)glb_policy->rr_policy);
+ gpr_log(GPR_INFO, "Created RR policy (%p) to replace old RR (%p)",
+ (void *)new_rr_policy, (void *)glb_policy->rr_policy);
}
+
if (glb_policy->rr_policy != NULL) {
/* if we are phasing out an existing RR instance, unref it. */
GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover");
}
- glb_policy->rr_policy =
- create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy);
- if (grpc_lb_glb_trace) {
- gpr_log(GPR_INFO, "Created RR policy (%p)", (void *)glb_policy->rr_policy);
- }
+ /* Finally update the RR policy to the newly created one */
+ glb_policy->rr_policy = new_rr_policy;
- GPR_ASSERT(glb_policy->rr_policy != NULL);
+ /* Add the gRPC LB's interested_parties pollset_set to that of the newly
+ * created RR policy. This will make the RR policy progress upon activity on
+ * gRPC LB, which in turn is tied to the application's call */
grpc_pollset_set_add_pollset_set(exec_ctx,
glb_policy->rr_policy->interested_parties,
glb_policy->base.interested_parties);
+ /* Allocate the data for the tracking of the new RR policy's connectivity.
+ * It'll be deallocated in glb_rr_connectivity_changed() */
rr_connectivity_data *rr_connectivity =
gpr_malloc(sizeof(rr_connectivity_data));
memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
rr_connectivity);
rr_connectivity->glb_policy = glb_policy;
- rr_connectivity->state = grpc_lb_policy_check_connectivity(
- exec_ctx, glb_policy->rr_policy, &error);
+ rr_connectivity->state = new_rr_state;
- grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
- rr_connectivity->state, GRPC_ERROR_REF(error),
- "rr_handover");
- /* subscribe */
+ /* Subscribe to changes to the connectivity of the new RR */
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb");
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
&rr_connectivity->state,
&rr_connectivity->on_change);
grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
- /* flush pending ops */
+ /* Update picks and pings in wait */
pending_pick *pp;
while ((pp = glb_policy->pending_picks)) {
glb_policy->pending_picks = pp->next;
@@ -602,28 +707,36 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- /* If shutdown or error free the arg. Rely on the rest of the code to set the
- * right grpclb status. */
- rr_connectivity_data *rr_conn_data = arg;
- glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
- gpr_mu_lock(&glb_policy->mu);
+ rr_connectivity_data *rr_connectivity = arg;
+ glb_lb_policy *glb_policy = rr_connectivity->glb_policy;
- if (rr_conn_data->state != GRPC_CHANNEL_SHUTDOWN &&
- !glb_policy->shutting_down) {
- /* RR not shutting down. Mimic the RR's policy state */
- grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
- rr_conn_data->state, GRPC_ERROR_REF(error),
- "rr_connectivity_cb");
- /* resubscribe. Reuse the "rr_connectivity_cb" weak ref. */
+ gpr_mu_lock(&glb_policy->mu);
+ const bool shutting_down = glb_policy->shutting_down;
+ bool unref_needed = false;
+ GRPC_ERROR_REF(error);
+
+ if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN || shutting_down) {
+ /* RR policy shutting down. Don't renew subscription and free the arg of
+ * this callback. In addition we need to stash away the current policy to
+ * be UNREF'd after releasing the lock. Otherwise, if the UNREF is the last
+ * one, the policy would be destroyed, alongside the lock, which would
+ * result in a use-after-free */
+ unref_needed = true;
+ gpr_free(rr_connectivity);
+ } else { /* rr state != SHUTDOWN && !shutting down: biz as usual */
+ update_lb_connectivity_status_locked(exec_ctx, glb_policy,
+ rr_connectivity->state, error);
+ /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
- &rr_conn_data->state,
- &rr_conn_data->on_change);
- } else {
+ &rr_connectivity->state,
+ &rr_connectivity->on_change);
+ }
+ gpr_mu_unlock(&glb_policy->mu);
+ if (unref_needed) {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"rr_connectivity_cb");
- gpr_free(rr_conn_data);
}
- gpr_mu_unlock(&glb_policy->mu);
+ GRPC_ERROR_UNREF(error);
}
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
@@ -768,7 +881,6 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
* while holding glb_policy->mu: lb_on_server_status_received, invoked due to
* the cancel, needs to acquire that same lock */
grpc_call *lb_call = glb_policy->lb_call;
- glb_policy->lb_call = NULL;
gpr_mu_unlock(&glb_policy->mu);
/* glb_policy->lb_call and this local lb_call must be consistent at this point
@@ -1126,15 +1238,18 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
gpr_log(GPR_INFO,
"Incoming server list identical to current, ignoring.");
}
+ grpc_grpclb_destroy_serverlist(serverlist);
} else { /* new serverlist */
if (glb_policy->serverlist != NULL) {
/* dispose of the old serverlist */
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
- /* and update the copy in the glb_lb_policy instance */
+ /* and update the copy in the glb_lb_policy instance. This serverlist
+ * instance will be destroyed either upon the next update or in
+ * glb_destroy() */
glb_policy->serverlist = serverlist;
- rr_handover_locked(exec_ctx, glb_policy, error);
+ rr_handover_locked(exec_ctx, glb_policy);
}
} else {
if (grpc_lb_glb_trace) {
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 7b7a741fbb..3e7c078d3c 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -956,6 +956,16 @@ static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs,
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
+static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id,
+ bool is_client, bool is_initial) {
+ for (grpc_linked_mdelem *md = md_batch->list.head; md != md_batch->list.tail;
+ md = md->next) {
+ gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL",
+ is_client ? "CLI" : "SVR", grpc_mdstr_as_c_string(md->md->key),
+ grpc_mdstr_as_c_string(md->md->value));
+ }
+}
+
static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
@@ -969,6 +979,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s; on_complete = %p", str,
op->on_complete);
gpr_free(str);
+ if (op->send_initial_metadata) {
+ log_metadata(op->send_initial_metadata, s->id, t->is_client, true);
+ }
+ if (op->send_trailing_metadata) {
+ log_metadata(op->send_trailing_metadata, s->id, t->is_client, false);
+ }
}
grpc_closure *on_complete = op->on_complete;