aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2017-06-06 19:45:58 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2017-06-06 19:45:58 -0700
commit87d5a3130dc94898aed40e74e998a72d21156ae5 (patch)
tree13dd3349d59c237f3a657f41488dc55eff18265a /test
parentde15a40fc3c7d1cac320ed19d27a3378e5be7c4f (diff)
Implement LB policy updates
Diffstat (limited to 'test')
-rw-r--r--test/core/client_channel/resolvers/BUILD2
-rw-r--r--test/core/client_channel/resolvers/fake_resolver_test.c5
-rw-r--r--test/core/end2end/BUILD12
-rw-r--r--test/core/end2end/fake_resolver.c253
-rw-r--r--test/core/end2end/fake_resolver.h76
-rwxr-xr-xtest/core/end2end/generate_tests.bzl1
-rw-r--r--test/core/slice/slice_hash_table_test.c130
-rw-r--r--test/cpp/end2end/BUILD6
-rw-r--r--test/cpp/end2end/client_lb_end2end_test.cc485
-rw-r--r--test/cpp/end2end/grpclb_end2end_test.cc323
-rw-r--r--test/cpp/grpclb/grpclb_test.cc5
11 files changed, 924 insertions, 374 deletions
diff --git a/test/core/client_channel/resolvers/BUILD b/test/core/client_channel/resolvers/BUILD
index 80ca7d3ebb..030f6091b5 100644
--- a/test/core/client_channel/resolvers/BUILD
+++ b/test/core/client_channel/resolvers/BUILD
@@ -74,7 +74,7 @@ grpc_cc_test(
deps = [
"//:gpr",
"//:grpc",
- "//test/core/end2end:fake_resolver",
+ "//:grpc_resolver_fake",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
],
diff --git a/test/core/client_channel/resolvers/fake_resolver_test.c b/test/core/client_channel/resolvers/fake_resolver_test.c
index 861918fbd6..c211f26b76 100644
--- a/test/core/client_channel/resolvers/fake_resolver_test.c
+++ b/test/core/client_channel/resolvers/fake_resolver_test.c
@@ -39,18 +39,18 @@
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/security/credentials/fake/fake_credentials.h"
-#include "test/core/end2end/fake_resolver.h"
#include "test/core/util/test_config.h"
static grpc_resolver *build_fake_resolver(
grpc_exec_ctx *exec_ctx, grpc_combiner *combiner,
grpc_fake_resolver_response_generator *response_generator) {
- grpc_resolver_factory *factory = grpc_resolver_factory_lookup("test");
+ grpc_resolver_factory *factory = grpc_resolver_factory_lookup("fake");
grpc_arg generator_arg =
grpc_fake_resolver_response_generator_arg(response_generator);
grpc_resolver_args args;
@@ -177,7 +177,6 @@ static void test_fake_resolver() {
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
- grpc_fake_resolver_init(); // Registers the "test" scheme.
grpc_init();
test_fake_resolver();
diff --git a/test/core/end2end/BUILD b/test/core/end2end/BUILD
index cf387a93e8..7705f62a79 100644
--- a/test/core/end2end/BUILD
+++ b/test/core/end2end/BUILD
@@ -59,18 +59,6 @@ grpc_cc_library(
visibility = ["//test:__subpackages__"],
)
-grpc_cc_library(
- name = "fake_resolver",
- srcs = ["fake_resolver.c"],
- hdrs = ["fake_resolver.h"],
- language = "C",
- visibility = ["//test:__subpackages__"],
- deps = [
- "//:gpr",
- "//:grpc",
- "//test/core/util:grpc_test_util",
- ],
-)
grpc_cc_library(
name = "http_proxy",
diff --git a/test/core/end2end/fake_resolver.c b/test/core/end2end/fake_resolver.c
deleted file mode 100644
index 736b224fd6..0000000000
--- a/test/core/end2end/fake_resolver.c
+++ /dev/null
@@ -1,253 +0,0 @@
-//
-// Copyright 2016, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-//
-
-// This is similar to the sockaddr resolver, except that it supports a
-// bunch of query args that are useful for dependency injection in tests.
-
-#include <limits.h>
-#include <stdbool.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/host_port.h>
-#include <grpc/support/port_platform.h>
-#include <grpc/support/string_util.h>
-
-#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
-#include "src/core/ext/filters/client_channel/parse_address.h"
-#include "src/core/ext/filters/client_channel/resolver_registry.h"
-#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/iomgr/combiner.h"
-#include "src/core/lib/iomgr/resolve_address.h"
-#include "src/core/lib/iomgr/unix_sockets_posix.h"
-#include "src/core/lib/slice/slice_internal.h"
-#include "src/core/lib/slice/slice_string_helpers.h"
-#include "src/core/lib/support/string.h"
-
-#include "test/core/end2end/fake_resolver.h"
-
-//
-// fake_resolver
-//
-
-typedef struct {
- // base class -- must be first
- grpc_resolver base;
-
- // passed-in parameters
- grpc_channel_args* channel_args;
-
- // If not NULL, the next set of resolution results to be returned to
- // grpc_resolver_next_locked()'s closure.
- grpc_channel_args* next_results;
-
- // pending next completion, or NULL
- grpc_closure* next_completion;
- // target result address for next completion
- grpc_channel_args** target_result;
-} fake_resolver;
-
-static void fake_resolver_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) {
- fake_resolver* r = (fake_resolver*)gr;
- grpc_channel_args_destroy(exec_ctx, r->next_results);
- grpc_channel_args_destroy(exec_ctx, r->channel_args);
- gpr_free(r);
-}
-
-static void fake_resolver_shutdown_locked(grpc_exec_ctx* exec_ctx,
- grpc_resolver* resolver) {
- fake_resolver* r = (fake_resolver*)resolver;
- if (r->next_completion != NULL) {
- *r->target_result = NULL;
- grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
- r->next_completion = NULL;
- }
-}
-
-static void fake_resolver_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx,
- fake_resolver* r) {
- if (r->next_completion != NULL && r->next_results != NULL) {
- *r->target_result =
- grpc_channel_args_merge(r->channel_args, r->next_results);
- grpc_channel_args_destroy(exec_ctx, r->next_results);
- grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
- r->next_completion = NULL;
- r->next_results = NULL;
- }
-}
-
-static void fake_resolver_channel_saw_error_locked(grpc_exec_ctx* exec_ctx,
- grpc_resolver* resolver) {
- fake_resolver* r = (fake_resolver*)resolver;
- fake_resolver_maybe_finish_next_locked(exec_ctx, r);
-}
-
-static void fake_resolver_next_locked(grpc_exec_ctx* exec_ctx,
- grpc_resolver* resolver,
- grpc_channel_args** target_result,
- grpc_closure* on_complete) {
- fake_resolver* r = (fake_resolver*)resolver;
- GPR_ASSERT(!r->next_completion);
- r->next_completion = on_complete;
- r->target_result = target_result;
- fake_resolver_maybe_finish_next_locked(exec_ctx, r);
-}
-
-static const grpc_resolver_vtable fake_resolver_vtable = {
- fake_resolver_destroy, fake_resolver_shutdown_locked,
- fake_resolver_channel_saw_error_locked, fake_resolver_next_locked};
-
-struct grpc_fake_resolver_response_generator {
- fake_resolver* resolver; // Set by the fake_resolver constructor to itself.
- grpc_channel_args* next_response;
- gpr_refcount refcount;
-};
-
-grpc_fake_resolver_response_generator*
-grpc_fake_resolver_response_generator_create() {
- grpc_fake_resolver_response_generator* generator =
- (grpc_fake_resolver_response_generator*)gpr_zalloc(sizeof(*generator));
- gpr_ref_init(&generator->refcount, 1);
- return generator;
-}
-
-grpc_fake_resolver_response_generator*
-grpc_fake_resolver_response_generator_ref(
- grpc_fake_resolver_response_generator* generator) {
- gpr_ref(&generator->refcount);
- return generator;
-}
-
-void grpc_fake_resolver_response_generator_unref(
- grpc_fake_resolver_response_generator* generator) {
- if (gpr_unref(&generator->refcount)) {
- gpr_free(generator);
- }
-}
-
-static void set_response_cb(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
- grpc_fake_resolver_response_generator* generator =
- (grpc_fake_resolver_response_generator*)arg;
- fake_resolver* r = generator->resolver;
- if (r->next_results != NULL) {
- grpc_channel_args_destroy(exec_ctx, r->next_results);
- }
- r->next_results = generator->next_response;
- fake_resolver_maybe_finish_next_locked(exec_ctx, r);
-}
-
-void grpc_fake_resolver_response_generator_set_response(
- grpc_exec_ctx* exec_ctx, grpc_fake_resolver_response_generator* generator,
- grpc_channel_args* next_response) {
- GPR_ASSERT(generator->resolver != NULL);
- generator->next_response = grpc_channel_args_copy(next_response);
- grpc_closure_sched(
- exec_ctx,
- grpc_closure_create(
- set_response_cb, generator,
- grpc_combiner_scheduler(generator->resolver->base.combiner, false)),
- GRPC_ERROR_NONE);
-}
-
-static void* response_generator_arg_copy(void* p) {
- return grpc_fake_resolver_response_generator_ref(
- (grpc_fake_resolver_response_generator*)p);
-}
-
-static void response_generator_arg_destroy(grpc_exec_ctx* exec_ctx, void* p) {
- grpc_fake_resolver_response_generator_unref(
- (grpc_fake_resolver_response_generator*)p);
-}
-
-static int response_generator_cmp(void* a, void* b) { return GPR_ICMP(a, b); }
-
-static const grpc_arg_pointer_vtable response_generator_arg_vtable = {
- response_generator_arg_copy, response_generator_arg_destroy,
- response_generator_cmp};
-
-grpc_arg grpc_fake_resolver_response_generator_arg(
- grpc_fake_resolver_response_generator* generator) {
- grpc_arg arg;
- arg.type = GRPC_ARG_POINTER;
- arg.key = GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR;
- arg.value.pointer.p = generator;
- arg.value.pointer.vtable = &response_generator_arg_vtable;
- return arg;
-}
-
-grpc_fake_resolver_response_generator*
-grpc_fake_resolver_get_response_generator(const grpc_channel_args* args) {
- const grpc_arg* arg =
- grpc_channel_args_find(args, GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR);
- if (arg == NULL || arg->type != GRPC_ARG_POINTER) return NULL;
- return (grpc_fake_resolver_response_generator*)arg->value.pointer.p;
-}
-
-//
-// fake_resolver_factory
-//
-
-static void fake_resolver_factory_ref(grpc_resolver_factory* factory) {}
-
-static void fake_resolver_factory_unref(grpc_resolver_factory* factory) {}
-
-static grpc_resolver* fake_resolver_create(grpc_exec_ctx* exec_ctx,
- grpc_resolver_factory* factory,
- grpc_resolver_args* args) {
- fake_resolver* r = (fake_resolver*)gpr_zalloc(sizeof(*r));
- r->channel_args = grpc_channel_args_copy(args->args);
- grpc_resolver_init(&r->base, &fake_resolver_vtable, args->combiner);
- grpc_fake_resolver_response_generator* response_generator =
- grpc_fake_resolver_get_response_generator(args->args);
- if (response_generator != NULL) response_generator->resolver = r;
- return &r->base;
-}
-
-static char* fake_resolver_get_default_authority(grpc_resolver_factory* factory,
- grpc_uri* uri) {
- const char* path = uri->path;
- if (path[0] == '/') ++path;
- return gpr_strdup(path);
-}
-
-static const grpc_resolver_factory_vtable fake_resolver_factory_vtable = {
- fake_resolver_factory_ref, fake_resolver_factory_unref,
- fake_resolver_create, fake_resolver_get_default_authority, "test"};
-
-static grpc_resolver_factory fake_resolver_factory = {
- &fake_resolver_factory_vtable};
-
-void grpc_fake_resolver_init(void) {
- grpc_register_resolver_type(&fake_resolver_factory);
-}
diff --git a/test/core/end2end/fake_resolver.h b/test/core/end2end/fake_resolver.h
deleted file mode 100644
index d9668d0d11..0000000000
--- a/test/core/end2end/fake_resolver.h
+++ /dev/null
@@ -1,76 +0,0 @@
-//
-// Copyright 2016, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-//
-
-#ifndef GRPC_TEST_CORE_END2END_FAKE_RESOLVER_H
-#define GRPC_TEST_CORE_END2END_FAKE_RESOLVER_H
-
-#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
-#include "src/core/ext/filters/client_channel/uri_parser.h"
-#include "src/core/lib/channel/channel_args.h"
-
-#include "test/core/util/test_config.h"
-
-#define GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR \
- "grpc.fake_resolver.response_generator"
-
-void grpc_fake_resolver_init();
-
-// Instances of \a grpc_fake_resolver_response_generator are passed to the
-// fake resolver in a channel argument (see \a
-// grpc_fake_resolver_response_generator_arg) in order to inject and trigger
-// custom resolutions. See also \a
-// grpc_fake_resolver_response_generator_set_response.
-typedef struct grpc_fake_resolver_response_generator
- grpc_fake_resolver_response_generator;
-grpc_fake_resolver_response_generator*
-grpc_fake_resolver_response_generator_create();
-
-// Instruct the fake resolver associated with the \a response_generator instance
-// to trigger a new resolution for \a uri and \a args.
-void grpc_fake_resolver_response_generator_set_response(
- grpc_exec_ctx* exec_ctx, grpc_fake_resolver_response_generator* generator,
- grpc_channel_args* next_response);
-
-// Return a \a grpc_arg for a \a grpc_fake_resolver_response_generator instance.
-grpc_arg grpc_fake_resolver_response_generator_arg(
- grpc_fake_resolver_response_generator* generator);
-// Return the \a grpc_fake_resolver_response_generator instance in \a args or
-// NULL.
-grpc_fake_resolver_response_generator*
-grpc_fake_resolver_get_response_generator(const grpc_channel_args* args);
-
-grpc_fake_resolver_response_generator*
-grpc_fake_resolver_response_generator_ref(
- grpc_fake_resolver_response_generator* generator);
-void grpc_fake_resolver_response_generator_unref(
- grpc_fake_resolver_response_generator* generator);
-
-#endif /* GRPC_TEST_CORE_END2END_FAKE_RESOLVER_H */
diff --git a/test/core/end2end/generate_tests.bzl b/test/core/end2end/generate_tests.bzl
index 6865aefa3d..0adf7eb989 100755
--- a/test/core/end2end/generate_tests.bzl
+++ b/test/core/end2end/generate_tests.bzl
@@ -173,7 +173,6 @@ def grpc_end2end_tests():
deps = [
':cq_verifier',
':ssl_test_data',
- ':fake_resolver',
':http_proxy',
':proxy',
]
diff --git a/test/core/slice/slice_hash_table_test.c b/test/core/slice/slice_hash_table_test.c
index 67041b2d5c..16bfb424c3 100644
--- a/test/core/slice/slice_hash_table_test.c
+++ b/test/core/slice/slice_hash_table_test.c
@@ -77,6 +77,19 @@ static void destroy_string(grpc_exec_ctx* exec_ctx, void* value) {
gpr_free(value);
}
+static grpc_slice_hash_table* create_table_from_entries(
+ const test_entry* test_entries, size_t num_test_entries,
+ int (*value_cmp_fn)(void*, void*)) {
+ // Construct table.
+ grpc_slice_hash_table_entry* entries =
+ gpr_zalloc(sizeof(*entries) * num_test_entries);
+ populate_entries(test_entries, num_test_entries, entries);
+ grpc_slice_hash_table* table = grpc_slice_hash_table_create(
+ num_test_entries, entries, destroy_string, value_cmp_fn);
+ gpr_free(entries);
+ return table;
+}
+
static void test_slice_hash_table() {
const test_entry test_entries[] = {
{"key_0", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"},
@@ -115,13 +128,8 @@ static void test_slice_hash_table() {
{"key_99", "value_99"},
};
const size_t num_entries = GPR_ARRAY_SIZE(test_entries);
- // Construct table.
- grpc_slice_hash_table_entry* entries =
- gpr_zalloc(sizeof(*entries) * num_entries);
- populate_entries(test_entries, num_entries, entries);
grpc_slice_hash_table* table =
- grpc_slice_hash_table_create(num_entries, entries, destroy_string);
- gpr_free(entries);
+ create_table_from_entries(test_entries, num_entries, NULL);
// Check contents of table.
check_values(test_entries, num_entries, table);
check_non_existent_value("XX", table);
@@ -131,8 +139,118 @@ static void test_slice_hash_table() {
grpc_exec_ctx_finish(&exec_ctx);
}
+static int value_cmp_fn(void* a, void* b) {
+ const char* a_str = a;
+ const char* b_str = b;
+ return strcmp(a_str, b_str);
+}
+
+static int pointer_cmp_fn(void* a, void* b) { return GPR_ICMP(a, b); }
+
+static void test_slice_hash_table_eq() {
+ const test_entry test_entries_a[] = {
+ {"key_0", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"}};
+ const size_t num_entries_a = GPR_ARRAY_SIZE(test_entries_a);
+ grpc_slice_hash_table* table_a =
+ create_table_from_entries(test_entries_a, num_entries_a, value_cmp_fn);
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_a, table_a) == 0);
+
+ const test_entry test_entries_b[] = {
+ {"key_0", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"}};
+ const size_t num_entries_b = GPR_ARRAY_SIZE(test_entries_b);
+ grpc_slice_hash_table* table_b =
+ create_table_from_entries(test_entries_b, num_entries_b, value_cmp_fn);
+
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_a, table_b) == 0);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_slice_hash_table_unref(&exec_ctx, table_a);
+ grpc_slice_hash_table_unref(&exec_ctx, table_b);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_slice_hash_table_not_eq() {
+ const test_entry test_entries_a[] = {
+ {"key_0", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"}};
+ const size_t num_entries_a = GPR_ARRAY_SIZE(test_entries_a);
+ grpc_slice_hash_table* table_a =
+ create_table_from_entries(test_entries_a, num_entries_a, value_cmp_fn);
+
+ // Different sizes.
+ const test_entry test_entries_b_smaller[] = {{"key_0", "value_0"},
+ {"key_1", "value_1"}};
+ const size_t num_entries_b_smaller = GPR_ARRAY_SIZE(test_entries_b_smaller);
+ grpc_slice_hash_table* table_b_smaller = create_table_from_entries(
+ test_entries_b_smaller, num_entries_b_smaller, value_cmp_fn);
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_a, table_b_smaller) > 0);
+
+ const test_entry test_entries_b_larger[] = {{"key_0", "value_0"},
+ {"key_1", "value_1"},
+ {"key_2", "value_2"},
+ {"key_3", "value_3"}};
+ const size_t num_entries_b_larger = GPR_ARRAY_SIZE(test_entries_b_larger);
+ grpc_slice_hash_table* table_b_larger = create_table_from_entries(
+ test_entries_b_larger, num_entries_b_larger, value_cmp_fn);
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_a, table_b_larger) < 0);
+
+ // One key doesn't match and is lexicographically "smaller".
+ const test_entry test_entries_c[] = {
+ {"key_zz", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"}};
+ const size_t num_entries_c = GPR_ARRAY_SIZE(test_entries_c);
+ grpc_slice_hash_table* table_c =
+ create_table_from_entries(test_entries_c, num_entries_c, value_cmp_fn);
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_a, table_c) > 0);
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_c, table_a) < 0);
+
+ // One value doesn't match.
+ const test_entry test_entries_d[] = {
+ {"key_0", "value_z"}, {"key_1", "value_1"}, {"key_2", "value_2"}};
+ const size_t num_entries_d = GPR_ARRAY_SIZE(test_entries_d);
+ grpc_slice_hash_table* table_d =
+ create_table_from_entries(test_entries_d, num_entries_d, value_cmp_fn);
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_a, table_d) < 0);
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_d, table_a) > 0);
+
+ // Same values but different "equals" functions.
+ const test_entry test_entries_e[] = {
+ {"key_0", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"}};
+ const size_t num_entries_e = GPR_ARRAY_SIZE(test_entries_e);
+ grpc_slice_hash_table* table_e =
+ create_table_from_entries(test_entries_e, num_entries_e, value_cmp_fn);
+ const test_entry test_entries_f[] = {
+ {"key_0", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"}};
+ const size_t num_entries_f = GPR_ARRAY_SIZE(test_entries_f);
+ grpc_slice_hash_table* table_f =
+ create_table_from_entries(test_entries_f, num_entries_f, pointer_cmp_fn);
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_e, table_f) != 0);
+
+ // Same (empty) key, different values.
+ const test_entry test_entries_g[] = {{"", "value_0"}};
+ const size_t num_entries_g = GPR_ARRAY_SIZE(test_entries_g);
+ grpc_slice_hash_table* table_g =
+ create_table_from_entries(test_entries_g, num_entries_g, value_cmp_fn);
+ const test_entry test_entries_h[] = {{"", "value_1"}};
+ const size_t num_entries_h = GPR_ARRAY_SIZE(test_entries_h);
+ grpc_slice_hash_table* table_h =
+ create_table_from_entries(test_entries_h, num_entries_h, pointer_cmp_fn);
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_g, table_h) != 0);
+
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_slice_hash_table_unref(&exec_ctx, table_a);
+ grpc_slice_hash_table_unref(&exec_ctx, table_b_larger);
+ grpc_slice_hash_table_unref(&exec_ctx, table_b_smaller);
+ grpc_slice_hash_table_unref(&exec_ctx, table_c);
+ grpc_slice_hash_table_unref(&exec_ctx, table_d);
+ grpc_slice_hash_table_unref(&exec_ctx, table_e);
+ grpc_slice_hash_table_unref(&exec_ctx, table_f);
+ grpc_slice_hash_table_unref(&exec_ctx, table_g);
+ grpc_slice_hash_table_unref(&exec_ctx, table_h);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
test_slice_hash_table();
+ test_slice_hash_table_eq();
+ test_slice_hash_table_not_eq();
return 0;
}
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD
index 9b691a83e0..97558d70bb 100644
--- a/test/cpp/end2end/BUILD
+++ b/test/cpp/end2end/BUILD
@@ -212,8 +212,8 @@ grpc_cc_test(
)
grpc_cc_test(
- name = "round_robin_end2end_test",
- srcs = ["round_robin_end2end_test.cc"],
+ name = "client_lb_end2end_test",
+ srcs = ["client_lb_end2end_test.cc"],
deps = [
":test_service_impl",
"//:gpr",
@@ -243,7 +243,7 @@ grpc_cc_test(
"//src/proto/grpc/testing:echo_messages_proto",
"//src/proto/grpc/testing:echo_proto",
"//src/proto/grpc/testing/duplicate:echo_duplicate_proto",
- "//test/core/end2end:fake_resolver",
+ "//:grpc_resolver_fake",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_util",
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
new file mode 100644
index 0000000000..ff00225597
--- /dev/null
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -0,0 +1,485 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <algorithm>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include <grpc++/channel.h>
+#include <grpc++/client_context.h>
+#include <grpc++/create_channel.h>
+#include <grpc++/server.h>
+#include <grpc++/server_builder.h>
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+
+extern "C" {
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+}
+
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+#include "test/cpp/end2end/test_service_impl.h"
+
+using grpc::testing::EchoRequest;
+using grpc::testing::EchoResponse;
+using std::chrono::system_clock;
+
+namespace grpc {
+namespace testing {
+namespace {
+
+// Subclass of TestServiceImpl that increments a request counter for
+// every call to the Echo RPC.
+class MyTestServiceImpl : public TestServiceImpl {
+ public:
+ MyTestServiceImpl() : request_count_(0) {}
+
+ Status Echo(ServerContext* context, const EchoRequest* request,
+ EchoResponse* response) override {
+ {
+ std::unique_lock<std::mutex> lock(mu_);
+ ++request_count_;
+ }
+ return TestServiceImpl::Echo(context, request, response);
+ }
+
+ int request_count() {
+ std::unique_lock<std::mutex> lock(mu_);
+ return request_count_;
+ }
+
+ void ResetCounters() {
+ std::unique_lock<std::mutex> lock(mu_);
+ request_count_ = 0;
+ }
+
+ private:
+ std::mutex mu_;
+ int request_count_;
+};
+
+class ClientLbEnd2endTest : public ::testing::Test {
+ protected:
+ ClientLbEnd2endTest() : server_host_("localhost") {}
+
+ void SetUp() override {
+ response_generator_ = grpc_fake_resolver_response_generator_create();
+ }
+
+ void TearDown() override {
+ grpc_fake_resolver_response_generator_unref(response_generator_);
+ for (size_t i = 0; i < servers_.size(); ++i) {
+ servers_[i]->Shutdown();
+ }
+ }
+
+ void StartServers(int num_servers) {
+ for (int i = 0; i < num_servers; ++i) {
+ servers_.emplace_back(new ServerData(server_host_));
+ }
+ }
+
+ void SetNextResolution(const std::vector<int>& ports) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_lb_addresses* addresses = grpc_lb_addresses_create(ports.size(), NULL);
+ for (size_t i = 0; i < ports.size(); ++i) {
+ char* lb_uri_str;
+ gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", ports[i]);
+ grpc_uri* lb_uri = grpc_uri_parse(&exec_ctx, lb_uri_str, true);
+ GPR_ASSERT(lb_uri != NULL);
+ grpc_lb_addresses_set_address_from_uri(addresses, i, lb_uri,
+ false /* is balancer */,
+ "" /* balancer name */, NULL);
+ grpc_uri_destroy(lb_uri);
+ gpr_free(lb_uri_str);
+ }
+ const grpc_arg fake_addresses =
+ grpc_lb_addresses_create_channel_arg(addresses);
+ grpc_channel_args* fake_result =
+ grpc_channel_args_copy_and_add(NULL, &fake_addresses, 1);
+ grpc_fake_resolver_response_generator_set_response(
+ &exec_ctx, response_generator_, fake_result);
+ grpc_channel_args_destroy(&exec_ctx, fake_result);
+ grpc_lb_addresses_destroy(&exec_ctx, addresses);
+ grpc_exec_ctx_finish(&exec_ctx);
+ }
+
+ void ResetStub(const grpc::string& lb_policy_name = "") {
+ ChannelArguments args;
+ if (lb_policy_name.size() > 0) {
+ args.SetLoadBalancingPolicyName(lb_policy_name);
+ } // else, default to pick first
+ args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
+ response_generator_);
+ std::ostringstream uri;
+ uri << "fake:///";
+ for (size_t i = 0; i < servers_.size() - 1; ++i) {
+ uri << "127.0.0.1:" << servers_[i]->port_ << ",";
+ }
+ uri << "127.0.0.1:" << servers_[servers_.size() - 1]->port_;
+ channel_ =
+ CreateCustomChannel(uri.str(), InsecureChannelCredentials(), args);
+ stub_ = grpc::testing::EchoTestService::NewStub(channel_);
+ }
+
+ void SendRpc() {
+ EchoRequest request;
+ EchoResponse response;
+ request.set_message("Live long and prosper.");
+ ClientContext context;
+ Status status = stub_->Echo(&context, request, &response);
+ EXPECT_TRUE(status.ok());
+ EXPECT_EQ(response.message(), request.message());
+ }
+
+ struct ServerData {
+ int port_;
+ std::unique_ptr<Server> server_;
+ MyTestServiceImpl service_;
+ std::unique_ptr<std::thread> thread_;
+
+ explicit ServerData(const grpc::string& server_host) {
+ port_ = grpc_pick_unused_port_or_die();
+ gpr_log(GPR_INFO, "starting server on port %d", port_);
+ std::mutex mu;
+ std::condition_variable cond;
+ thread_.reset(new std::thread(
+ std::bind(&ServerData::Start, this, server_host, &mu, &cond)));
+ std::unique_lock<std::mutex> lock(mu);
+ cond.wait(lock);
+ gpr_log(GPR_INFO, "server startup complete");
+ }
+
+ void Start(const grpc::string& server_host, std::mutex* mu,
+ std::condition_variable* cond) {
+ std::ostringstream server_address;
+ server_address << server_host << ":" << port_;
+ ServerBuilder builder;
+ builder.AddListeningPort(server_address.str(),
+ InsecureServerCredentials());
+ builder.RegisterService(&service_);
+ server_ = builder.BuildAndStart();
+ std::lock_guard<std::mutex> lock(*mu);
+ cond->notify_one();
+ }
+
+ void Shutdown() {
+ server_->Shutdown();
+ thread_->join();
+ }
+ };
+
+ void ResetCounters() {
+ for (const auto& server : servers_) server->service_.ResetCounters();
+ }
+
+ void WaitForServer(size_t server_idx) {
+ do {
+ SendRpc();
+ } while (servers_[server_idx]->service_.request_count() == 0);
+ ResetCounters();
+ }
+
+ const grpc::string server_host_;
+ std::shared_ptr<Channel> channel_;
+ std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
+ std::vector<std::unique_ptr<ServerData>> servers_;
+ grpc_fake_resolver_response_generator* response_generator_;
+};
+
+TEST_F(ClientLbEnd2endTest, PickFirst) {
+ // Start servers and send one RPC per server.
+ const int kNumServers = 3;
+ StartServers(kNumServers);
+ ResetStub(); // implicit pick first
+ std::vector<int> ports;
+ for (size_t i = 0; i < servers_.size(); ++i) {
+ ports.emplace_back(servers_[i]->port_);
+ }
+ SetNextResolution(ports);
+ for (size_t i = 0; i < servers_.size(); ++i) {
+ SendRpc();
+ }
+ // All requests should have gone to a single server.
+ bool found = false;
+ for (size_t i = 0; i < servers_.size(); ++i) {
+ const int request_count = servers_[i]->service_.request_count();
+ if (request_count == kNumServers) {
+ found = true;
+ } else {
+ EXPECT_EQ(0, request_count);
+ }
+ }
+ EXPECT_TRUE(found);
+ // Check LB policy name for the channel.
+ EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName());
+}
+
+TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
+ // Start servers and send one RPC per server.
+ const int kNumServers = 3;
+ StartServers(kNumServers);
+ ResetStub(); // implicit pick first
+ std::vector<int> ports;
+
+ // Perform one RPC against the first server.
+ ports.emplace_back(servers_[0]->port_);
+ SetNextResolution(ports);
+ gpr_log(GPR_INFO, "****** SET [0] *******");
+ SendRpc();
+ EXPECT_EQ(servers_[0]->service_.request_count(), 1);
+
+ // An empty update will result in the channel going into TRANSIENT_FAILURE.
+ ports.clear();
+ SetNextResolution(ports);
+ gpr_log(GPR_INFO, "****** SET none *******");
+ grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT;
+ do {
+ channel_state = channel_->GetState(true /* try to connect */);
+ } while (channel_state == GRPC_CHANNEL_READY);
+ GPR_ASSERT(channel_state != GRPC_CHANNEL_READY);
+ servers_[0]->service_.ResetCounters();
+
+ // Next update introduces servers_[1], making the channel recover.
+ ports.clear();
+ ports.emplace_back(servers_[1]->port_);
+ SetNextResolution(ports);
+ gpr_log(GPR_INFO, "****** SET [1] *******");
+ WaitForServer(1);
+ EXPECT_EQ(servers_[0]->service_.request_count(), 0);
+
+ // And again for servers_[2]
+ ports.clear();
+ ports.emplace_back(servers_[2]->port_);
+ SetNextResolution(ports);
+ gpr_log(GPR_INFO, "****** SET [2] *******");
+ WaitForServer(2);
+ EXPECT_EQ(servers_[0]->service_.request_count(), 0);
+ EXPECT_EQ(servers_[1]->service_.request_count(), 0);
+
+ // Check LB policy name for the channel.
+ EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName());
+}
+
+TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) {
+ // Start servers and send one RPC per server.
+ const int kNumServers = 3;
+ StartServers(kNumServers);
+ ResetStub(); // implicit pick first
+ std::vector<int> ports;
+
+ // Perform one RPC against the first server.
+ ports.emplace_back(servers_[0]->port_);
+ SetNextResolution(ports);
+ gpr_log(GPR_INFO, "****** SET [0] *******");
+ SendRpc();
+ EXPECT_EQ(servers_[0]->service_.request_count(), 1);
+ servers_[0]->service_.ResetCounters();
+
+ // Send and superset update
+ ports.clear();
+ ports.emplace_back(servers_[1]->port_);
+ ports.emplace_back(servers_[0]->port_);
+ SetNextResolution(ports);
+ gpr_log(GPR_INFO, "****** SET superset *******");
+ SendRpc();
+ // We stick to the previously connected server.
+ WaitForServer(0);
+ EXPECT_EQ(0, servers_[1]->service_.request_count());
+
+ // Check LB policy name for the channel.
+ EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName());
+}
+
+TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) {
+ // Start servers and send one RPC per server.
+ const int kNumServers = 3;
+ StartServers(kNumServers);
+ ResetStub(); // implicit pick first
+ std::vector<int> ports;
+ for (size_t i = 0; i < servers_.size(); ++i) {
+ ports.emplace_back(servers_[i]->port_);
+ }
+ for (size_t i = 0; i < 1000; ++i) {
+ std::random_shuffle(ports.begin(), ports.end());
+ SetNextResolution(ports);
+ if (i % 10 == 0) SendRpc();
+ }
+ // Check LB policy name for the channel.
+ EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName());
+}
+
+TEST_F(ClientLbEnd2endTest, RoundRobin) {
+ // Start servers and send one RPC per server.
+ const int kNumServers = 3;
+ StartServers(kNumServers);
+ ResetStub("round_robin");
+ std::vector<int> ports;
+ for (const auto& server : servers_) {
+ ports.emplace_back(server->port_);
+ }
+ SetNextResolution(ports);
+ for (size_t i = 0; i < servers_.size(); ++i) {
+ SendRpc();
+ }
+ // One request should have gone to each server.
+ for (size_t i = 0; i < servers_.size(); ++i) {
+ EXPECT_EQ(1, servers_[i]->service_.request_count());
+ }
+ // Check LB policy name for the channel.
+ EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
+}
+
+TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
+ // Start servers and send one RPC per server.
+ const int kNumServers = 3;
+ StartServers(kNumServers);
+ ResetStub("round_robin");
+ std::vector<int> ports;
+
+ // Start with a single server.
+ ports.emplace_back(servers_[0]->port_);
+ SetNextResolution(ports);
+ WaitForServer(0);
+ // Send RPCs. They should all go servers_[0]
+ for (size_t i = 0; i < 10; ++i) SendRpc();
+ EXPECT_EQ(10, servers_[0]->service_.request_count());
+ EXPECT_EQ(0, servers_[1]->service_.request_count());
+ EXPECT_EQ(0, servers_[2]->service_.request_count());
+ servers_[0]->service_.ResetCounters();
+
+ // And now for the second server.
+ ports.clear();
+ ports.emplace_back(servers_[1]->port_);
+ SetNextResolution(ports);
+
+ // Wait until update has been processed, as signaled by the second backend
+ // receiving a request.
+ EXPECT_EQ(0, servers_[1]->service_.request_count());
+ WaitForServer(1);
+
+ for (size_t i = 0; i < 10; ++i) SendRpc();
+ EXPECT_EQ(0, servers_[0]->service_.request_count());
+ EXPECT_EQ(10, servers_[1]->service_.request_count());
+ EXPECT_EQ(0, servers_[2]->service_.request_count());
+ servers_[1]->service_.ResetCounters();
+
+ // ... and for the last server.
+ ports.clear();
+ ports.emplace_back(servers_[2]->port_);
+ SetNextResolution(ports);
+ WaitForServer(2);
+
+ for (size_t i = 0; i < 10; ++i) SendRpc();
+ EXPECT_EQ(0, servers_[0]->service_.request_count());
+ EXPECT_EQ(0, servers_[1]->service_.request_count());
+ EXPECT_EQ(10, servers_[2]->service_.request_count());
+ servers_[2]->service_.ResetCounters();
+
+ // Back to all servers.
+ ports.clear();
+ ports.emplace_back(servers_[0]->port_);
+ ports.emplace_back(servers_[1]->port_);
+ ports.emplace_back(servers_[2]->port_);
+ SetNextResolution(ports);
+ WaitForServer(0);
+ WaitForServer(1);
+ WaitForServer(2);
+
+ // Send three RPCs, one per server.
+ for (size_t i = 0; i < 3; ++i) SendRpc();
+ EXPECT_EQ(1, servers_[0]->service_.request_count());
+ EXPECT_EQ(1, servers_[1]->service_.request_count());
+ EXPECT_EQ(1, servers_[2]->service_.request_count());
+
+ // An empty update will result in the channel going into TRANSIENT_FAILURE.
+ ports.clear();
+ SetNextResolution(ports);
+ grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT;
+ do {
+ channel_state = channel_->GetState(true /* try to connect */);
+ } while (channel_state == GRPC_CHANNEL_READY);
+ GPR_ASSERT(channel_state != GRPC_CHANNEL_READY);
+ servers_[0]->service_.ResetCounters();
+
+ // Next update introduces servers_[1], making the channel recover.
+ ports.clear();
+ ports.emplace_back(servers_[1]->port_);
+ SetNextResolution(ports);
+ WaitForServer(1);
+ channel_state = channel_->GetState(false /* try to connect */);
+ GPR_ASSERT(channel_state == GRPC_CHANNEL_READY);
+
+ // Check LB policy name for the channel.
+ EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
+}
+
+TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
+ // Start servers and send one RPC per server.
+ const int kNumServers = 3;
+ StartServers(kNumServers);
+ ResetStub("round_robin");
+ std::vector<int> ports;
+ for (size_t i = 0; i < servers_.size(); ++i) {
+ ports.emplace_back(servers_[i]->port_);
+ }
+ for (size_t i = 0; i < 1000; ++i) {
+ std::random_shuffle(ports.begin(), ports.end());
+ SetNextResolution(ports);
+ if (i % 10 == 0) SendRpc();
+ }
+ // Check LB policy name for the channel.
+ EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
+}
+
+} // namespace
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ grpc_test_init(argc, argv);
+ grpc_init();
+ const auto result = RUN_ALL_TESTS();
+ grpc_shutdown();
+ return result;
+}
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index b0d4e2dadf..6ff3642adf 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -50,8 +50,8 @@
#include <gtest/gtest.h>
extern "C" {
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/lib/iomgr/sockaddr.h"
-#include "test/core/end2end/fake_resolver.h"
}
#include "test/core/util/port.h"
@@ -117,6 +117,12 @@ class CountedService : public ServiceType {
++request_count_;
}
+ void ResetCounters() {
+ std::unique_lock<std::mutex> lock(mu_);
+ request_count_ = 0;
+ response_count_ = 0;
+ }
+
protected:
std::mutex mu_;
@@ -181,6 +187,7 @@ class BalancerServiceImpl : public BalancerService {
shutdown_(false) {}
Status BalanceLoad(ServerContext* context, Stream* stream) override {
+ gpr_log(GPR_INFO, "LB: BalanceLoad");
LoadBalanceRequest request;
stream->Read(&request);
IncreaseRequestCount();
@@ -200,9 +207,16 @@ class BalancerServiceImpl : public BalancerService {
responses_and_delays = responses_and_delays_;
}
for (const auto& response_and_delay : responses_and_delays) {
- if (shutdown_) break;
+ {
+ std::unique_lock<std::mutex> lock(mu_);
+ if (shutdown_) break;
+ }
SendResponse(stream, response_and_delay.first, response_and_delay.second);
}
+ {
+ std::unique_lock<std::mutex> lock(mu_);
+ serverlist_cond_.wait(lock);
+ }
if (client_load_reporting_interval_seconds_ > 0) {
request.Clear();
@@ -226,9 +240,10 @@ class BalancerServiceImpl : public BalancerService {
client_stats_.num_calls_finished_known_received +=
request.client_stats().num_calls_finished_known_received();
std::lock_guard<std::mutex> lock(mu_);
- cond_.notify_one();
+ load_report_cond_.notify_one();
}
+ gpr_log(GPR_INFO, "LB: done");
return Status::OK;
}
@@ -237,9 +252,14 @@ class BalancerServiceImpl : public BalancerService {
responses_and_delays_.push_back(std::make_pair(response, send_after_ms));
}
- void Shutdown() {
+ // Returns true on its first invocation, false otherwise.
+ bool Shutdown() {
+ NotifyDoneWithServerlists();
std::unique_lock<std::mutex> lock(mu_);
+ const bool prev = !shutdown_;
shutdown_ = true;
+ gpr_log(GPR_INFO, "LB: shut down");
+ return prev;
}
static LoadBalanceResponse BuildResponseForBackends(
@@ -264,26 +284,35 @@ class BalancerServiceImpl : public BalancerService {
const ClientStats& WaitForLoadReport() {
std::unique_lock<std::mutex> lock(mu_);
- cond_.wait(lock);
+ load_report_cond_.wait(lock);
return client_stats_;
}
+ void NotifyDoneWithServerlists() {
+ std::lock_guard<std::mutex> lock(mu_);
+ serverlist_cond_.notify_one();
+ }
+
private:
void SendResponse(Stream* stream, const LoadBalanceResponse& response,
int delay_ms) {
gpr_log(GPR_INFO, "LB: sleeping for %d ms...", delay_ms);
- gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_millis(delay_ms, GPR_TIMESPAN)));
+ if (delay_ms > 0) {
+ gpr_sleep_until(
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(delay_ms, GPR_TIMESPAN)));
+ }
gpr_log(GPR_INFO, "LB: Woke up! Sending response '%s'",
response.DebugString().c_str());
- stream->Write(response);
IncreaseResponseCount();
+ stream->Write(response);
}
const int client_load_reporting_interval_seconds_;
std::vector<ResponseDelayPair> responses_and_delays_;
std::mutex mu_;
- std::condition_variable cond_;
+ std::condition_variable load_report_cond_;
+ std::condition_variable serverlist_cond_;
ClientStats client_stats_;
bool shutdown_;
};
@@ -326,8 +355,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
backend_servers_[i].Shutdown();
}
for (size_t i = 0; i < balancers_.size(); ++i) {
- balancers_[i]->Shutdown();
- balancer_servers_[i].Shutdown();
+ if (balancers_[i]->Shutdown()) balancer_servers_[i].Shutdown();
}
grpc_fake_resolver_response_generator_unref(response_generator_);
}
@@ -336,8 +364,10 @@ class GrpclbEnd2endTest : public ::testing::Test {
ChannelArguments args;
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator_);
- channel_ = CreateCustomChannel("test:///not_used",
- InsecureChannelCredentials(), args);
+ std::ostringstream uri;
+ uri << "fake:///servername_not_used";
+ channel_ =
+ CreateCustomChannel(uri.str(), InsecureChannelCredentials(), args);
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
}
@@ -497,6 +527,7 @@ TEST_F(SingleBalancerTest, Vanilla) {
EXPECT_EQ(kNumRpcsPerAddress,
backend_servers_[i].service_->request_count());
}
+ balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
// and sent a single response.
@@ -541,7 +572,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
<< " message=" << status.error_message();
EXPECT_EQ(response.message(), kMessage_);
}
-
+ balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
// and sent two responses.
@@ -608,13 +639,272 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) {
<< " message=" << status.error_message();
EXPECT_EQ(response.message(), kMessage_);
}
-
+ balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
+class UpdatesTest : public GrpclbEnd2endTest {
+ public:
+ UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {}
+};
+
+TEST_F(UpdatesTest, UpdateBalancers) {
+ const std::vector<int> first_backend{GetBackendPorts()[0]};
+ const std::vector<int> second_backend{GetBackendPorts()[1]};
+
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+ ScheduleResponseForBalancer(
+ 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
+ 0);
+
+ // Start servers and send 10 RPCs per server.
+ gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
+ auto statuses_and_responses = SendRpc(kMessage_, 10);
+ gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
+
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ // All 10 requests should have gone to the first backend.
+ EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
+
+ balancers_[0]->NotifyDoneWithServerlists();
+ balancers_[1]->NotifyDoneWithServerlists();
+ balancers_[2]->NotifyDoneWithServerlists();
+ // Balancer 0 got a single request.
+ EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+ // and sent a single response.
+ EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+ EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
+ EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
+
+ std::vector<AddressData> addresses;
+ addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
+ gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
+ SetNextResolution(addresses);
+ gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
+
+ // Wait until update has been processed, as signaled by the second backend
+ // receiving a request.
+ EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+ do {
+ auto statuses_and_responses = SendRpc(kMessage_, 1);
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ } while (backend_servers_[1].service_->request_count() == 0);
+
+ backend_servers_[1].service_->ResetCounters();
+ gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
+ statuses_and_responses = SendRpc(kMessage_, 10);
+ gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ // All 10 requests should have gone to the second backend.
+ EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
+
+ balancers_[0]->NotifyDoneWithServerlists();
+ balancers_[1]->NotifyDoneWithServerlists();
+ balancers_[2]->NotifyDoneWithServerlists();
+ EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+ EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+ EXPECT_EQ(1U, balancer_servers_[1].service_->request_count());
+ EXPECT_EQ(1U, balancer_servers_[1].service_->response_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
+ // Check LB policy name for the channel.
+ EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
+}
+
+// Send an update with the same set of LBs as the one in SetUp() in order to
+// verify that the LB channel inside grpclb keeps the initial connection (which
+// by definition is also present in the update).
+TEST_F(UpdatesTest, UpdateBalancersRepeated) {
+ const std::vector<int> first_backend{GetBackendPorts()[0]};
+ const std::vector<int> second_backend{GetBackendPorts()[0]};
+
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+ ScheduleResponseForBalancer(
+ 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
+ 0);
+
+ // Start servers and send 10 RPCs per server.
+ gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
+ auto statuses_and_responses = SendRpc(kMessage_, 10);
+ gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
+
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ // All 10 requests should have gone to the first backend.
+ EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
+
+ balancers_[0]->NotifyDoneWithServerlists();
+ // Balancer 0 got a single request.
+ EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+ // and sent a single response.
+ EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+ EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
+ EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
+
+ std::vector<AddressData> addresses;
+ addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
+ addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
+ addresses.emplace_back(AddressData{balancer_servers_[2].port_, true, ""});
+ gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
+ SetNextResolution(addresses);
+ gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
+
+ EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+ gpr_timespec deadline = gpr_time_add(
+ gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN));
+ // Send 10 seconds worth of RPCs
+ do {
+ statuses_and_responses = SendRpc(kMessage_, 1);
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
+ // grpclb continued using the original LB call to the first balancer, which
+ // doesn't assign the second backend.
+ EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+ balancers_[0]->NotifyDoneWithServerlists();
+
+ addresses.clear();
+ addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
+ addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
+ gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 2 ==========");
+ SetNextResolution(addresses);
+ gpr_log(GPR_INFO, "========= UPDATE 2 DONE ==========");
+
+ EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+ deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(10000, GPR_TIMESPAN));
+ // Send 10 seconds worth of RPCs
+ do {
+ statuses_and_responses = SendRpc(kMessage_, 1);
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
+ // grpclb continued using the original LB call to the first balancer, which
+ // doesn't assign the second backend.
+ EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+ balancers_[0]->NotifyDoneWithServerlists();
+
+ // Check LB policy name for the channel.
+ EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
+}
+
+TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
+ const std::vector<int> first_backend{GetBackendPorts()[0]};
+ const std::vector<int> second_backend{GetBackendPorts()[1]};
+
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+ ScheduleResponseForBalancer(
+ 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
+ 0);
+
+ // Start servers and send 10 RPCs per server.
+ gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
+ auto statuses_and_responses = SendRpc(kMessage_, 10);
+ gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ // All 10 requests should have gone to the first backend.
+ EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
+
+ // Kill balancer 0
+ gpr_log(GPR_INFO, "********** ABOUT TO KILL BALANCER 0 *************");
+ if (balancers_[0]->Shutdown()) balancer_servers_[0].Shutdown();
+ gpr_log(GPR_INFO, "********** KILLED BALANCER 0 *************");
+
+ // This is serviced by the existing RR policy
+ gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
+ statuses_and_responses = SendRpc(kMessage_, 10);
+ gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ // All 10 requests should again have gone to the first backend.
+ EXPECT_EQ(20U, backend_servers_[0].service_->request_count());
+ EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+
+ balancers_[0]->NotifyDoneWithServerlists();
+ balancers_[1]->NotifyDoneWithServerlists();
+ balancers_[2]->NotifyDoneWithServerlists();
+ // Balancer 0 got a single request.
+ EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+ // and sent a single response.
+ EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+ EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
+ EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
+
+ std::vector<AddressData> addresses;
+ addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
+ gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
+ SetNextResolution(addresses);
+ gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
+
+ // Wait until update has been processed, as signaled by the second backend
+ // receiving a request. In the meantime, the client continues to be serviced
+ // (by the first backend) without interruption.
+ EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+ do {
+ auto statuses_and_responses = SendRpc(kMessage_, 1);
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ } while (backend_servers_[1].service_->request_count() == 0);
+
+ // This is serviced by the existing RR policy
+ backend_servers_[1].service_->ResetCounters();
+ gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH ==========");
+ statuses_and_responses = SendRpc(kMessage_, 10);
+ gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH ==========");
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ // All 10 requests should have gone to the second backend.
+ EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
+
+ balancers_[0]->NotifyDoneWithServerlists();
+ balancers_[1]->NotifyDoneWithServerlists();
+ balancers_[2]->NotifyDoneWithServerlists();
+ EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+ EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+ EXPECT_EQ(1U, balancer_servers_[1].service_->request_count());
+ EXPECT_EQ(1U, balancer_servers_[1].service_->response_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
+ // Check LB policy name for the channel.
+ EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
+}
+
TEST_F(SingleBalancerTest, Drop) {
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
@@ -677,6 +967,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
EXPECT_EQ(kNumRpcsPerAddress,
backend_servers_[i].service_->request_count());
}
+ balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
// and sent a single response.
@@ -722,6 +1013,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
EXPECT_EQ(kNumRpcsPerAddress,
backend_servers_[i].service_->request_count());
}
+ balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
// and sent a single response.
@@ -748,7 +1040,6 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
int main(int argc, char** argv) {
grpc_init();
grpc_test_init(argc, argv);
- grpc_fake_resolver_init();
::testing::InitGoogleTest(&argc, argv);
const auto result = RUN_ALL_TESTS();
grpc_shutdown();
diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc
index a002c7f77d..ee5dfa7133 100644
--- a/test/cpp/grpclb/grpclb_test.cc
+++ b/test/cpp/grpclb/grpclb_test.cc
@@ -52,6 +52,7 @@
#include <grpc++/impl/codegen/config.h>
extern "C" {
#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/sockaddr.h"
@@ -61,7 +62,6 @@ extern "C" {
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/server.h"
#include "test/core/end2end/cq_verifier.h"
-#include "test/core/end2end/fake_resolver.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
}
@@ -591,7 +591,7 @@ static void setup_client(const server_fixture *lb_server,
grpc_uri_destroy(lb_uri);
gpr_free(lb_uri_str);
- gpr_asprintf(&cf->server_uri, "test:///%s", lb_server->servers_hostport);
+ gpr_asprintf(&cf->server_uri, "fake:///%s", lb_server->servers_hostport);
const grpc_arg fake_addresses =
grpc_lb_addresses_create_channel_arg(addresses);
grpc_channel_args *fake_result =
@@ -804,7 +804,6 @@ TEST(GrpclbTest, InvalidAddressInServerlist) {}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
- grpc_fake_resolver_init();
grpc_test_init(argc, argv);
grpc_init();
const auto result = RUN_ALL_TESTS();