aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar ncteisen <ncteisen@gmail.com>2018-10-18 10:49:25 -0700
committerGravatar ncteisen <ncteisen@gmail.com>2018-10-18 10:49:25 -0700
commita60226726ae6076916db0e1d616338a1a211770d (patch)
tree4bf291f23be252799481e9e28f130f36a24e94b3
parent03dbb8c1e2ee6261e1b8802bb7dd0be9a0b8bd46 (diff)
reviewer feedback
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc122
-rw-r--r--test/core/end2end/tests/channelz.cc2
-rw-r--r--test/core/end2end/tests/retry_streaming.cc55
3 files changed, 107 insertions, 72 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 95d1f41094..64e206ec63 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -131,8 +131,8 @@ typedef struct client_channel_channel_data {
grpc_core::UniquePtr<char> info_service_config_json;
/* backpointer to grpc_channel's channelz node */
grpc_core::channelz::ClientChannelNode* channelz_channel;
- /* caches if the last resolution event led to zero addresses */
- bool previous_resolution_zero_num_addresses;
+ /* caches if the last resolution event contained addresses */
+ bool previous_resolution_contained_addresses;
} channel_data;
typedef struct {
@@ -403,6 +403,8 @@ static void request_reresolution_locked(void* arg, grpc_error* error) {
chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
}
+using TraceStringVector = grpc_core::InlinedVector<char*, 3>;
+
// Creates a new LB policy, replacing any previous one.
// If the new policy is created successfully, sets *connectivity_state and
// *connectivity_error to its initial connectivity state; otherwise,
@@ -410,7 +412,7 @@ static void request_reresolution_locked(void* arg, grpc_error* error) {
static void create_new_lb_policy_locked(
channel_data* chand, char* lb_policy_name,
grpc_connectivity_state* connectivity_state,
- grpc_error** connectivity_error) {
+ grpc_error** connectivity_error, TraceStringVector* trace_strings) {
grpc_core::LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = chand->combiner;
lb_policy_args.client_channel_factory = chand->client_channel_factory;
@@ -420,11 +422,21 @@ static void create_new_lb_policy_locked(
lb_policy_name, lb_policy_args);
if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
+ if (chand->channelz_channel != nullptr) {
+ char* str;
+ gpr_asprintf(&str, "Could not create LB policy \'%s\'", lb_policy_name);
+ trace_strings->push_back(str);
+ }
} else {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p: created new LB policy \"%s\" (%p)", chand,
lb_policy_name, new_lb_policy.get());
}
+ if (chand->channelz_channel != nullptr) {
+ char* str;
+ gpr_asprintf(&str, "Created new LB policy \'%s\'", lb_policy_name);
+ trace_strings->push_back(str);
+ }
// Swap out the LB policy and update the fds in
// chand->interested_parties.
if (chand->lb_policy != nullptr) {
@@ -499,6 +511,51 @@ get_service_config_from_resolver_result_locked(channel_data* chand) {
return grpc_core::UniquePtr<char>(gpr_strdup(service_config_json));
}
+static void check_for_important_resolution_change(
+ channel_data* chand, TraceStringVector* trace_strings) {
+ int resolution_contains_addresses = false;
+ const grpc_arg* channel_arg =
+ grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
+ if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
+ grpc_lb_addresses* addresses =
+ static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
+ if (addresses->num_addresses > 0) {
+ resolution_contains_addresses = true;
+ }
+ }
+ if (!resolution_contains_addresses &&
+ chand->previous_resolution_contained_addresses) {
+ trace_strings->push_back(gpr_strdup("Address list became empty"));
+ } else if (resolution_contains_addresses &&
+ !chand->previous_resolution_contained_addresses) {
+ trace_strings->push_back(gpr_strdup("Address list became non-empty"));
+ }
+ chand->previous_resolution_contained_addresses =
+ resolution_contains_addresses;
+}
+
+static void concatenate_and_add_channel_trace(
+ channel_data* chand, TraceStringVector* trace_strings) {
+ if (!trace_strings->empty()) {
+ gpr_strvec v;
+ gpr_strvec_init(&v);
+ gpr_strvec_add(&v, gpr_strdup("Resolution event: "));
+ bool is_first = 1;
+ for (size_t i = 0; i < trace_strings->size(); ++i) {
+ if (!is_first) gpr_strvec_add(&v, gpr_strdup(", "));
+ is_first = false;
+ gpr_strvec_add(&v, (*trace_strings)[i]);
+ }
+ char* flat;
+ size_t flat_len = 0;
+ flat = gpr_strvec_flatten(&v, &flat_len);
+ chand->channelz_channel->AddTraceEvent(
+ grpc_core::channelz::ChannelTrace::Severity::Info,
+ grpc_slice_new(flat, flat_len, gpr_free));
+ gpr_strvec_destroy(&v);
+ }
+}
+
// Callback invoked when a resolver result is available.
static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
channel_data* chand = static_cast<channel_data*>(arg);
@@ -529,7 +586,7 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
// (d) Address resolution that causes a new LB policy to be created.
//
// we track a list of strings to eventually be concatenated and traced.
- grpc_core::InlinedVector<char*, 3> trace_strings;
+ TraceStringVector trace_strings;
grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
grpc_error* connectivity_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
@@ -564,12 +621,8 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
} else {
// Instantiate new LB policy.
create_new_lb_policy_locked(chand, lb_policy_name.get(),
- &connectivity_state, &connectivity_error);
- if (chand->channelz_channel != nullptr) {
- char* str;
- gpr_asprintf(&str, "Switched LB policy to %s", lb_policy_name.get());
- trace_strings.push_back(str);
- }
+ &connectivity_state, &connectivity_error,
+ &trace_strings);
}
// Find service config.
grpc_core::UniquePtr<char> service_config_json =
@@ -579,54 +632,17 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
// only thing that modifies its value, and it can only be invoked
// once at any given time.
if (chand->channelz_channel != nullptr) {
- if ((service_config_json == nullptr &&
- chand->info_service_config_json != nullptr) ||
- (service_config_json != nullptr &&
- chand->info_service_config_json == nullptr) ||
+ if (((service_config_json == nullptr) !=
+ (chand->info_service_config_json == nullptr)) ||
(service_config_json != nullptr &&
- chand->info_service_config_json != nullptr &&
strcmp(service_config_json.get(),
chand->info_service_config_json.get()) != 0)) {
// TODO(ncteisen): might be worth somehow including a snippet of the
// config in the trace, at the risk of bloating the trace logs.
- trace_strings.push_back(gpr_strdup("Service config reloaded"));
- }
- int zero_num_addresses = true;
- const grpc_arg* channel_arg =
- grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
- if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
- grpc_lb_addresses* addresses =
- static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
- if (addresses->num_addresses > 0) {
- zero_num_addresses = false;
- }
- }
- if (zero_num_addresses &&
- !chand->previous_resolution_zero_num_addresses) {
- trace_strings.push_back(gpr_strdup("Address list became empty"));
- } else if (!zero_num_addresses &&
- chand->previous_resolution_zero_num_addresses) {
- trace_strings.push_back(gpr_strdup("Address list became non-empty"));
- }
- chand->previous_resolution_zero_num_addresses = zero_num_addresses;
- if (!trace_strings.empty()) {
- gpr_strvec v;
- gpr_strvec_init(&v);
- gpr_strvec_add(&v, gpr_strdup("Resolution event: "));
- bool is_first = 1;
- for (size_t i = 0; i < trace_strings.size(); ++i) {
- if (!is_first) gpr_strvec_add(&v, gpr_strdup(", "));
- is_first = false;
- gpr_strvec_add(&v, trace_strings[i]);
- }
- char* flat;
- size_t flat_len = 0;
- flat = gpr_strvec_flatten(&v, &flat_len);
- chand->channelz_channel->AddTraceEvent(
- grpc_core::channelz::ChannelTrace::Severity::Info,
- grpc_slice_new(flat, flat_len, gpr_free));
- gpr_strvec_destroy(&v);
+ trace_strings.push_back(gpr_strdup("Service config changed"));
}
+ check_for_important_resolution_change(chand, &trace_strings);
+ concatenate_and_add_channel_trace(chand, &trace_strings);
}
// Swap out the data used by cc_get_channel_info().
gpr_mu_lock(&chand->info_mu);
@@ -796,7 +812,7 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES);
chand->enable_retries = grpc_channel_arg_get_bool(arg, true);
chand->channelz_channel = nullptr;
- chand->previous_resolution_zero_num_addresses = true;
+ chand->previous_resolution_contained_addresses = false;
// Record client channel factory.
arg = grpc_channel_args_find(args->channel_args,
GRPC_ARG_CLIENT_CHANNEL_FACTORY);
diff --git a/test/core/end2end/tests/channelz.cc b/test/core/end2end/tests/channelz.cc
index a812994435..7c61b7910b 100644
--- a/test/core/end2end/tests/channelz.cc
+++ b/test/core/end2end/tests/channelz.cc
@@ -288,6 +288,8 @@ static void test_channelz_with_channel_trace(grpc_end2end_test_config config) {
grpc_server_get_channelz_node(f.server);
GPR_ASSERT(channelz_server != nullptr);
+ run_one_request(config, f, true);
+
char* json = channelz_channel->RenderJsonString();
GPR_ASSERT(json != nullptr);
gpr_log(GPR_INFO, "%s", json);
diff --git a/test/core/end2end/tests/retry_streaming.cc b/test/core/end2end/tests/retry_streaming.cc
index d06d124ca4..d52574fc58 100644
--- a/test/core/end2end/tests/retry_streaming.cc
+++ b/test/core/end2end/tests/retry_streaming.cc
@@ -28,6 +28,9 @@
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/server.h"
+
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
@@ -133,25 +136,30 @@ static void test_retry_streaming(grpc_end2end_test_config config) {
int was_cancelled = 2;
char* peer;
- grpc_arg arg;
- arg.type = GRPC_ARG_STRING;
- arg.key = const_cast<char*>(GRPC_ARG_SERVICE_CONFIG);
- arg.value.string = const_cast<char*>(
- "{\n"
- " \"methodConfig\": [ {\n"
- " \"name\": [\n"
- " { \"service\": \"service\", \"method\": \"method\" }\n"
- " ],\n"
- " \"retryPolicy\": {\n"
- " \"maxAttempts\": 3,\n"
- " \"initialBackoff\": \"1s\",\n"
- " \"maxBackoff\": \"120s\",\n"
- " \"backoffMultiplier\": 1.6,\n"
- " \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
- " }\n"
- " } ]\n"
- "}");
- grpc_channel_args client_args = {1, &arg};
+ grpc_arg arg[] = {
+ grpc_channel_arg_integer_create(
+ const_cast<char*>(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE),
+ 1024 * 8),
+ grpc_channel_arg_integer_create(
+ const_cast<char*>(GRPC_ARG_ENABLE_CHANNELZ), true),
+ grpc_channel_arg_string_create(
+ const_cast<char*>(GRPC_ARG_SERVICE_CONFIG),
+ const_cast<char*>(
+ "{\n"
+ " \"methodConfig\": [ {\n"
+ " \"name\": [\n"
+ " { \"service\": \"service\", \"method\": \"method\" }\n"
+ " ],\n"
+ " \"retryPolicy\": {\n"
+ " \"maxAttempts\": 3,\n"
+ " \"initialBackoff\": \"1s\",\n"
+ " \"maxBackoff\": \"120s\",\n"
+ " \"backoffMultiplier\": 1.6,\n"
+ " \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
+ " }\n"
+ " } ]\n"
+ "}"))};
+ grpc_channel_args client_args = {GPR_ARRAY_SIZE(arg), arg};
grpc_end2end_test_fixture f =
begin_test(config, "retry_streaming", &client_args, nullptr);
@@ -161,6 +169,9 @@ static void test_retry_streaming(grpc_end2end_test_config config) {
c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/service/method"),
nullptr, deadline, nullptr);
+ grpc_core::channelz::ChannelNode* channelz_channel =
+ grpc_channel_get_channelz_node(f.client);
+
GPR_ASSERT(c);
peer = grpc_call_get_peer(c);
@@ -384,6 +395,11 @@ static void test_retry_streaming(grpc_end2end_test_config config) {
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 1);
+ GPR_ASSERT(channelz_channel != nullptr);
+ char* json = channelz_channel->RenderJsonString();
+ GPR_ASSERT(json != nullptr);
+ gpr_log(GPR_INFO, "%s", json);
+
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
@@ -414,6 +430,7 @@ static void test_retry_streaming(grpc_end2end_test_config config) {
void retry_streaming(grpc_end2end_test_config config) {
GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL);
+
test_retry_streaming(config);
}