aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--BUILD14
-rw-r--r--CMakeLists.txt92
-rw-r--r--Makefile102
-rw-r--r--binding.gyp1
-rw-r--r--build.yaml52
-rw-r--r--config.m42
-rw-r--r--config.w321
-rw-r--r--gRPC-Core.podspec3
-rwxr-xr-xgrpc.gemspec2
-rw-r--r--include/grpc/support/workaround_list.h2
-rw-r--r--package.xml2
-rw-r--r--src/core/ext/filters/client_channel/channel_connectivity.c10
-rw-r--r--src/core/ext/filters/client_channel/client_channel.c73
-rw-r--r--src/core/ext/filters/client_channel/client_channel.h6
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.c6
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h11
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c504
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c51
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h8
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c53
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c426
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c626
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_factory.h4
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c (renamed from test/core/end2end/fake_resolver.c)10
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h (renamed from test/core/end2end/fake_resolver.h)11
-rw-r--r--src/core/ext/filters/client_channel/subchannel.c7
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h5
-rw-r--r--src/core/ext/filters/client_channel/subchannel_index.c21
-rw-r--r--src/core/ext/filters/client_channel/subchannel_index.h8
-rw-r--r--src/core/ext/filters/workarounds/workaround_utils.h2
-rw-r--r--src/core/lib/channel/channel_args.c18
-rw-r--r--src/core/lib/channel/channel_args.h4
-rw-r--r--src/core/lib/iomgr/polling_entity.h13
-rw-r--r--src/core/lib/security/transport/lb_targets_info.c4
-rw-r--r--src/core/lib/slice/slice_hash_table.c39
-rw-r--r--src/core/lib/slice/slice_hash_table.h19
-rw-r--r--src/core/lib/transport/service_config.c2
-rw-r--r--src/core/plugin_registry/grpc_plugin_registry.c4
-rw-r--r--src/core/plugin_registry/grpc_unsecure_plugin_registry.c4
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py1
-rw-r--r--test/core/client_channel/resolvers/BUILD2
-rw-r--r--test/core/client_channel/resolvers/fake_resolver_test.c5
-rw-r--r--test/core/end2end/BUILD12
-rwxr-xr-xtest/core/end2end/generate_tests.bzl1
-rw-r--r--test/core/slice/slice_hash_table_test.c130
-rw-r--r--test/cpp/end2end/BUILD6
-rw-r--r--test/cpp/end2end/client_lb_end2end_test.cc485
-rw-r--r--test/cpp/end2end/grpclb_end2end_test.cc323
-rw-r--r--test/cpp/grpclb/grpclb_test.cc5
-rw-r--r--tools/doxygen/Doxyfile.core.internal2
-rw-r--r--tools/run_tests/generated/sources_and_headers.json67
-rw-r--r--tools/run_tests/generated/tests.json56
-rw-r--r--vsprojects/vcxproj/grpc/grpc.vcxproj3
-rw-r--r--vsprojects/vcxproj/grpc/grpc.vcxproj.filters9
-rw-r--r--vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj6
-rw-r--r--vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters27
-rw-r--r--vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj6
-rw-r--r--vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj.filters33
-rw-r--r--vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj3
-rw-r--r--vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters9
-rw-r--r--vsprojects/vcxproj/test/client_lb_end2end_test/client_lb_end2end_test.vcxproj (renamed from vsprojects/vcxproj/test/round_robin_end2end_test/round_robin_end2end_test.vcxproj)8
-rw-r--r--vsprojects/vcxproj/test/client_lb_end2end_test/client_lb_end2end_test.vcxproj.filters (renamed from vsprojects/vcxproj/test/round_robin_end2end_test/round_robin_end2end_test.vcxproj.filters)8
62 files changed, 2575 insertions, 854 deletions
diff --git a/BUILD b/BUILD
index d3a77808ea..ed206e25f4 100644
--- a/BUILD
+++ b/BUILD
@@ -921,6 +921,7 @@ grpc_cc_library(
deps = [
"grpc_base",
"grpc_client_channel",
+ "grpc_resolver_fake",
],
)
@@ -950,6 +951,7 @@ grpc_cc_library(
"grpc_base",
"grpc_client_channel",
"grpc_secure",
+ "grpc_resolver_fake",
],
)
@@ -1039,6 +1041,18 @@ grpc_cc_library(
)
grpc_cc_library(
+ name = "grpc_resolver_fake",
+ srcs = ["src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c"],
+ hdrs = ["src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"],
+ visibility = ["//test:__subpackages__"],
+ language = "c",
+ deps = [
+ "grpc_base",
+ "grpc_client_channel",
+ ],
+)
+
+grpc_cc_library(
name = "grpc_secure",
srcs = [
"src/core/lib/http/httpcli_security_connector.c",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 11c6b04788..faf8a683d0 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -666,6 +666,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx client_crash_test)
endif()
add_dependencies(buildtests_cxx client_crash_test_server)
+add_dependencies(buildtests_cxx client_lb_end2end_test)
add_dependencies(buildtests_cxx codegen_test_full)
add_dependencies(buildtests_cxx codegen_test_minimal)
add_dependencies(buildtests_cxx credentials_test)
@@ -716,7 +717,6 @@ endif()
add_dependencies(buildtests_cxx qps_worker)
add_dependencies(buildtests_cxx reconnect_interop_client)
add_dependencies(buildtests_cxx reconnect_interop_server)
-add_dependencies(buildtests_cxx round_robin_end2end_test)
add_dependencies(buildtests_cxx secure_auth_context_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx secure_sync_unary_ping_pong_test)
@@ -1142,6 +1142,7 @@ add_library(grpc
third_party/nanopb/pb_common.c
third_party/nanopb/pb_decode.c
third_party/nanopb/pb_encode.c
+ src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
@@ -1566,8 +1567,8 @@ add_library(grpc_test_util
test/core/end2end/data/server1_key.c
test/core/end2end/data/test_root_cert.c
test/core/security/oauth2_utils.c
+ src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
test/core/end2end/cq_verifier.c
- test/core/end2end/fake_resolver.c
test/core/end2end/fixtures/http_proxy_fixture.c
test/core/end2end/fixtures/proxy.c
test/core/iomgr/endpoint_tests.c
@@ -1787,8 +1788,8 @@ endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_library(grpc_test_util_unsecure
+ src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
test/core/end2end/cq_verifier.c
- test/core/end2end/fake_resolver.c
test/core/end2end/fixtures/http_proxy_fixture.c
test/core/end2end/fixtures/proxy.c
test/core/iomgr/endpoint_tests.c
@@ -2030,6 +2031,7 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
+ src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
src/core/ext/filters/load_reporting/load_reporting.c
src/core/ext/filters/load_reporting/load_reporting_filter.c
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
@@ -9666,6 +9668,48 @@ target_link_libraries(client_crash_test_server
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
+add_executable(client_lb_end2end_test
+ test/cpp/end2end/client_lb_end2end_test.cc
+ third_party/googletest/googletest/src/gtest-all.cc
+ third_party/googletest/googlemock/src/gmock-all.cc
+)
+
+
+target_include_directories(client_lb_end2end_test
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
+ PRIVATE ${BORINGSSL_ROOT_DIR}/include
+ PRIVATE ${PROTOBUF_ROOT_DIR}/src
+ PRIVATE ${BENCHMARK_ROOT_DIR}/include
+ PRIVATE ${ZLIB_ROOT_DIR}
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib
+ PRIVATE ${CARES_BUILD_INCLUDE_DIR}
+ PRIVATE ${CARES_INCLUDE_DIR}
+ PRIVATE ${CARES_PLATFORM_INCLUDE_DIR}
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include
+ PRIVATE third_party/googletest/googletest/include
+ PRIVATE third_party/googletest/googletest
+ PRIVATE third_party/googletest/googlemock/include
+ PRIVATE third_party/googletest/googlemock
+ PRIVATE ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(client_lb_end2end_test
+ ${_gRPC_PROTOBUF_LIBRARIES}
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ grpc++_test_util
+ grpc_test_util
+ grpc++
+ grpc
+ gpr_test_util
+ gpr
+ ${_gRPC_GFLAGS_LIBRARIES}
+)
+
+endif (gRPC_BUILD_TESTS)
+if (gRPC_BUILD_TESTS)
+
add_executable(codegen_test_full
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/control.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/control.grpc.pb.cc
@@ -11608,48 +11652,6 @@ target_link_libraries(reconnect_interop_server
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
-add_executable(round_robin_end2end_test
- test/cpp/end2end/round_robin_end2end_test.cc
- third_party/googletest/googletest/src/gtest-all.cc
- third_party/googletest/googlemock/src/gmock-all.cc
-)
-
-
-target_include_directories(round_robin_end2end_test
- PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
- PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
- PRIVATE ${BORINGSSL_ROOT_DIR}/include
- PRIVATE ${PROTOBUF_ROOT_DIR}/src
- PRIVATE ${BENCHMARK_ROOT_DIR}/include
- PRIVATE ${ZLIB_ROOT_DIR}
- PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib
- PRIVATE ${CARES_BUILD_INCLUDE_DIR}
- PRIVATE ${CARES_INCLUDE_DIR}
- PRIVATE ${CARES_PLATFORM_INCLUDE_DIR}
- PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares
- PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include
- PRIVATE third_party/googletest/googletest/include
- PRIVATE third_party/googletest/googletest
- PRIVATE third_party/googletest/googlemock/include
- PRIVATE third_party/googletest/googlemock
- PRIVATE ${_gRPC_PROTO_GENS_DIR}
-)
-
-target_link_libraries(round_robin_end2end_test
- ${_gRPC_PROTOBUF_LIBRARIES}
- ${_gRPC_ALLTARGETS_LIBRARIES}
- grpc++_test_util
- grpc_test_util
- grpc++
- grpc
- gpr_test_util
- gpr
- ${_gRPC_GFLAGS_LIBRARIES}
-)
-
-endif (gRPC_BUILD_TESTS)
-if (gRPC_BUILD_TESTS)
-
add_executable(secure_auth_context_test
test/cpp/common/secure_auth_context_test.cc
third_party/googletest/googletest/src/gtest-all.cc
diff --git a/Makefile b/Makefile
index 120e7f33e1..a7582946d1 100644
--- a/Makefile
+++ b/Makefile
@@ -1127,6 +1127,7 @@ channel_filter_test: $(BINDIR)/$(CONFIG)/channel_filter_test
cli_call_test: $(BINDIR)/$(CONFIG)/cli_call_test
client_crash_test: $(BINDIR)/$(CONFIG)/client_crash_test
client_crash_test_server: $(BINDIR)/$(CONFIG)/client_crash_test_server
+client_lb_end2end_test: $(BINDIR)/$(CONFIG)/client_lb_end2end_test
codegen_test_full: $(BINDIR)/$(CONFIG)/codegen_test_full
codegen_test_minimal: $(BINDIR)/$(CONFIG)/codegen_test_minimal
credentials_test: $(BINDIR)/$(CONFIG)/credentials_test
@@ -1170,7 +1171,6 @@ qps_openloop_test: $(BINDIR)/$(CONFIG)/qps_openloop_test
qps_worker: $(BINDIR)/$(CONFIG)/qps_worker
reconnect_interop_client: $(BINDIR)/$(CONFIG)/reconnect_interop_client
reconnect_interop_server: $(BINDIR)/$(CONFIG)/reconnect_interop_server
-round_robin_end2end_test: $(BINDIR)/$(CONFIG)/round_robin_end2end_test
secure_auth_context_test: $(BINDIR)/$(CONFIG)/secure_auth_context_test
secure_sync_unary_ping_pong_test: $(BINDIR)/$(CONFIG)/secure_sync_unary_ping_pong_test
server_builder_plugin_test: $(BINDIR)/$(CONFIG)/server_builder_plugin_test
@@ -1565,6 +1565,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/cli_call_test \
$(BINDIR)/$(CONFIG)/client_crash_test \
$(BINDIR)/$(CONFIG)/client_crash_test_server \
+ $(BINDIR)/$(CONFIG)/client_lb_end2end_test \
$(BINDIR)/$(CONFIG)/codegen_test_full \
$(BINDIR)/$(CONFIG)/codegen_test_minimal \
$(BINDIR)/$(CONFIG)/credentials_test \
@@ -1601,7 +1602,6 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/qps_worker \
$(BINDIR)/$(CONFIG)/reconnect_interop_client \
$(BINDIR)/$(CONFIG)/reconnect_interop_server \
- $(BINDIR)/$(CONFIG)/round_robin_end2end_test \
$(BINDIR)/$(CONFIG)/secure_auth_context_test \
$(BINDIR)/$(CONFIG)/secure_sync_unary_ping_pong_test \
$(BINDIR)/$(CONFIG)/server_builder_plugin_test \
@@ -1687,6 +1687,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/cli_call_test \
$(BINDIR)/$(CONFIG)/client_crash_test \
$(BINDIR)/$(CONFIG)/client_crash_test_server \
+ $(BINDIR)/$(CONFIG)/client_lb_end2end_test \
$(BINDIR)/$(CONFIG)/codegen_test_full \
$(BINDIR)/$(CONFIG)/codegen_test_minimal \
$(BINDIR)/$(CONFIG)/credentials_test \
@@ -1723,7 +1724,6 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/qps_worker \
$(BINDIR)/$(CONFIG)/reconnect_interop_client \
$(BINDIR)/$(CONFIG)/reconnect_interop_server \
- $(BINDIR)/$(CONFIG)/round_robin_end2end_test \
$(BINDIR)/$(CONFIG)/secure_auth_context_test \
$(BINDIR)/$(CONFIG)/secure_sync_unary_ping_pong_test \
$(BINDIR)/$(CONFIG)/server_builder_plugin_test \
@@ -2061,6 +2061,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/cli_call_test || ( echo test cli_call_test failed ; exit 1 )
$(E) "[RUN] Testing client_crash_test"
$(Q) $(BINDIR)/$(CONFIG)/client_crash_test || ( echo test client_crash_test failed ; exit 1 )
+ $(E) "[RUN] Testing client_lb_end2end_test"
+ $(Q) $(BINDIR)/$(CONFIG)/client_lb_end2end_test || ( echo test client_lb_end2end_test failed ; exit 1 )
$(E) "[RUN] Testing codegen_test_full"
$(Q) $(BINDIR)/$(CONFIG)/codegen_test_full || ( echo test codegen_test_full failed ; exit 1 )
$(E) "[RUN] Testing codegen_test_minimal"
@@ -2109,8 +2111,6 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/proto_utils_test || ( echo test proto_utils_test failed ; exit 1 )
$(E) "[RUN] Testing qps_openloop_test"
$(Q) $(BINDIR)/$(CONFIG)/qps_openloop_test || ( echo test qps_openloop_test failed ; exit 1 )
- $(E) "[RUN] Testing round_robin_end2end_test"
- $(Q) $(BINDIR)/$(CONFIG)/round_robin_end2end_test || ( echo test round_robin_end2end_test failed ; exit 1 )
$(E) "[RUN] Testing secure_auth_context_test"
$(Q) $(BINDIR)/$(CONFIG)/secure_auth_context_test || ( echo test secure_auth_context_test failed ; exit 1 )
$(E) "[RUN] Testing secure_sync_unary_ping_pong_test"
@@ -3125,6 +3125,7 @@ LIBGRPC_SRC = \
third_party/nanopb/pb_common.c \
third_party/nanopb/pb_decode.c \
third_party/nanopb/pb_encode.c \
+ src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c \
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c \
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c \
src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c \
@@ -3546,8 +3547,8 @@ LIBGRPC_TEST_UTIL_SRC = \
test/core/end2end/data/server1_key.c \
test/core/end2end/data/test_root_cert.c \
test/core/security/oauth2_utils.c \
+ src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c \
test/core/end2end/cq_verifier.c \
- test/core/end2end/fake_resolver.c \
test/core/end2end/fixtures/http_proxy_fixture.c \
test/core/end2end/fixtures/proxy.c \
test/core/iomgr/endpoint_tests.c \
@@ -3756,8 +3757,8 @@ endif
LIBGRPC_TEST_UTIL_UNSECURE_SRC = \
+ src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c \
test/core/end2end/cq_verifier.c \
- test/core/end2end/fake_resolver.c \
test/core/end2end/fixtures/http_proxy_fixture.c \
test/core/end2end/fixtures/proxy.c \
test/core/iomgr/endpoint_tests.c \
@@ -3982,6 +3983,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c \
src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c \
+ src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c \
src/core/ext/filters/load_reporting/load_reporting.c \
src/core/ext/filters/load_reporting/load_reporting_filter.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c \
@@ -14074,6 +14076,49 @@ endif
endif
+CLIENT_LB_END2END_TEST_SRC = \
+ test/cpp/end2end/client_lb_end2end_test.cc \
+
+CLIENT_LB_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CLIENT_LB_END2END_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/client_lb_end2end_test: openssl_dep_error
+
+else
+
+
+
+
+ifeq ($(NO_PROTOBUF),true)
+
+# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+.
+
+$(BINDIR)/$(CONFIG)/client_lb_end2end_test: protobuf_dep_error
+
+else
+
+$(BINDIR)/$(CONFIG)/client_lb_end2end_test: $(PROTOBUF_DEP) $(CLIENT_LB_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LDXX) $(LDFLAGS) $(CLIENT_LB_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/client_lb_end2end_test
+
+endif
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/cpp/end2end/client_lb_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_client_lb_end2end_test: $(CLIENT_LB_END2END_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(CLIENT_LB_END2END_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
CODEGEN_TEST_FULL_SRC = \
$(GENDIR)/src/proto/grpc/testing/control.pb.cc $(GENDIR)/src/proto/grpc/testing/control.grpc.pb.cc \
$(GENDIR)/src/proto/grpc/testing/messages.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.grpc.pb.cc \
@@ -15891,49 +15936,6 @@ endif
$(OBJDIR)/$(CONFIG)/test/cpp/interop/reconnect_interop_server.o: $(GENDIR)/src/proto/grpc/testing/empty.pb.cc $(GENDIR)/src/proto/grpc/testing/empty.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/test.pb.cc $(GENDIR)/src/proto/grpc/testing/test.grpc.pb.cc
-ROUND_ROBIN_END2END_TEST_SRC = \
- test/cpp/end2end/round_robin_end2end_test.cc \
-
-ROUND_ROBIN_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(ROUND_ROBIN_END2END_TEST_SRC))))
-ifeq ($(NO_SECURE),true)
-
-# You can't build secure targets if you don't have OpenSSL.
-
-$(BINDIR)/$(CONFIG)/round_robin_end2end_test: openssl_dep_error
-
-else
-
-
-
-
-ifeq ($(NO_PROTOBUF),true)
-
-# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+.
-
-$(BINDIR)/$(CONFIG)/round_robin_end2end_test: protobuf_dep_error
-
-else
-
-$(BINDIR)/$(CONFIG)/round_robin_end2end_test: $(PROTOBUF_DEP) $(ROUND_ROBIN_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
- $(E) "[LD] Linking $@"
- $(Q) mkdir -p `dirname $@`
- $(Q) $(LDXX) $(LDFLAGS) $(ROUND_ROBIN_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/round_robin_end2end_test
-
-endif
-
-endif
-
-$(OBJDIR)/$(CONFIG)/test/cpp/end2end/round_robin_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
-
-deps_round_robin_end2end_test: $(ROUND_ROBIN_END2END_TEST_OBJS:.o=.dep)
-
-ifneq ($(NO_SECURE),true)
-ifneq ($(NO_DEPS),true)
--include $(ROUND_ROBIN_END2END_TEST_OBJS:.o=.dep)
-endif
-endif
-
-
SECURE_AUTH_CONTEXT_TEST_SRC = \
test/cpp/common/secure_auth_context_test.cc \
diff --git a/binding.gyp b/binding.gyp
index e617485134..130f7cc169 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -873,6 +873,7 @@
'third_party/nanopb/pb_common.c',
'third_party/nanopb/pb_decode.c',
'third_party/nanopb/pb_encode.c',
+ 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c',
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c',
diff --git a/build.yaml b/build.yaml
index d1e61256e6..c3d0d2364d 100644
--- a/build.yaml
+++ b/build.yaml
@@ -523,6 +523,7 @@ filegroups:
- grpc_base
- grpc_client_channel
- nanopb
+ - grpc_resolver_fake
- name: grpc_lb_policy_grpclb_secure
headers:
- src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h
@@ -544,6 +545,7 @@ filegroups:
- grpc_secure
- grpc_client_channel
- nanopb
+ - grpc_resolver_fake
- name: grpc_lb_policy_pick_first
src:
- src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
@@ -603,6 +605,15 @@ filegroups:
uses:
- grpc_base
- grpc_client_channel
+- name: grpc_resolver_fake
+ headers:
+ - src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
+ src:
+ - src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
+ plugin: grpc_resolver_fake
+ uses:
+ - grpc_base
+ - grpc_client_channel
- name: grpc_resolver_sockaddr
src:
- src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
@@ -673,8 +684,8 @@ filegroups:
- name: grpc_test_util_base
build: test
headers:
+ - src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
- test/core/end2end/cq_verifier.h
- - test/core/end2end/fake_resolver.h
- test/core/end2end/fixtures/http_proxy_fixture.h
- test/core/end2end/fixtures/proxy.h
- test/core/iomgr/endpoint_tests.h
@@ -689,8 +700,8 @@ filegroups:
- test/core/util/slice_splitter.h
- test/core/util/trickle_endpoint.h
src:
+ - src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
- test/core/end2end/cq_verifier.c
- - test/core/end2end/fake_resolver.c
- test/core/end2end/fixtures/http_proxy_fixture.c
- test/core/end2end/fixtures/proxy.c
- test/core/iomgr/endpoint_tests.c
@@ -1069,6 +1080,7 @@ libs:
- grpc_resolver_dns_ares
- grpc_resolver_dns_native
- grpc_resolver_sockaddr
+ - grpc_resolver_fake
- grpc_load_reporting
- grpc_secure
- census
@@ -1168,6 +1180,7 @@ libs:
- grpc_resolver_dns_ares
- grpc_resolver_dns_native
- grpc_resolver_sockaddr
+ - grpc_resolver_fake
- grpc_load_reporting
- grpc_lb_policy_grpclb
- grpc_lb_policy_pick_first
@@ -3625,6 +3638,22 @@ targets:
- grpc
- gpr_test_util
- gpr
+- name: client_lb_end2end_test
+ gtest: true
+ build: test
+ language: c++
+ src:
+ - test/cpp/end2end/client_lb_end2end_test.cc
+ deps:
+ - grpc++_test_util
+ - grpc_test_util
+ - grpc++
+ - grpc
+ - gpr_test_util
+ - gpr
+ excluded_poll_engines:
+ - poll
+ - poll-cv
- name: codegen_test_full
gtest: true
build: test
@@ -3903,6 +3932,9 @@ targets:
- grpc
- gpr_test_util
- gpr
+ excluded_poll_engines:
+ - poll
+ - poll-cv
- name: grpclb_test
gtest: false
build: test
@@ -3917,6 +3949,9 @@ targets:
- grpc
- gpr_test_util
- gpr
+ excluded_poll_engines:
+ - poll
+ - poll-cv
- name: health_service_end2end_test
gtest: true
build: test
@@ -4216,19 +4251,6 @@ targets:
- gpr_test_util
- gpr
- grpc++_test_config
-- name: round_robin_end2end_test
- gtest: true
- build: test
- language: c++
- src:
- - test/cpp/end2end/round_robin_end2end_test.cc
- deps:
- - grpc++_test_util
- - grpc_test_util
- - grpc++
- - grpc
- - gpr_test_util
- - gpr
- name: secure_auth_context_test
gtest: true
build: test
diff --git a/config.m4 b/config.m4
index 3b9cac1abe..6a5cc61a24 100644
--- a/config.m4
+++ b/config.m4
@@ -305,6 +305,7 @@ if test "$PHP_GRPC" != "no"; then
third_party/nanopb/pb_common.c \
third_party/nanopb/pb_decode.c \
third_party/nanopb/pb_encode.c \
+ src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c \
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c \
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c \
src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c \
@@ -654,6 +655,7 @@ if test "$PHP_GRPC" != "no"; then
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/round_robin)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/resolver/dns/c_ares)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/resolver/dns/native)
+ PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/resolver/fake)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/resolver/sockaddr)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/deadline)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http)
diff --git a/config.w32 b/config.w32
index 7c407e848a..3fa30f1b53 100644
--- a/config.w32
+++ b/config.w32
@@ -282,6 +282,7 @@ if (PHP_GRPC != "no") {
"third_party\\nanopb\\pb_common.c " +
"third_party\\nanopb\\pb_decode.c " +
"third_party\\nanopb\\pb_encode.c " +
+ "src\\core\\ext\\filters\\client_channel\\resolver\\fake\\fake_resolver.c " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\pick_first\\pick_first.c " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\round_robin\\round_robin.c " +
"src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\dns_resolver_ares.c " +
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index e5e61df477..56cd8d2ee9 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -452,6 +452,7 @@ Pod::Spec.new do |s|
'third_party/nanopb/pb_common.h',
'third_party/nanopb/pb_decode.h',
'third_party/nanopb/pb_encode.h',
+ 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h',
'src/core/ext/filters/load_reporting/load_reporting.h',
@@ -698,6 +699,7 @@ Pod::Spec.new do |s|
'third_party/nanopb/pb_common.c',
'third_party/nanopb/pb_decode.c',
'third_party/nanopb/pb_encode.c',
+ 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c',
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c',
@@ -938,6 +940,7 @@ Pod::Spec.new do |s|
'third_party/nanopb/pb_common.h',
'third_party/nanopb/pb_decode.h',
'third_party/nanopb/pb_encode.h',
+ 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h',
'src/core/ext/filters/load_reporting/load_reporting.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index b334efb0b5..9ec5589d8e 100755
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -368,6 +368,7 @@ Gem::Specification.new do |s|
s.files += %w( third_party/nanopb/pb_common.h )
s.files += %w( third_party/nanopb/pb_decode.h )
s.files += %w( third_party/nanopb/pb_encode.h )
+ s.files += %w( src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h )
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h )
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h )
s.files += %w( src/core/ext/filters/load_reporting/load_reporting.h )
@@ -614,6 +615,7 @@ Gem::Specification.new do |s|
s.files += %w( third_party/nanopb/pb_common.c )
s.files += %w( third_party/nanopb/pb_decode.c )
s.files += %w( third_party/nanopb/pb_encode.c )
+ s.files += %w( src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c )
s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c )
diff --git a/include/grpc/support/workaround_list.h b/include/grpc/support/workaround_list.h
index ec4766510f..34011a7c9c 100644
--- a/include/grpc/support/workaround_list.h
+++ b/include/grpc/support/workaround_list.h
@@ -43,4 +43,4 @@ typedef enum {
GRPC_MAX_WORKAROUND_ID
} grpc_workaround_list;
-#endif
+#endif /* GRPC_SUPPORT_WORKAROUND_LIST_H */
diff --git a/package.xml b/package.xml
index 817a0345d6..d37b9030f9 100644
--- a/package.xml
+++ b/package.xml
@@ -382,6 +382,7 @@
<file baseinstalldir="/" name="third_party/nanopb/pb_common.h" role="src" />
<file baseinstalldir="/" name="third_party/nanopb/pb_decode.h" role="src" />
<file baseinstalldir="/" name="third_party/nanopb/pb_encode.h" role="src" />
+ <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/load_reporting/load_reporting.h" role="src" />
@@ -628,6 +629,7 @@
<file baseinstalldir="/" name="third_party/nanopb/pb_common.c" role="src" />
<file baseinstalldir="/" name="third_party/nanopb/pb_decode.c" role="src" />
<file baseinstalldir="/" name="third_party/nanopb/pb_encode.c" role="src" />
+ <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c" role="src" />
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c
index 04666edbec..aa464f3ab7 100644
--- a/src/core/ext/filters/client_channel/channel_connectivity.c
+++ b/src/core/ext/filters/client_channel/channel_connectivity.c
@@ -126,9 +126,10 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
} else {
grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(w->channel));
- grpc_client_channel_watch_connectivity_state(exec_ctx, client_channel_elem,
- grpc_cq_pollset(w->cq), NULL,
- &w->on_complete, NULL);
+ grpc_client_channel_watch_connectivity_state(
+ exec_ctx, client_channel_elem,
+ grpc_polling_entity_create_from_pollset(grpc_cq_pollset(w->cq)), NULL,
+ &w->on_complete, NULL);
}
gpr_mu_lock(&w->mu);
@@ -245,7 +246,8 @@ void grpc_channel_watch_connectivity_state(
if (client_channel_elem->filter == &grpc_client_channel_filter) {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
grpc_client_channel_watch_connectivity_state(
- &exec_ctx, client_channel_elem, grpc_cq_pollset(cq), &w->state,
+ &exec_ctx, client_channel_elem,
+ grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &w->state,
&w->on_complete, &w->watcher_timer_init);
} else {
abort();
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index 8cebbe9eca..ab71467d73 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -389,7 +389,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
channel_data *chand = arg;
char *lb_policy_name = NULL;
grpc_lb_policy *lb_policy = NULL;
- grpc_lb_policy *old_lb_policy;
+ grpc_lb_policy *old_lb_policy = NULL;
grpc_slice_hash_table *method_params_table = NULL;
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
bool exit_idle = false;
@@ -399,6 +399,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
service_config_parsing_state parsing_state;
memset(&parsing_state, 0, sizeof(parsing_state));
+ bool lb_policy_updated = false;
if (chand->resolver_result != NULL) {
// Find LB policy name.
const grpc_arg *channel_arg =
@@ -438,14 +439,27 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
lb_policy_args.args = chand->resolver_result;
lb_policy_args.client_channel_factory = chand->client_channel_factory;
lb_policy_args.combiner = chand->combiner;
- lb_policy =
- grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
- if (lb_policy != NULL) {
- GRPC_LB_POLICY_REF(lb_policy, "config_change");
- GRPC_ERROR_UNREF(state_error);
- state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy,
- &state_error);
+
+ const bool lb_policy_type_changed =
+ (chand->info_lb_policy_name == NULL) ||
+ (strcmp(chand->info_lb_policy_name, lb_policy_name) != 0);
+ if (chand->lb_policy != NULL && !lb_policy_type_changed) {
+ // update
+ lb_policy_updated = true;
+ grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args);
+ } else {
+ lb_policy =
+ grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
+ if (lb_policy != NULL) {
+ GRPC_LB_POLICY_REF(lb_policy, "config_change");
+ GRPC_ERROR_UNREF(state_error);
+ state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy,
+ &state_error);
+ old_lb_policy = chand->lb_policy;
+ chand->lb_policy = lb_policy;
+ }
}
+
// Find service config.
channel_arg =
grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
@@ -492,8 +506,6 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
gpr_free(chand->info_lb_policy_name);
chand->info_lb_policy_name = lb_policy_name;
}
- old_lb_policy = chand->lb_policy;
- chand->lb_policy = lb_policy;
if (service_config_json != NULL) {
gpr_free(chand->info_service_config_json);
chand->info_service_config_json = service_config_json;
@@ -516,17 +528,21 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
"Channel disconnected", &error, 1));
grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
}
- if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
+ if (!lb_policy_updated && lb_policy != NULL &&
+ chand->exit_idle_when_lb_policy_arrives) {
GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
exit_idle = true;
chand->exit_idle_when_lb_policy_arrives = false;
}
if (error == GRPC_ERROR_NONE && chand->resolver) {
- set_channel_connectivity_state_locked(
- exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
- if (lb_policy != NULL) {
- watch_lb_policy_locked(exec_ctx, chand, lb_policy, state);
+ if (!lb_policy_updated) {
+ set_channel_connectivity_state_locked(exec_ctx, chand, state,
+ GRPC_ERROR_REF(state_error),
+ "new_lb+resolver");
+ if (lb_policy != NULL) {
+ watch_lb_policy_locked(exec_ctx, chand, lb_policy, state);
+ }
}
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
grpc_resolver_next_locked(exec_ctx, chand->resolver,
@@ -546,7 +562,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
"resolver_gone");
}
- if (exit_idle) {
+ if (!lb_policy_updated && lb_policy != NULL && exit_idle) {
grpc_lb_policy_exit_idle_locked(exec_ctx, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
}
@@ -555,9 +571,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_pollset_set_del_pollset_set(
exec_ctx, old_lb_policy->interested_parties, chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
+ old_lb_policy = NULL;
}
- if (lb_policy != NULL) {
+ if (!lb_policy_updated && lb_policy != NULL) {
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
}
@@ -1447,7 +1464,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
typedef struct external_connectivity_watcher {
channel_data *chand;
- grpc_pollset *pollset;
+ grpc_polling_entity pollent;
grpc_closure *on_complete;
grpc_closure *watcher_timer_init;
grpc_connectivity_state *state;
@@ -1522,8 +1539,8 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
external_connectivity_watcher *w = arg;
grpc_closure *follow_up = w->on_complete;
- grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
- w->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
+ w->chand->interested_parties);
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
"external_connectivity_watcher");
external_connectivity_watcher_list_remove(w->chand, w);
@@ -1550,8 +1567,8 @@ static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_notify_on_state_change(
exec_ctx, &found->chand->state_tracker, NULL, &found->my_closure);
}
- grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
- w->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
+ w->chand->interested_parties);
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
"external_connectivity_watcher");
gpr_free(w);
@@ -1559,18 +1576,18 @@ static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
void grpc_client_channel_watch_connectivity_state(
- grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
- grpc_connectivity_state *state, grpc_closure *closure,
- grpc_closure *watcher_timer_init) {
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_polling_entity pollent, grpc_connectivity_state *state,
+ grpc_closure *closure, grpc_closure *watcher_timer_init) {
channel_data *chand = elem->channel_data;
external_connectivity_watcher *w = gpr_zalloc(sizeof(*w));
w->chand = chand;
- w->pollset = pollset;
+ w->pollent = pollent;
w->on_complete = closure;
w->state = state;
w->watcher_timer_init = watcher_timer_init;
-
- grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, &w->pollent,
+ chand->interested_parties);
GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
"external_connectivity_watcher");
grpc_closure_sched(
diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h
index 356a7ab0c1..4f8987f0da 100644
--- a/src/core/ext/filters/client_channel/client_channel.h
+++ b/src/core/ext/filters/client_channel/client_channel.h
@@ -57,9 +57,9 @@ int grpc_client_channel_num_external_connectivity_watchers(
grpc_channel_element *elem);
void grpc_client_channel_watch_connectivity_state(
- grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
- grpc_connectivity_state *state, grpc_closure *on_complete,
- grpc_closure *watcher_timer_init);
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_polling_entity pollent, grpc_connectivity_state *state,
+ grpc_closure *on_complete, grpc_closure *watcher_timer_init);
/* Debug helper: pull the subchannel call from a call stack element */
grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
diff --git a/src/core/ext/filters/client_channel/lb_policy.c b/src/core/ext/filters/client_channel/lb_policy.c
index 112ba40658..7ac2cb1775 100644
--- a/src/core/ext/filters/client_channel/lb_policy.c
+++ b/src/core/ext/filters/client_channel/lb_policy.c
@@ -166,3 +166,9 @@ grpc_connectivity_state grpc_lb_policy_check_connectivity_locked(
return policy->vtable->check_connectivity_locked(exec_ctx, policy,
connectivity_error);
}
+
+void grpc_lb_policy_update_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ const grpc_lb_policy_args *lb_policy_args) {
+ policy->vtable->update_locked(exec_ctx, policy, lb_policy_args);
+}
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index fb4aa084a6..07e7953009 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -42,6 +42,7 @@
is expected to be extended to contain some parameters) */
typedef struct grpc_lb_policy grpc_lb_policy;
typedef struct grpc_lb_policy_vtable grpc_lb_policy_vtable;
+typedef struct grpc_lb_policy_args grpc_lb_policy_args;
struct grpc_lb_policy {
const grpc_lb_policy_vtable *vtable;
@@ -105,9 +106,12 @@ struct grpc_lb_policy_vtable {
grpc_lb_policy *policy,
grpc_connectivity_state *state,
grpc_closure *closure);
+
+ void (*update_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const grpc_lb_policy_args *args);
};
-/*#define GRPC_LB_POLICY_REFCOUNT_DEBUG*/
+//#define GRPC_LB_POLICY_REFCOUNT_DEBUG
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
/* Strong references: the policy will shutdown when they reach zero */
@@ -207,4 +211,9 @@ grpc_connectivity_state grpc_lb_policy_check_connectivity_locked(
grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_error **connectivity_error);
+/** Update \a policy with \a lb_policy_args. */
+void grpc_lb_policy_update_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ const grpc_lb_policy_args *lb_policy_args);
+
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H */
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index d2a2856a18..6eb0a9ef21 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -115,6 +115,7 @@
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -315,6 +316,9 @@ typedef struct glb_lb_policy {
/** for communicating with the LB server */
grpc_channel *lb_channel;
+ /** response generator to inject address updates into \a lb_channel */
+ grpc_fake_resolver_response_generator *response_generator;
+
/** the RR policy to use of the backend servers returned by the LB server */
grpc_lb_policy *rr_policy;
@@ -323,6 +327,9 @@ typedef struct glb_lb_policy {
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker;
+ /** connectivity state of the LB channel */
+ grpc_connectivity_state lb_channel_connectivity;
+
/** stores the deserialized response from the LB. May be NULL until one such
* response has arrived. */
grpc_grpclb_serverlist *serverlist;
@@ -340,10 +347,27 @@ typedef struct glb_lb_policy {
bool shutting_down;
+ /** are we currently updating lb_call? */
+ bool updating_lb_call;
+
+ /** are we currently updating lb_channel? */
+ bool updating_lb_channel;
+
+ /** are we already watching the LB channel's connectivity? */
+ bool watching_lb_channel;
+
+ /** is \a lb_call_retry_timer active? */
+ bool retry_timer_active;
+
+ /** called upon changes to the LB channel's connectivity. */
+ grpc_closure lb_channel_on_connectivity_changed;
+
+ /** args from the latest update received while already updating, or NULL */
+ grpc_lb_policy_args *pending_update_args;
+
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
-
/* Finished sending initial request. */
grpc_closure lb_on_sent_initial_request;
@@ -533,10 +557,9 @@ static grpc_lb_addresses *process_serverlist_locked(
return lb_addresses;
}
-/* returns true if the new RR policy should replace the current one, if any */
-static bool update_lb_connectivity_status_locked(
+static void update_lb_connectivity_status_locked(
grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
- grpc_connectivity_state new_rr_state, grpc_error *new_rr_state_error) {
+ grpc_connectivity_state rr_state, grpc_error *rr_state_error) {
const grpc_connectivity_state curr_glb_state =
grpc_connectivity_state_check(&glb_policy->state_tracker);
@@ -570,28 +593,26 @@ static bool update_lb_connectivity_status_locked(
* (*) This function mustn't be called during shutting down. */
GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
- switch (new_rr_state) {
+ switch (rr_state) {
case GRPC_CHANNEL_TRANSIENT_FAILURE:
case GRPC_CHANNEL_SHUTDOWN:
- GPR_ASSERT(new_rr_state_error != GRPC_ERROR_NONE);
- return false; /* don't replace the RR policy */
+ GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
+ break;
case GRPC_CHANNEL_INIT:
case GRPC_CHANNEL_IDLE:
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_READY:
- GPR_ASSERT(new_rr_state_error == GRPC_ERROR_NONE);
+ GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
}
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO,
- "Setting grpclb's state to %s from new RR policy %p state.",
- grpc_connectivity_state_name(new_rr_state),
- (void *)glb_policy->rr_policy);
+ gpr_log(
+ GPR_INFO, "Setting grpclb's state to %s from new RR policy %p state.",
+ grpc_connectivity_state_name(rr_state), (void *)glb_policy->rr_policy);
}
- grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
- new_rr_state, GRPC_ERROR_REF(new_rr_state_error),
+ grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_state,
+ GRPC_ERROR_REF(rr_state_error),
"update_lb_connectivity_status_locked");
- return true;
}
/* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return
@@ -670,45 +691,38 @@ static bool pick_from_internal_rr_locked(
return pick_done;
}
-static grpc_lb_policy *create_rr_locked(
- grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist,
- glb_lb_policy *glb_policy) {
- GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
-
- grpc_lb_policy_args args;
- memset(&args, 0, sizeof(args));
- args.client_channel_factory = glb_policy->cc_factory;
- args.combiner = glb_policy->base.combiner;
+static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx,
+ glb_lb_policy *glb_policy) {
+ grpc_lb_policy_args *args = gpr_zalloc(sizeof(*args));
+ args->client_channel_factory = glb_policy->cc_factory;
+ args->combiner = glb_policy->base.combiner;
grpc_lb_addresses *addresses =
- process_serverlist_locked(exec_ctx, serverlist);
-
+ process_serverlist_locked(exec_ctx, glb_policy->serverlist);
// Replace the LB addresses in the channel args that we pass down to
// the subchannel.
static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses);
- args.args = grpc_channel_args_copy_and_add_and_remove(
+ args->args = grpc_channel_args_copy_and_add_and_remove(
glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg,
1);
-
- grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
- GPR_ASSERT(rr != NULL);
grpc_lb_addresses_destroy(exec_ctx, addresses);
- grpc_channel_args_destroy(exec_ctx, args.args);
- return rr;
+ return args;
+}
+
+static void lb_policy_args_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy_args *args) {
+ grpc_channel_args_destroy(exec_ctx, args->args);
+ gpr_free(args);
}
static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error);
-/* glb_policy->rr_policy may be NULL (initial handover) */
-static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
- GPR_ASSERT(glb_policy->serverlist != NULL &&
- glb_policy->serverlist->num_servers > 0);
-
- if (glb_policy->shutting_down) return;
+static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
+ grpc_lb_policy_args *args) {
+ GPR_ASSERT(glb_policy->rr_policy == NULL);
grpc_lb_policy *new_rr_policy =
- create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy);
+ grpc_lb_policy_create(exec_ctx, "round_robin", args);
if (new_rr_policy == NULL) {
gpr_log(GPR_ERROR,
"Failure creating a RoundRobin policy for serverlist update with "
@@ -719,41 +733,16 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
(void *)glb_policy->rr_policy);
return;
}
-
- grpc_error *new_rr_state_error = NULL;
- const grpc_connectivity_state new_rr_state =
- grpc_lb_policy_check_connectivity_locked(exec_ctx, new_rr_policy,
- &new_rr_state_error);
- /* Connectivity state is a function of the new RR policy just created */
- const bool replace_old_rr = update_lb_connectivity_status_locked(
- exec_ctx, glb_policy, new_rr_state, new_rr_state_error);
-
- if (!replace_old_rr) {
- /* dispose of the new RR policy that won't be used after all */
- GRPC_LB_POLICY_UNREF(exec_ctx, new_rr_policy, "rr_handover_no_replace");
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO,
- "Keeping old RR policy (%p) despite new serverlist: new RR "
- "policy was in %s connectivity state.",
- (void *)glb_policy->rr_policy,
- grpc_connectivity_state_name(new_rr_state));
- }
- return;
- }
-
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "Created RR policy (%p) to replace old RR (%p)",
- (void *)new_rr_policy, (void *)glb_policy->rr_policy);
- }
-
- if (glb_policy->rr_policy != NULL) {
- /* if we are phasing out an existing RR instance, unref it. */
- GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover");
- }
-
- /* Finally update the RR policy to the newly created one */
glb_policy->rr_policy = new_rr_policy;
+ grpc_error *rr_state_error = NULL;
+ const grpc_connectivity_state rr_state =
+ grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy,
+ &rr_state_error);
+ /* Connectivity state is a function of the RR policy updated/created */
+ update_lb_connectivity_status_locked(exec_ctx, glb_policy, rr_state,
+ rr_state_error);
+
/* Add the gRPC LB's interested_parties pollset_set to that of the newly
* created RR policy. This will make the RR policy progress upon activity on
* gRPC LB, which in turn is tied to the application's call */
@@ -769,10 +758,10 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
glb_rr_connectivity_changed_locked, rr_connectivity,
grpc_combiner_scheduler(glb_policy->base.combiner, false));
rr_connectivity->glb_policy = glb_policy;
- rr_connectivity->state = new_rr_state;
+ rr_connectivity->state = rr_state;
/* Subscribe to changes to the connectivity of the new RR */
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb");
+ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_sched");
grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
&rr_connectivity->state,
&rr_connectivity->on_change);
@@ -809,6 +798,31 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
}
}
+/* glb_policy->rr_policy may be NULL (initial handover) */
+static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
+ glb_lb_policy *glb_policy) {
+ GPR_ASSERT(glb_policy->serverlist != NULL &&
+ glb_policy->serverlist->num_servers > 0);
+
+ if (glb_policy->shutting_down) return;
+
+ grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy);
+ if (glb_policy->rr_policy != NULL) {
+ if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ gpr_log(GPR_DEBUG, "Updating Round Robin policy (%p)",
+ (void *)glb_policy->rr_policy);
+ }
+ grpc_lb_policy_update_locked(exec_ctx, glb_policy->rr_policy, args);
+ } else {
+ create_rr_locked(exec_ctx, glb_policy, args);
+ if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ gpr_log(GPR_DEBUG, "Created new Round Robin policy (%p)",
+ (void *)glb_policy->rr_policy);
+ }
+ }
+ lb_policy_args_destroy(exec_ctx, args);
+}
+
static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
rr_connectivity_data *rr_connectivity = arg;
@@ -854,18 +868,24 @@ static grpc_slice_hash_table_entry targets_info_entry_create(
return entry;
}
-/* Returns the target URI for the LB service whose addresses are in \a
- * addresses. Using this URI, a bidirectional streaming channel will be created
- * for the reception of load balancing updates.
+static int balancer_name_cmp_fn(void *a, void *b) {
+ const char *a_str = a;
+ const char *b_str = b;
+ return strcmp(a_str, b_str);
+}
+
+/* Returns the channel args for the LB channel, used to create a bidirectional
+ * stream for the reception of load balancing updates.
*
- * The output argument \a targets_info will be updated to contain a mapping of
- * "LB server address" to "balancer name", as reported by the naming system.
- * This mapping will be propagated via the channel arguments of the
- * aforementioned LB streaming channel, to be used by the security connector for
- * secure naming checks. The user is responsible for freeing \a targets_info. */
-static char *get_lb_uri_target_addresses(grpc_exec_ctx *exec_ctx,
- const grpc_lb_addresses *addresses,
- grpc_slice_hash_table **targets_info) {
+ * Inputs:
+ * - \a addresses: corresponding to the balancers.
+ * - \a response_generator: in order to propagate updates from the resolver
+ * above the grpclb policy.
+ * - \a args: other args inherited from the grpclb policy. */
+static grpc_channel_args *build_lb_channel_args(
+ grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses,
+ grpc_fake_resolver_response_generator *response_generator,
+ const grpc_channel_args *args) {
size_t num_grpclb_addrs = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
@@ -874,53 +894,54 @@ static char *get_lb_uri_target_addresses(grpc_exec_ctx *exec_ctx,
* It's the resolver's responsibility to make sure this policy is only
* instantiated and used in that case. Otherwise, something has gone wrong. */
GPR_ASSERT(num_grpclb_addrs > 0);
-
+ grpc_lb_addresses *lb_addresses =
+ grpc_lb_addresses_create(num_grpclb_addrs, NULL);
grpc_slice_hash_table_entry *targets_info_entries =
- gpr_malloc(sizeof(*targets_info_entries) * num_grpclb_addrs);
+ gpr_zalloc(sizeof(*targets_info_entries) * num_grpclb_addrs);
- /* construct a target ipvX://ip1:port1,ip2:port2,... from the addresses in \a
- * addresses */
- /* TODO(dgq): support mixed ip version */
- char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs);
- size_t addr_index = 0;
-
- for (size_t i = 0; i < addresses->num_addresses; i++) {
+ size_t lb_addresses_idx = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (!addresses->addresses[i].is_balancer) continue;
if (addresses->addresses[i].user_data != NULL) {
gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored");
}
- if (addresses->addresses[i].is_balancer) {
- char *addr_str;
- GPR_ASSERT(grpc_sockaddr_to_string(
- &addr_str, &addresses->addresses[i].address, true) > 0);
- targets_info_entries[addr_index] = targets_info_entry_create(
- addr_str, addresses->addresses[i].balancer_name);
- addr_strs[addr_index++] = addr_str;
- }
+ char *addr_str;
+ GPR_ASSERT(grpc_sockaddr_to_string(
+ &addr_str, &addresses->addresses[i].address, true) > 0);
+ targets_info_entries[lb_addresses_idx] = targets_info_entry_create(
+ addr_str, addresses->addresses[i].balancer_name);
+ gpr_free(addr_str);
+
+ grpc_lb_addresses_set_address(
+ lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
+ addresses->addresses[i].address.len, false /* is balancer */,
+ addresses->addresses[i].balancer_name, NULL /* user data */);
}
- GPR_ASSERT(addr_index == num_grpclb_addrs);
-
- size_t uri_path_len;
- char *uri_path = gpr_strjoin_sep((const char **)addr_strs, num_grpclb_addrs,
- ",", &uri_path_len);
- for (size_t i = 0; i < num_grpclb_addrs; i++) gpr_free(addr_strs[i]);
- gpr_free(addr_strs);
-
- char *target_uri_str = NULL;
- /* TODO(dgq): Don't assume all addresses will share the scheme of the first
- * one */
- gpr_asprintf(&target_uri_str, "%s:%s",
- grpc_sockaddr_get_uri_scheme(&addresses->addresses[0].address),
- uri_path);
- gpr_free(uri_path);
-
- *targets_info = grpc_slice_hash_table_create(
- num_grpclb_addrs, targets_info_entries, destroy_balancer_name);
+ GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
+ grpc_slice_hash_table *targets_info =
+ grpc_slice_hash_table_create(num_grpclb_addrs, targets_info_entries,
+ destroy_balancer_name, balancer_name_cmp_fn);
gpr_free(targets_info_entries);
- return target_uri_str;
+ grpc_channel_args *lb_channel_args =
+ grpc_lb_policy_grpclb_build_lb_channel_args(exec_ctx, targets_info,
+ response_generator, args);
+
+ grpc_arg lb_channel_addresses_arg =
+ grpc_lb_addresses_create_channel_arg(lb_addresses);
+
+ grpc_channel_args *result = grpc_channel_args_copy_and_add(
+ lb_channel_args, &lb_channel_addresses_arg, 1);
+ grpc_slice_hash_table_unref(exec_ctx, targets_info);
+ grpc_channel_args_destroy(exec_ctx, lb_channel_args);
+ grpc_lb_addresses_destroy(exec_ctx, lb_addresses);
+ return result;
}
+static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
+ void *arg,
+ grpc_error *error);
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
@@ -976,24 +997,31 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
- grpc_slice_hash_table *targets_info = NULL;
/* Create a client channel over them to communicate with a LB service */
- char *lb_service_target_addresses =
- get_lb_uri_target_addresses(exec_ctx, addresses, &targets_info);
- grpc_channel_args *lb_channel_args =
- get_lb_channel_args(exec_ctx, targets_info, args->args);
+ glb_policy->response_generator =
+ grpc_fake_resolver_response_generator_create();
+ grpc_channel_args *lb_channel_args = build_lb_channel_args(
+ exec_ctx, addresses, glb_policy->response_generator, args->args);
+ char *uri_str;
+ gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name);
glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
- exec_ctx, lb_service_target_addresses, args->client_channel_factory,
- lb_channel_args);
- grpc_slice_hash_table_unref(exec_ctx, targets_info);
+ exec_ctx, uri_str, args->client_channel_factory, lb_channel_args);
+
+ /* Propagate initial resolution */
+ grpc_fake_resolver_response_generator_set_response(
+ exec_ctx, glb_policy->response_generator, lb_channel_args);
grpc_channel_args_destroy(exec_ctx, lb_channel_args);
- gpr_free(lb_service_target_addresses);
+ gpr_free(uri_str);
if (glb_policy->lb_channel == NULL) {
gpr_free((void *)glb_policy->server_name);
grpc_channel_args_destroy(exec_ctx, glb_policy->args);
gpr_free(glb_policy);
return NULL;
}
+
+ grpc_closure_init(&glb_policy->lb_channel_on_connectivity_changed,
+ glb_lb_channel_on_connectivity_changed_cb, glb_policy,
+ grpc_combiner_scheduler(args->combiner, false));
grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
"grpclb");
@@ -1009,12 +1037,15 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
if (glb_policy->client_stats != NULL) {
grpc_grpclb_client_stats_unref(glb_policy->client_stats);
}
- grpc_channel_destroy(glb_policy->lb_channel);
- glb_policy->lb_channel = NULL;
grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
if (glb_policy->serverlist != NULL) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
+ grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
+ if (glb_policy->pending_update_args != NULL) {
+ grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args);
+ gpr_free(glb_policy->pending_update_args);
+ }
gpr_free(glb_policy);
}
@@ -1022,16 +1053,6 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
glb_policy->shutting_down = true;
- pending_pick *pp = glb_policy->pending_picks;
- glb_policy->pending_picks = NULL;
- pending_ping *pping = glb_policy->pending_pings;
- glb_policy->pending_pings = NULL;
- if (glb_policy->rr_policy) {
- GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
- }
- grpc_connectivity_state_set(
- exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown");
/* We need a copy of the lb_call pointer because we can't cancell the call
* while holding glb_policy->mu: lb_on_server_status_received, invoked due to
* the cancel, needs to acquire that same lock */
@@ -1045,6 +1066,30 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_call_cancel(lb_call, NULL);
/* lb_on_server_status_received will pick up the cancel and clean up */
}
+ if (glb_policy->retry_timer_active) {
+ grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
+ glb_policy->retry_timer_active = false;
+ }
+
+ pending_pick *pp = glb_policy->pending_picks;
+ glb_policy->pending_picks = NULL;
+ pending_ping *pping = glb_policy->pending_pings;
+ glb_policy->pending_pings = NULL;
+ if (glb_policy->rr_policy) {
+ GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
+ }
+ // We destroy the LB channel here because
+ // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy
+ // instance. Destroying the lb channel in glb_destroy would likely result in
+ // a callback invocation without a valid glb_policy arg.
+ if (glb_policy->lb_channel != NULL) {
+ grpc_channel_destroy(glb_policy->lb_channel);
+ glb_policy->lb_channel = NULL;
+ }
+ grpc_connectivity_state_set(
+ exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown");
+
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
@@ -1318,6 +1363,7 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
glb_lb_policy *glb_policy) {
GPR_ASSERT(glb_policy->server_name != NULL);
GPR_ASSERT(glb_policy->server_name[0] != '\0');
+ GPR_ASSERT(glb_policy->lb_call == NULL);
GPR_ASSERT(!glb_policy->shutting_down);
/* Note the following LB call progresses every time there's activity in \a
@@ -1403,8 +1449,10 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
lb_call_init_locked(exec_ctx, glb_policy);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)",
- (void *)glb_policy, (void *)glb_policy->lb_call);
+ gpr_log(GPR_INFO,
+ "Query for backends (grpclb: %p, lb_channel: %p, lb_call: %p)",
+ (void *)glb_policy, (void *)glb_policy->lb_channel,
+ (void *)glb_policy->lb_call);
}
GPR_ASSERT(glb_policy->lb_call != NULL);
@@ -1608,8 +1656,8 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
glb_lb_policy *glb_policy = arg;
-
- if (!glb_policy->shutting_down) {
+ glb_policy->retry_timer_active = false;
+ if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)",
(void *)glb_policy);
@@ -1617,31 +1665,32 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(glb_policy->lb_call == NULL);
query_for_backends_locked(exec_ctx, glb_policy);
}
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
- "grpclb_on_retry_timer");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer");
}
static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
glb_lb_policy *glb_policy = arg;
-
GPR_ASSERT(glb_policy->lb_call != NULL);
-
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
char *status_details =
grpc_slice_to_c_string(glb_policy->lb_call_status_details);
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"Status from LB server received. Status = %d, Details = '%s', "
- "(call: %p)",
+ "(call: %p), error %p",
glb_policy->lb_call_status, status_details,
- (void *)glb_policy->lb_call);
+ (void *)glb_policy->lb_call, (void *)error);
gpr_free(status_details);
}
-
/* We need to perform cleanups no matter what. */
lb_call_destroy_locked(exec_ctx, glb_policy);
-
- if (!glb_policy->shutting_down) {
+ if (glb_policy->started_picking && glb_policy->updating_lb_call) {
+ if (glb_policy->retry_timer_active) {
+ grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
+ }
+ if (!glb_policy->shutting_down) start_picking_locked(exec_ctx, glb_policy);
+ glb_policy->updating_lb_call = false;
+ } else if (!glb_policy->shutting_down) {
/* if we aren't shutting down, restart the LB client call after some time */
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try =
@@ -1651,16 +1700,18 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
(void *)glb_policy);
gpr_timespec timeout = gpr_time_sub(next_try, now);
if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) {
- gpr_log(GPR_DEBUG, "... retrying in %" PRId64 ".%09d seconds.",
+ gpr_log(GPR_DEBUG,
+ "... retry_timer_active in %" PRId64 ".%09d seconds.",
timeout.tv_sec, timeout.tv_nsec);
} else {
- gpr_log(GPR_DEBUG, "... retrying immediately.");
+ gpr_log(GPR_DEBUG, "... retry_timer_active immediately.");
}
}
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
grpc_closure_init(
&glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked,
glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ glb_policy->retry_timer_active = true;
grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
&glb_policy->lb_on_call_retry, now);
}
@@ -1668,6 +1719,138 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
"lb_on_server_status_received");
}
+static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const grpc_lb_policy_args *args) {
+ glb_lb_policy *glb_policy = (glb_lb_policy *)policy;
+
+ if (glb_policy->updating_lb_channel) {
+ if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ gpr_log(GPR_INFO,
+ "Update already in progress for grpclb %p. Deferring update.",
+ (void *)glb_policy);
+ }
+ if (glb_policy->pending_update_args != NULL) {
+ grpc_channel_args_destroy(exec_ctx,
+ glb_policy->pending_update_args->args);
+ gpr_free(glb_policy->pending_update_args);
+ }
+ glb_policy->pending_update_args =
+ gpr_zalloc(sizeof(*glb_policy->pending_update_args));
+ glb_policy->pending_update_args->client_channel_factory =
+ args->client_channel_factory;
+ glb_policy->pending_update_args->args = grpc_channel_args_copy(args->args);
+ glb_policy->pending_update_args->combiner = args->combiner;
+ return;
+ }
+
+ glb_policy->updating_lb_channel = true;
+ // Propagate update to lb_channel (pick first).
+ const grpc_arg *arg =
+ grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
+ if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
+ if (glb_policy->lb_channel == NULL) {
+ // If we don't have a current channel to the LB, go into TRANSIENT
+ // FAILURE.
+ grpc_connectivity_state_set(
+ exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
+ "glb_update_missing");
+ } else {
+ // otherwise, keep using the current LB channel (ignore this update).
+ gpr_log(GPR_ERROR,
+ "No valid LB addresses channel arg for grpclb %p update, "
+ "ignoring.",
+ (void *)glb_policy);
+ }
+ }
+ const grpc_lb_addresses *addresses = arg->value.pointer.p;
+ GPR_ASSERT(glb_policy->lb_channel != NULL);
+ grpc_channel_args *lb_channel_args = build_lb_channel_args(
+ exec_ctx, addresses, glb_policy->response_generator, args->args);
+ /* Propagate updates to the LB channel through the fake resolver */
+ grpc_fake_resolver_response_generator_set_response(
+ exec_ctx, glb_policy->response_generator, lb_channel_args);
+ grpc_channel_args_destroy(exec_ctx, lb_channel_args);
+
+ if (!glb_policy->watching_lb_channel) {
+ // Watch the LB channel connectivity for connection.
+ glb_policy->lb_channel_connectivity = GRPC_CHANNEL_INIT;
+ grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(glb_policy->lb_channel));
+ GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
+ glb_policy->watching_lb_channel = true;
+ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity");
+ grpc_client_channel_watch_connectivity_state(
+ exec_ctx, client_channel_elem,
+ grpc_polling_entity_create_from_pollset_set(
+ glb_policy->base.interested_parties),
+ &glb_policy->lb_channel_connectivity,
+ &glb_policy->lb_channel_on_connectivity_changed, NULL);
+ }
+}
+
+// Invoked as part of the update process. It continues watching the LB channel
+// until it shuts down or becomes READY. It's invoked even if the LB channel
+// stayed READY throughout the update (for example if the update is identical).
+static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
+ void *arg,
+ grpc_error *error) {
+ glb_lb_policy *glb_policy = arg;
+ if (glb_policy->shutting_down) goto done;
+ // Re-initialize the lb_call. This should also take care of updating the
+ // embedded RR policy. Note that the current RR policy, if any, will stay in
+ // effect until an update from the new lb_call is received.
+ switch (glb_policy->lb_channel_connectivity) {
+ case GRPC_CHANNEL_INIT:
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_TRANSIENT_FAILURE: {
+ /* resub. */
+ grpc_channel_element *client_channel_elem =
+ grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(glb_policy->lb_channel));
+ GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
+ grpc_client_channel_watch_connectivity_state(
+ exec_ctx, client_channel_elem,
+ grpc_polling_entity_create_from_pollset_set(
+ glb_policy->base.interested_parties),
+ &glb_policy->lb_channel_connectivity,
+ &glb_policy->lb_channel_on_connectivity_changed, NULL);
+ break;
+ }
+ case GRPC_CHANNEL_IDLE:
+ // lb channel inactive (probably shutdown prior to update). Restart lb
+ // call to kick the lb channel into gear.
+ GPR_ASSERT(glb_policy->lb_call == NULL);
+ /* fallthrough */
+ case GRPC_CHANNEL_READY:
+ if (glb_policy->lb_call != NULL) {
+ glb_policy->updating_lb_channel = false;
+ glb_policy->updating_lb_call = true;
+ grpc_call_cancel(glb_policy->lb_call, NULL);
+ // lb_on_server_status_received will pick up the cancel and reinit
+ // lb_call.
+ if (glb_policy->pending_update_args != NULL) {
+ const grpc_lb_policy_args *args = glb_policy->pending_update_args;
+ glb_policy->pending_update_args = NULL;
+ glb_update_locked(exec_ctx, &glb_policy->base, args);
+ }
+ } else if (glb_policy->started_picking && !glb_policy->shutting_down) {
+ if (glb_policy->retry_timer_active) {
+ grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
+ glb_policy->retry_timer_active = false;
+ }
+ start_picking_locked(exec_ctx, glb_policy);
+ }
+ /* fallthrough */
+ case GRPC_CHANNEL_SHUTDOWN:
+ done:
+ glb_policy->watching_lb_channel = false;
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+ "watch_lb_channel_connectivity_cb_shutdown");
+ break;
+ }
+}
+
/* Code wiring the policy with the rest of the core */
static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_destroy,
@@ -1678,7 +1861,8 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_ping_one_locked,
glb_exit_idle_locked,
glb_check_connectivity_locked,
- glb_notify_on_state_change_locked};
+ glb_notify_on_state_change_locked,
+ glb_update_locked};
static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c
index d6201f2387..a190c2075d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c
@@ -50,28 +50,37 @@ grpc_channel *grpc_lb_policy_grpclb_create_lb_channel(
return lb_channel;
}
-grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx,
- grpc_slice_hash_table *targets_info,
- const grpc_channel_args *args) {
- /* We strip out the channel arg for the LB policy name, since we want
- * to use the default (pick_first) in this case.
+grpc_channel_args *grpc_lb_policy_grpclb_build_lb_channel_args(
+ grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *targets_info,
+ grpc_fake_resolver_response_generator *response_generator,
+ const grpc_channel_args *args) {
+ const grpc_arg to_add[] = {
+ grpc_fake_resolver_response_generator_arg(response_generator)};
+ /* We remove:
*
- * We also strip out the channel arg for the resolved addresses, since
- * that will be generated by the name resolver used in the LB channel.
- * Note that the LB channel will use the sockaddr resolver, so this
- * won't actually generate a query to DNS (or some other name service).
- * However, the addresses returned by the sockaddr resolver will have
- * is_balancer=false, whereas our own addresses have is_balancer=true.
- * We need the LB channel to return addresses with is_balancer=false
- * so that it does not wind up recursively using the grpclb LB policy,
- * as per the special case logic in client_channel.c.
+ * - The channel arg for the LB policy name, since we want to use the default
+ * (pick_first) in this case.
*
- * Lastly, we also strip out the channel arg for the server URI,
- * since that will be different for the LB channel than for the parent
- * channel (the client channel factory will re-add this arg with
- * the right value). */
+ * - The channel arg for the resolved addresses, since that will be generated
+ * by the name resolver used in the LB channel. Note that the LB channel
+ * will use the fake resolver, so this won't actually generate a query
+ * to DNS (or some other name service). However, the addresses returned by
+ * the fake resolver will have is_balancer=false, whereas our own
+ * addresses have is_balancer=true. We need the LB channel to return
+ * addresses with is_balancer=false so that it does not wind up recursively
+ * using the grpclb LB policy, as per the special case logic in
+ * client_channel.c.
+ *
+ * - The channel arg for the server URI, since that will be different for the
+ * LB channel than for the parent channel (the client channel factory will
+ * re-add this arg with the right value).
+ *
+ * - The fake resolver generator, because we are replacing it with the one
+ * from the grpclb policy, used to propagate updates to the LB channel. */
static const char *keys_to_remove[] = {
- GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI};
- return grpc_channel_args_copy_and_remove(args, keys_to_remove,
- GPR_ARRAY_SIZE(keys_to_remove));
+ GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI,
+ GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR};
+ return grpc_channel_args_copy_and_add_and_remove(
+ args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), to_add,
+ GPR_ARRAY_SIZE(to_add));
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
index 9730c971d9..dfb682ad16 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
@@ -35,6 +35,7 @@
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/lib/slice/slice_hash_table.h"
/** Create the channel used for communicating with an LB service.
@@ -49,9 +50,10 @@ grpc_channel *grpc_lb_policy_grpclb_create_lb_channel(
grpc_client_channel_factory *client_channel_factory,
grpc_channel_args *args);
-grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx,
- grpc_slice_hash_table *targets_info,
- const grpc_channel_args *args);
+grpc_channel_args *grpc_lb_policy_grpclb_build_lb_channel_args(
+ grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *targets_info,
+ grpc_fake_resolver_response_generator *response_generator,
+ const grpc_channel_args *args);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H \
*/
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c
index a145cba63c..21effd75e3 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c
@@ -76,32 +76,39 @@ grpc_channel *grpc_lb_policy_grpclb_create_lb_channel(
return lb_channel;
}
-grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx,
- grpc_slice_hash_table *targets_info,
- const grpc_channel_args *args) {
- const grpc_arg targets_info_arg =
- grpc_lb_targets_info_create_channel_arg(targets_info);
- /* We strip out the channel arg for the LB policy name, since we want
- * to use the default (pick_first) in this case.
+grpc_channel_args *grpc_lb_policy_grpclb_build_lb_channel_args(
+ grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *targets_info,
+ grpc_fake_resolver_response_generator *response_generator,
+ const grpc_channel_args *args) {
+ const grpc_arg to_add[] = {
+ grpc_lb_targets_info_create_channel_arg(targets_info),
+ grpc_fake_resolver_response_generator_arg(response_generator)};
+ /* We remove:
*
- * We also strip out the channel arg for the resolved addresses, since
- * that will be generated by the name resolver used in the LB channel.
- * Note that the LB channel will use the sockaddr resolver, so this
- * won't actually generate a query to DNS (or some other name service).
- * However, the addresses returned by the sockaddr resolver will have
- * is_balancer=false, whereas our own addresses have is_balancer=true.
- * We need the LB channel to return addresses with is_balancer=false
- * so that it does not wind up recursively using the grpclb LB policy,
- * as per the special case logic in client_channel.c.
+ * - The channel arg for the LB policy name, since we want to use the default
+ * (pick_first) in this case.
*
- * Lastly, we also strip out the channel arg for the server URI,
- * since that will be different for the LB channel than for the parent
- * channel (the client channel factory will re-add this arg with
- * the right value). */
+ * - The channel arg for the resolved addresses, since that will be generated
+ * by the name resolver used in the LB channel. Note that the LB channel
+ * will use the fake resolver, so this won't actually generate a query
+ * to DNS (or some other name service). However, the addresses returned by
+ * the fake resolver will have is_balancer=false, whereas our own
+ * addresses have is_balancer=true. We need the LB channel to return
+ * addresses with is_balancer=false so that it does not wind up recursively
+ * using the grpclb LB policy, as per the special case logic in
+ * client_channel.c.
+ *
+ * - The channel arg for the server URI, since that will be different for the
+ * LB channel than for the parent channel (the client channel factory will
+ * re-add this arg with the right value).
+ *
+ * - The fake resolver generator, because we are replacing it with the one
+ * from the grpclb policy, used to propagate updates to the LB channel. */
static const char *keys_to_remove[] = {
- GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI};
+ GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI,
+ GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR};
/* Add the targets info table to be used for secure naming */
return grpc_channel_args_copy_and_add_and_remove(
- args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &targets_info_arg,
- 1);
+ args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), to_add,
+ GPR_ARRAY_SIZE(to_add));
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
index b1c5dfc61c..b6f86255df 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
@@ -37,11 +37,14 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
+grpc_tracer_flag grpc_lb_pick_first_trace = GRPC_TRACER_INITIALIZER(false);
+
typedef struct pending_pick {
struct pending_pick *next;
uint32_t initial_metadata_flags;
@@ -54,7 +57,9 @@ typedef struct {
grpc_lb_policy base;
/** all our subchannels */
grpc_subchannel **subchannels;
+ grpc_subchannel **new_subchannels;
size_t num_subchannels;
+ size_t num_new_subchannels;
grpc_closure connectivity_changed;
@@ -63,10 +68,19 @@ typedef struct {
/** the selected channel */
grpc_connected_subchannel *selected;
+ /** the subchannel key for \a selected, or NULL if \a selected not set */
+ const grpc_subchannel_key *selected_key;
+
/** have we started picking? */
- int started_picking;
+ bool started_picking;
/** are we shut down? */
- int shutdown;
+ bool shutdown;
+ /** are we updating the selected subchannel? */
+ bool updating_selected;
+ /** are we updating the subchannel candidates? */
+ bool updating_subchannels;
+ /** args from the latest update received while already updating, or NULL */
+ grpc_lb_policy_args *pending_update_args;
/** which subchannel are we watching? */
size_t checking_subchannel;
/** what is the connectivity of that channel? */
@@ -80,23 +94,28 @@ typedef struct {
static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- size_t i;
GPR_ASSERT(p->pending_picks == NULL);
- for (i = 0; i < p->num_subchannels; i++) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
+ for (size_t i = 0; i < p->num_subchannels; i++) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first_destroy");
}
if (p->selected != NULL) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
+ "picked_first_destroy");
}
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
+ if (p->pending_update_args != NULL) {
+ grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args);
+ gpr_free(p->pending_update_args);
+ }
gpr_free(p->subchannels);
+ gpr_free(p->new_subchannels);
gpr_free(p);
}
static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
- p->shutdown = 1;
+ p->shutdown = true;
pp = p->pending_picks;
p->pending_picks = NULL;
grpc_connectivity_state_set(
@@ -106,7 +125,7 @@ static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
if (p->selected != NULL) {
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
- } else if (p->num_subchannels > 0) {
+ } else if (p->num_subchannels > 0 && p->started_picking) {
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
&p->connectivity_changed);
@@ -169,21 +188,25 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
GRPC_ERROR_UNREF(error);
}
-static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
- p->started_picking = 1;
- p->checking_subchannel = 0;
- p->checking_connectivity = GRPC_CHANNEL_IDLE;
- GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity");
- grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[p->checking_subchannel],
- p->base.interested_parties, &p->checking_connectivity,
- &p->connectivity_changed);
+static void start_picking_locked(grpc_exec_ctx *exec_ctx,
+ pick_first_lb_policy *p) {
+ p->started_picking = true;
+ if (p->subchannels != NULL) {
+ GPR_ASSERT(p->num_subchannels > 0);
+ p->checking_subchannel = 0;
+ p->checking_connectivity = GRPC_CHANNEL_IDLE;
+ GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity");
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, p->subchannels[p->checking_subchannel],
+ p->base.interested_parties, &p->checking_connectivity,
+ &p->connectivity_changed);
+ }
}
static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
if (!p->started_picking) {
- start_picking(exec_ctx, p);
+ start_picking_locked(exec_ctx, p);
}
}
@@ -203,7 +226,7 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
/* No subchannel selected yet, so try again */
if (!p->started_picking) {
- start_picking(exec_ctx, p);
+ start_picking_locked(exec_ctx, p);
}
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
@@ -216,30 +239,290 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx,
pick_first_lb_policy *p) {
- size_t i;
size_t num_subchannels = p->num_subchannels;
- grpc_subchannel **subchannels;
+ grpc_subchannel **subchannels = p->subchannels;
- subchannels = p->subchannels;
p->num_subchannels = 0;
p->subchannels = NULL;
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels");
- for (i = 0; i < num_subchannels; i++) {
+ for (size_t i = 0; i < num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
}
-
gpr_free(subchannels);
}
+static grpc_connectivity_state pf_check_connectivity_locked(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ return grpc_connectivity_state_get(&p->state_tracker, error);
+}
+
+static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *pol,
+ grpc_connectivity_state *current,
+ grpc_closure *notify) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
+ current, notify);
+}
+
+static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ grpc_closure *closure) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ if (p->selected) {
+ grpc_connected_subchannel_ping(exec_ctx, p->selected, closure);
+ } else {
+ grpc_closure_sched(exec_ctx, closure,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
+ }
+}
+
+/* unsubscribe all subchannels */
+static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx,
+ pick_first_lb_policy *p) {
+ if (p->num_subchannels > 0) {
+ GPR_ASSERT(p->selected == NULL);
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
+ &p->connectivity_changed);
+ p->updating_subchannels = true;
+ } else if (p->selected != NULL) {
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
+ p->updating_selected = true;
+ }
+}
+
+/* true upon success */
+static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const grpc_lb_policy_args *args) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)policy;
+ /* Find the number of backend addresses. We ignore balancer
+ * addresses, since we don't know how to handle them. */
+ const grpc_arg *arg =
+ grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
+ if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
+ if (p->subchannels == NULL) {
+ // If we don't have a current subchannel list, go into TRANSIENT FAILURE.
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
+ "pf_update_missing");
+ } else {
+ // otherwise, keep using the current subchannel list (ignore this update).
+ gpr_log(GPR_ERROR,
+ "No valid LB addresses channel arg for Pick First %p update, "
+ "ignoring.",
+ (void *)p);
+ }
+ return;
+ }
+ const grpc_lb_addresses *addresses = arg->value.pointer.p;
+ size_t num_addrs = 0;
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
+ if (!addresses->addresses[i].is_balancer) ++num_addrs;
+ }
+ if (num_addrs == 0) {
+ // Empty update. Unsubscribe from all current subchannels and put the
+ // channel in TRANSIENT_FAILURE.
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
+ "pf_update_empty");
+ stop_connectivity_watchers(exec_ctx, p);
+ return;
+ }
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses",
+ (void *)p, (unsigned long)num_addrs);
+ }
+ grpc_subchannel_args *sc_args = gpr_zalloc(sizeof(*sc_args) * num_addrs);
+ /* We remove the following keys in order for subchannel keys belonging to
+ * subchannels point to the same address to match. */
+ static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
+ GRPC_ARG_LB_ADDRESSES};
+ size_t sc_args_count = 0;
+
+ /* Create list of subchannel args for new addresses in \a args. */
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
+ if (addresses->addresses[i].is_balancer) continue;
+ if (addresses->addresses[i].user_data != NULL) {
+ gpr_log(GPR_ERROR,
+ "This LB policy doesn't support user data. It will be ignored");
+ }
+ grpc_arg addr_arg =
+ grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
+ grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove(
+ args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg,
+ 1);
+ gpr_free(addr_arg.value.string);
+ sc_args[sc_args_count++].args = new_args;
+ }
+
+ /* Check if p->selected is amongst them. If so, we are done. */
+ if (p->selected != NULL) {
+ GPR_ASSERT(p->selected_key != NULL);
+ for (size_t i = 0; i < sc_args_count; i++) {
+ grpc_subchannel_key *ith_sc_key = grpc_subchannel_key_create(&sc_args[i]);
+ const bool found_selected =
+ grpc_subchannel_key_compare(p->selected_key, ith_sc_key) == 0;
+ grpc_subchannel_key_destroy(exec_ctx, ith_sc_key);
+ if (found_selected) {
+ // The currently selected subchannel is in the update: we are done.
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO,
+ "Pick First %p found already selected subchannel %p amongst "
+ "updates. Update done.",
+ (void *)p, (void *)p->selected);
+ }
+ for (size_t j = 0; j < sc_args_count; j++) {
+ grpc_channel_args_destroy(exec_ctx,
+ (grpc_channel_args *)sc_args[j].args);
+ }
+ gpr_free(sc_args);
+ return;
+ }
+ }
+ }
+ // We only check for already running updates here because if the previous
+ // steps were successful, the update can be considered done without any
+ // interference (ie, no callbacks were scheduled).
+ if (p->updating_selected || p->updating_subchannels) {
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO,
+ "Update already in progress for pick first %p. Deferring update.",
+ (void *)p);
+ }
+ if (p->pending_update_args != NULL) {
+ grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args);
+ gpr_free(p->pending_update_args);
+ }
+ p->pending_update_args = gpr_zalloc(sizeof(*p->pending_update_args));
+ p->pending_update_args->client_channel_factory =
+ args->client_channel_factory;
+ p->pending_update_args->args = grpc_channel_args_copy(args->args);
+ p->pending_update_args->combiner = args->combiner;
+ return;
+ }
+ /* Create the subchannels for the new subchannel args/addresses. */
+ grpc_subchannel **new_subchannels =
+ gpr_zalloc(sizeof(*new_subchannels) * sc_args_count);
+ size_t num_new_subchannels = 0;
+ for (size_t i = 0; i < sc_args_count; i++) {
+ grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
+ exec_ctx, args->client_channel_factory, &sc_args[i]);
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ char *address_uri =
+ grpc_sockaddr_to_uri(&addresses->addresses[i].address);
+ gpr_log(GPR_INFO,
+ "Pick First %p created subchannel %p for address uri %s",
+ (void *)p, (void *)subchannel, address_uri);
+ gpr_free(address_uri);
+ }
+ grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)sc_args[i].args);
+ if (subchannel != NULL) new_subchannels[num_new_subchannels++] = subchannel;
+ }
+ gpr_free(sc_args);
+ if (num_new_subchannels == 0) {
+ gpr_free(new_subchannels);
+ // Empty update. Unsubscribe from all current subchannels and put the
+ // channel in TRANSIENT_FAILURE.
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("No valid addresses in update"),
+ "pf_update_no_valid_addresses");
+ stop_connectivity_watchers(exec_ctx, p);
+ return;
+ }
+
+ /* Destroy the current subchannels. Repurpose pf_shutdown/destroy. */
+ stop_connectivity_watchers(exec_ctx, p);
+
+ /* Save new subchannels. The switch over will happen in
+ * pf_connectivity_changed_locked */
+ if (p->updating_selected || p->updating_subchannels) {
+ p->num_new_subchannels = num_new_subchannels;
+ p->new_subchannels = new_subchannels;
+ } else { /* nothing is updating. Get things moving from here */
+ p->num_subchannels = num_new_subchannels;
+ p->subchannels = new_subchannels;
+ p->new_subchannels = NULL;
+ p->num_new_subchannels = 0;
+ if (p->started_picking) {
+ p->checking_subchannel = 0;
+ p->checking_connectivity = GRPC_CHANNEL_IDLE;
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, p->subchannels[p->checking_subchannel],
+ p->base.interested_parties, &p->checking_connectivity,
+ &p->connectivity_changed);
+ }
+ }
+}
+
static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
pick_first_lb_policy *p = arg;
grpc_subchannel *selected_subchannel;
pending_pick *pp;
+ bool restart = false;
+ if (p->updating_selected && error == GRPC_ERROR_CANCELLED) {
+ /* Captured the unsubscription for p->selected */
+ GPR_ASSERT(p->selected != NULL);
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
+ "pf_update_connectivity");
+ p->updating_selected = false;
+ if (p->num_new_subchannels == 0) {
+ p->selected = NULL;
+ return;
+ }
+ restart = true;
+ }
+ if (p->updating_subchannels && error == GRPC_ERROR_CANCELLED) {
+ /* Captured the unsubscription for the checking subchannel */
+ GPR_ASSERT(p->selected == NULL);
+ for (size_t i = 0; i < p->num_subchannels; i++) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i],
+ "pf_update_connectivity");
+ }
+ gpr_free(p->subchannels);
+ p->subchannels = NULL;
+ p->num_subchannels = 0;
+ p->updating_subchannels = false;
+ if (p->num_new_subchannels == 0) return;
+ restart = true;
+ }
+ if (restart) {
+ p->selected = NULL;
+ p->selected_key = NULL;
+
+ GPR_ASSERT(p->new_subchannels != NULL);
+ GPR_ASSERT(p->num_new_subchannels > 0);
+ p->num_subchannels = p->num_new_subchannels;
+ p->subchannels = p->new_subchannels;
+ p->num_new_subchannels = 0;
+ p->new_subchannels = NULL;
+
+ if (p->started_picking) {
+ /* If we were picking, continue to do so over the new subchannels,
+ * starting from the 0th index. */
+ p->checking_subchannel = 0;
+ p->checking_connectivity = GRPC_CHANNEL_IDLE;
+ /* reuses the weak ref from start_picking_locked */
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, p->subchannels[p->checking_subchannel],
+ p->base.interested_parties, &p->checking_connectivity,
+ &p->connectivity_changed);
+ }
+ if (p->pending_update_args != NULL) {
+ const grpc_lb_policy_args *args = p->pending_update_args;
+ p->pending_update_args = NULL;
+ pf_update_locked(exec_ctx, &p->base, args);
+ }
+ return;
+ }
GRPC_ERROR_REF(error);
-
if (p->shutdown) {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
GRPC_ERROR_UNREF(error);
@@ -272,6 +555,11 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
p->selected = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected_subchannel),
"picked_first");
+
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO, "Selected subchannel %p", (void *)p->selected);
+ }
+ p->selected_key = grpc_subchannel_get_key(selected_subchannel);
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
destroy_subchannels_locked(exec_ctx, p);
@@ -279,6 +567,11 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked");
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO,
+ "Servicing pending pick with selected subchannel %p",
+ (void *)p->selected);
+ }
grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
@@ -353,32 +646,6 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_UNREF(error);
}
-static grpc_connectivity_state pf_check_connectivity_locked(
- grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) {
- pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- return grpc_connectivity_state_get(&p->state_tracker, error);
-}
-
-static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol,
- grpc_connectivity_state *current,
- grpc_closure *notify) {
- pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
- current, notify);
-}
-
-static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_closure *closure) {
- pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- if (p->selected) {
- grpc_connected_subchannel_ping(exec_ctx, p->selected, closure);
- } else {
- grpc_closure_sched(exec_ctx, closure,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
- }
-}
-
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy,
pf_shutdown_locked,
@@ -388,7 +655,8 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_ping_one_locked,
pf_exit_idle_locked,
pf_check_connectivity_locked,
- pf_notify_on_state_change_locked};
+ pf_notify_on_state_change_locked,
+ pf_update_locked};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
@@ -398,59 +666,8 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->client_channel_factory != NULL);
-
- /* Find the number of backend addresses. We ignore balancer
- * addresses, since we don't know how to handle them. */
- const grpc_arg *arg =
- grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
- if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
- return NULL;
- }
- grpc_lb_addresses *addresses = arg->value.pointer.p;
- size_t num_addrs = 0;
- for (size_t i = 0; i < addresses->num_addresses; i++) {
- if (!addresses->addresses[i].is_balancer) ++num_addrs;
- }
- if (num_addrs == 0) return NULL;
-
pick_first_lb_policy *p = gpr_zalloc(sizeof(*p));
-
- p->subchannels = gpr_zalloc(sizeof(grpc_subchannel *) * num_addrs);
- grpc_subchannel_args sc_args;
- size_t subchannel_idx = 0;
- for (size_t i = 0; i < addresses->num_addresses; i++) {
- /* Skip balancer addresses, since we only know how to handle backends. */
- if (addresses->addresses[i].is_balancer) continue;
-
- if (addresses->addresses[i].user_data != NULL) {
- gpr_log(GPR_ERROR,
- "This LB policy doesn't support user data. It will be ignored");
- }
-
- static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
- memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- grpc_arg addr_arg =
- grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
- grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove(
- args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg,
- 1);
- gpr_free(addr_arg.value.string);
- sc_args.args = new_args;
- grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
- exec_ctx, args->client_channel_factory, &sc_args);
- grpc_channel_args_destroy(exec_ctx, new_args);
-
- if (subchannel != NULL) {
- p->subchannels[subchannel_idx++] = subchannel;
- }
- }
- if (subchannel_idx == 0) {
- gpr_free(p->subchannels);
- gpr_free(p);
- return NULL;
- }
- p->num_subchannels = subchannel_idx;
-
+ pf_update_locked(exec_ctx, &p->base, args);
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p,
grpc_combiner_scheduler(args->combiner, false));
@@ -472,6 +689,7 @@ static grpc_lb_policy_factory *pick_first_lb_factory_create() {
void grpc_lb_policy_pick_first_init() {
grpc_register_lb_policy(pick_first_lb_factory_create());
+ grpc_register_tracer("pick_first", &grpc_lb_pick_first_trace);
}
void grpc_lb_policy_pick_first_shutdown() {}
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index 7ee6ffb787..c9758eef88 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -33,31 +33,11 @@
/** Round Robin Policy.
*
- * This policy keeps:
- * - A circular list of ready (connected) subchannels, the *readylist*. An empty
- * readylist consists solely of its root (dummy) node.
- * - A pointer to the last element picked from the readylist, the *lastpick*.
- * Initially set to point to the readylist's root.
- *
- * Behavior:
- * - When a subchannel connects, it's *prepended* to the readylist's root node.
- * Ie, if readylist = A <-> B <-> ROOT <-> C
- * ^ ^
- * |____________________|
- * and subchannel D becomes connected, the addition of D to the readylist
- * results in readylist = A <-> B <-> D <-> ROOT <-> C
- * ^ ^
- * |__________________________|
- * - When a subchannel disconnects, it's removed from the readylist. If the
- * subchannel being removed was the most recently picked, the *lastpick*
- * pointer moves to the removed node's previous element. Note that if the
- * readylist only had one element, this is still legal, as the lastpick would
- * point to the dummy root node, for an empty readylist.
- * - Upon picking, *lastpick* is updated to point to the returned (connected)
- * subchannel. Note that it's possible that the selected subchannel becomes
- * disconnected in the interim between the selection and the actual usage of
- * the subchannel by the caller.
- */
+ * Before every pick, the \a get_next_ready_subchannel_index_locked function
+ * returns the p->subchannel_list->subchannels index for next subchannel,
+ * respecting the relative
+ * order of the addresses provided upon creation or updates. Note however that
+ * updates will start picking from the beginning of the updated list. */
#include <string.h>
@@ -72,8 +52,6 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"
-typedef struct round_robin_lb_policy round_robin_lb_policy;
-
grpc_tracer_flag grpc_lb_round_robin_trace = GRPC_TRACER_INITIALIZER(false);
/** List of entities waiting for a pick.
@@ -99,9 +77,37 @@ typedef struct pending_pick {
grpc_closure *on_complete;
} pending_pick;
+typedef struct rr_subchannel_list rr_subchannel_list;
+typedef struct round_robin_lb_policy {
+ /** base policy: must be first */
+ grpc_lb_policy base;
+
+ rr_subchannel_list *subchannel_list;
+
+ /** have we started picking? */
+ bool started_picking;
+ /** are we shutting down? */
+ bool shutdown;
+ /** List of picks that are waiting on connectivity */
+ pending_pick *pending_picks;
+
+ /** our connectivity state tracker */
+ grpc_connectivity_state_tracker state_tracker;
+
+ /** Index into subchannels for last pick. */
+ size_t last_ready_subchannel_index;
+
+ /** Latest version of the subchannel list.
+ * Subchannel connectivity callbacks will only promote updated subchannel
+ * lists if they equal \a latest_pending_subchannel_list. In other words,
+ * racing callbacks that reference outdated subchannel lists won't perform any
+ * update. */
+ rr_subchannel_list *latest_pending_subchannel_list;
+} round_robin_lb_policy;
+
typedef struct {
- /** backpointer to owning policy */
- round_robin_lb_policy *policy;
+ /** backpointer to owning subchannel list */
+ rr_subchannel_list *subchannel_list;
/** subchannel itself */
grpc_subchannel *subchannel;
/** notification that connectivity has changed on subchannel */
@@ -123,12 +129,9 @@ typedef struct {
const grpc_lb_user_data_vtable *user_data_vtable;
} subchannel_data;
-struct round_robin_lb_policy {
- /** base policy: must be first */
- grpc_lb_policy base;
-
- /** total number of addresses received at creation time */
- size_t num_addresses;
+struct rr_subchannel_list {
+ /** backpointer to owning policy */
+ round_robin_lb_policy *policy;
/** all our subchannels */
size_t num_subchannels;
@@ -141,67 +144,143 @@ struct round_robin_lb_policy {
/** how many subchannels are in state IDLE */
size_t num_idle;
- /** have we started picking? */
- bool started_picking;
- /** are we shutting down? */
- bool shutdown;
- /** List of picks that are waiting on connectivity */
- pending_pick *pending_picks;
-
- /** our connectivity state tracker */
- grpc_connectivity_state_tracker state_tracker;
+ /** There will be one ref for each entry in subchannels for which there is a
+ * pending connectivity state watcher callback. */
+ gpr_refcount refcount;
- // Index into subchannels for last pick.
- size_t last_ready_subchannel_index;
+ /** Is this list shutting down? This may be true due to the shutdown of the
+ * policy itself or because a newer update has arrived while this one hadn't
+ * finished processing. */
+ bool shutting_down;
};
-/** Returns the index into p->subchannels of the next subchannel in
- * READY state, or p->num_subchannels if no subchannel is READY.
+static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx,
+ rr_subchannel_list *subchannel_list) {
+ GPR_ASSERT(subchannel_list->shutting_down);
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_INFO, "[RR %p] Destroying subchannel_list %p",
+ (void *)subchannel_list->policy, (void *)subchannel_list);
+ }
+ for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
+ subchannel_data *sd = &subchannel_list->subchannels[i];
+ if (sd->subchannel != NULL) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel,
+ "rr_subchannel_list_destroy");
+ }
+ sd->subchannel = NULL;
+ if (sd->user_data != NULL) {
+ GPR_ASSERT(sd->user_data_vtable != NULL);
+ sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ }
+ }
+ gpr_free(subchannel_list->subchannels);
+ gpr_free(subchannel_list);
+}
+
+static void rr_subchannel_list_ref(rr_subchannel_list *subchannel_list,
+ const char *reason) {
+ gpr_ref_non_zero(&subchannel_list->refcount);
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
+ gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu",
+ (void *)subchannel_list->policy, (void *)subchannel_list,
+ (unsigned long)(count - 1), (unsigned long)count);
+ }
+}
+
+static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
+ rr_subchannel_list *subchannel_list,
+ const char *reason) {
+ const bool done = gpr_unref(&subchannel_list->refcount);
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
+ gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu",
+ (void *)subchannel_list->policy, (void *)subchannel_list,
+ (unsigned long)(count + 1), (unsigned long)count);
+ }
+ if (done) {
+ rr_subchannel_list_destroy(exec_ctx, subchannel_list);
+ }
+}
+
+/** Mark \a subchannel_list as discarded. Unsubscribes all its subchannels. The
+ * watcher's callback will ultimately unref \a subchannel_list. */
+static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx,
+ rr_subchannel_list *subchannel_list,
+ const char *reason) {
+ GPR_ASSERT(!subchannel_list->shutting_down);
+ subchannel_list->shutting_down = true;
+ for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
+ subchannel_data *sd = &subchannel_list->subchannels[i];
+ if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe.
+ grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL,
+ NULL,
+ &sd->connectivity_changed_closure);
+ }
+ }
+ rr_subchannel_list_unref(exec_ctx, subchannel_list, reason);
+}
+
+/** Returns the index into p->subchannel_list->subchannels of the next
+ * subchannel in READY state, or p->subchannel_list->num_subchannels if no
+ * subchannel is READY.
*
* Note that this function does *not* update p->last_ready_subchannel_index.
* The caller must do that if it returns a pick. */
static size_t get_next_ready_subchannel_index_locked(
const round_robin_lb_policy *p) {
+ GPR_ASSERT(p->subchannel_list != NULL);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO,
- "[RR: %p] getting next ready subchannel, "
+ "[RR %p] getting next ready subchannel (out of %lu), "
"last_ready_subchannel_index=%lu",
- p, (unsigned long)p->last_ready_subchannel_index);
+ (void *)p, (unsigned long)p->subchannel_list->num_subchannels,
+ (unsigned long)p->last_ready_subchannel_index);
}
- for (size_t i = 0; i < p->num_subchannels; ++i) {
- const size_t index =
- (i + p->last_ready_subchannel_index + 1) % p->num_subchannels;
+ for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) {
+ const size_t index = (i + p->last_ready_subchannel_index + 1) %
+ p->subchannel_list->num_subchannels;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "[RR %p] checking index %lu: state=%d", p,
- (unsigned long)index,
- p->subchannels[index].curr_connectivity_state);
+ gpr_log(GPR_DEBUG,
+ "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: "
+ "state=%d",
+ (void *)p,
+ (void *)p->subchannel_list->subchannels[index].subchannel,
+ (void *)p->subchannel_list, (unsigned long)index,
+ p->subchannel_list->subchannels[index].curr_connectivity_state);
}
- if (p->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) {
+ if (p->subchannel_list->subchannels[index].curr_connectivity_state ==
+ GRPC_CHANNEL_READY) {
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "[RR %p] found next ready subchannel at index %lu",
- p, (unsigned long)index);
+ gpr_log(GPR_DEBUG,
+ "[RR %p] found next ready subchannel (%p) at index %lu of "
+ "subchannel_list %p",
+ (void *)p,
+ (void *)p->subchannel_list->subchannels[index].subchannel,
+ (unsigned long)index, (void *)p->subchannel_list);
}
return index;
}
}
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", p);
+ gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", (void *)p);
}
- return p->num_subchannels;
+ return p->subchannel_list->num_subchannels;
}
// Sets p->last_ready_subchannel_index to last_ready_index.
static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p,
size_t last_ready_index) {
- GPR_ASSERT(last_ready_index < p->num_subchannels);
+ GPR_ASSERT(last_ready_index < p->subchannel_list->num_subchannels);
p->last_ready_subchannel_index = last_ready_index;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG,
- "[RR: %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)",
- (void *)p, (unsigned long)last_ready_index,
- (void *)p->subchannels[last_ready_index].subchannel,
- (void *)grpc_subchannel_get_connected_subchannel(
- p->subchannels[last_ready_index].subchannel));
+ gpr_log(
+ GPR_DEBUG,
+ "[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)",
+ (void *)p, (unsigned long)last_ready_index,
+ (void *)p->subchannel_list->subchannels[last_ready_index].subchannel,
+ (void *)grpc_subchannel_get_connected_subchannel(
+ p->subchannel_list->subchannels[last_ready_index].subchannel));
}
}
@@ -210,18 +289,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol);
}
- for (size_t i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = &p->subchannels[i];
- if (sd->subchannel != NULL) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy");
- if (sd->user_data != NULL) {
- GPR_ASSERT(sd->user_data_vtable != NULL);
- sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
- }
- }
- }
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
- gpr_free(p->subchannels);
gpr_free(p);
}
@@ -243,14 +311,9 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown");
- for (size_t i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = &p->subchannels[i];
- if (sd->subchannel != NULL) {
- grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL,
- NULL,
- &sd->connectivity_changed_closure);
- }
- }
+ rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
+ "sl_shutdown_rr_shutdown");
+ p->subchannel_list = NULL;
}
static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
@@ -304,15 +367,14 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static void start_picking_locked(grpc_exec_ctx *exec_ctx,
round_robin_lb_policy *p) {
p->started_picking = true;
- for (size_t i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = &p->subchannels[i];
- if (sd->subchannel != NULL) {
- GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity");
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->pending_connectivity_state_unsafe,
- &sd->connectivity_changed_closure);
- }
+ for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) {
+ subchannel_data *sd = &p->subchannel_list->subchannels[i];
+ GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity");
+ rr_subchannel_list_ref(sd->subchannel_list, "start_picking");
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->pending_connectivity_state_unsafe,
+ &sd->connectivity_changed_closure);
}
}
@@ -332,63 +394,70 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol);
}
- const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
- if (next_ready_index < p->num_subchannels) {
- /* readily available, report right away */
- subchannel_data *sd = &p->subchannels[next_ready_index];
- *target = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(sd->subchannel), "rr_picked");
- if (user_data != NULL) {
- *user_data = sd->user_data;
- }
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG,
- "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (INDEX %lu)",
- (void *)*target, (unsigned long)next_ready_index);
- }
- /* only advance the last picked pointer if the selection was used */
- update_last_ready_subchannel_index_locked(p, next_ready_index);
- return 1;
- } else {
- /* no pick currently available. Save for later in list of pending picks */
- if (!p->started_picking) {
- start_picking_locked(exec_ctx, p);
+ if (p->subchannel_list != NULL) {
+ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
+ if (next_ready_index < p->subchannel_list->num_subchannels) {
+ /* readily available, report right away */
+ subchannel_data *sd = &p->subchannel_list->subchannels[next_ready_index];
+ *target = GRPC_CONNECTED_SUBCHANNEL_REF(
+ grpc_subchannel_get_connected_subchannel(sd->subchannel),
+ "rr_picked");
+ if (user_data != NULL) {
+ *user_data = sd->user_data;
+ }
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(
+ GPR_DEBUG,
+ "[RR %p] PICKED TARGET <-- SUBCHANNEL %p (CONNECTED %p) (SL %p, "
+ "INDEX %lu)",
+ (void *)p, (void *)sd->subchannel, (void *)*target,
+ (void *)sd->subchannel_list, (unsigned long)next_ready_index);
+ }
+ /* only advance the last picked pointer if the selection was used */
+ update_last_ready_subchannel_index_locked(p, next_ready_index);
+ return 1;
}
- pending_pick *pp = gpr_malloc(sizeof(*pp));
- pp->next = p->pending_picks;
- pp->target = target;
- pp->on_complete = on_complete;
- pp->initial_metadata_flags = pick_args->initial_metadata_flags;
- pp->user_data = user_data;
- p->pending_picks = pp;
- return 0;
}
+ /* no pick currently available. Save for later in list of pending picks */
+ if (!p->started_picking) {
+ start_picking_locked(exec_ctx, p);
+ }
+ pending_pick *pp = gpr_malloc(sizeof(*pp));
+ pp->next = p->pending_picks;
+ pp->target = target;
+ pp->on_complete = on_complete;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+ pp->user_data = user_data;
+ p->pending_picks = pp;
+ return 0;
}
static void update_state_counters_locked(subchannel_data *sd) {
- round_robin_lb_policy *p = sd->policy;
+ rr_subchannel_list *subchannel_list = sd->subchannel_list;
if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
- GPR_ASSERT(p->num_ready > 0);
- --p->num_ready;
+ GPR_ASSERT(subchannel_list->num_ready > 0);
+ --subchannel_list->num_ready;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- GPR_ASSERT(p->num_transient_failures > 0);
- --p->num_transient_failures;
+ GPR_ASSERT(subchannel_list->num_transient_failures > 0);
+ --subchannel_list->num_transient_failures;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
- GPR_ASSERT(p->num_idle > 0);
- --p->num_idle;
+ GPR_ASSERT(subchannel_list->num_idle > 0);
+ --subchannel_list->num_idle;
}
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
- ++p->num_ready;
+ ++subchannel_list->num_ready;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- ++p->num_transient_failures;
+ ++subchannel_list->num_transient_failures;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
- ++p->num_idle;
+ ++subchannel_list->num_idle;
}
}
-/* sd is the subchannel_data associted with the updated subchannel.
- * shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE
- * or SHUTDOWN */
+/** Sets the policy's connectivity status based on that of the passed-in \a sd
+ * (the subchannel_data associted with the updated subchannel) and the
+ * subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be
+ * used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the
+ * connectivity status set. */
static grpc_connectivity_state update_lb_connectivity_status_locked(
grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) {
/* In priority order. The first rule to match terminates the search (ie, if we
@@ -401,17 +470,18 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
* CHECK: sd->curr_connectivity_state == CONNECTING.
*
* 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN.
- * CHECK: p->num_subchannels = 0.
+ * CHECK: p->subchannel_list->num_subchannels = 0.
*
* 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
* TRANSIENT_FAILURE.
- * CHECK: p->num_transient_failures == p->num_subchannels.
+ * CHECK: p->num_transient_failures == p->subchannel_list->num_subchannels.
*
* 5) RULE: ALL subchannels are IDLE => policy is IDLE.
- * CHECK: p->num_idle == p->num_subchannels.
+ * CHECK: p->num_idle == p->subchannel_list->num_subchannels.
*/
- round_robin_lb_policy *p = sd->policy;
- if (p->num_ready > 0) { /* 1) READY */
+ rr_subchannel_list *subchannel_list = sd->subchannel_list;
+ round_robin_lb_policy *p = subchannel_list->policy;
+ if (subchannel_list->num_ready > 0) { /* 1) READY */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
return GRPC_CHANNEL_READY;
@@ -421,18 +491,19 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
"rr_connecting");
return GRPC_CHANNEL_CONNECTING;
- } else if (p->num_subchannels == 0) { /* 3) SHUTDOWN */
+ } else if (p->subchannel_list->num_subchannels == 0) { /* 3) SHUTDOWN */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
"rr_shutdown");
return GRPC_CHANNEL_SHUTDOWN;
- } else if (p->num_transient_failures ==
- p->num_subchannels) { /* 4) TRANSIENT_FAILURE */
+ } else if (subchannel_list->num_transient_failures ==
+ p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "rr_transient_failure");
return GRPC_CHANNEL_TRANSIENT_FAILURE;
- } else if (p->num_idle == p->num_subchannels) { /* 5) IDLE */
+ } else if (subchannel_list->num_idle ==
+ p->subchannel_list->num_subchannels) { /* 5) IDLE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE, "rr_idle");
return GRPC_CHANNEL_IDLE;
@@ -444,7 +515,28 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
subchannel_data *sd = arg;
- round_robin_lb_policy *p = sd->policy;
+ round_robin_lb_policy *p = sd->subchannel_list->policy;
+ // If the policy is shutting down, unref and return.
+ if (p->shutdown) {
+ rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "pol_shutdown");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown");
+ return;
+ }
+ if (sd->subchannel_list->shutting_down) {
+ // the subchannel list associated with sd has been discarded. This callback
+ // corresponds to the unsubscription.
+ GPR_ASSERT(error == GRPC_ERROR_CANCELLED);
+ rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sl_shutdown");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown");
+ return;
+ }
+ // Dispose of outdated subchannel lists.
+ if (sd->subchannel_list != p->subchannel_list &&
+ sd->subchannel_list != p->latest_pending_subchannel_list) {
+ // sd belongs to an outdated subchannel_list: get rid of it.
+ rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_oudated");
+ return;
+ }
// Now that we're inside the combiner, copy the pending connectivity
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
@@ -453,21 +545,16 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
gpr_log(GPR_DEBUG,
"[RR %p] connectivity changed for subchannel %p: "
"prev_state=%d new_state=%d",
- p, sd->subchannel, sd->prev_connectivity_state,
+ (void *)p, (void *)sd->subchannel, sd->prev_connectivity_state,
sd->curr_connectivity_state);
}
- // If we're shutting down, unref and return.
- if (p->shutdown) {
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
- return;
- }
// Update state counters and determine new overall state.
update_state_counters_locked(sd);
sd->prev_connectivity_state = sd->curr_connectivity_state;
- grpc_connectivity_state new_connectivity_state =
+ const grpc_connectivity_state new_policy_connectivity_state =
update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
- // If the new state is SHUTDOWN, unref the subchannel, and if the new
- // overall state is SHUTDOWN, clean up.
+ // If the sd's new state is SHUTDOWN, unref the subchannel, and if the new
+ // policy's state is SHUTDOWN, clean up.
if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown");
sd->subchannel = NULL;
@@ -475,7 +562,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(sd->user_data_vtable != NULL);
sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
}
- if (new_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
/* the policy is shutting down. Flush all the pending picks... */
pending_pick *pp;
while ((pp = p->pending_picks)) {
@@ -486,15 +573,42 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
}
/* unref the "rr_connectivity" weak ref from start_picking */
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
- } else {
+ rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sd_shutdown");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
+ "rr_connectivity_sd_shutdown");
+ } else { // sd not in SHUTDOWN
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
+ if (sd->subchannel_list != p->subchannel_list) {
+ // promote sd->subchannel_list to p->subchannel_list.
+ // sd->subchannel_list must be equal to
+ // p->latest_pending_subchannel_list because we have already filtered
+ // for sds belonging to outdated subchannel lists.
+ GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list);
+ GPR_ASSERT(!sd->subchannel_list->shutting_down);
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG,
+ "[RR %p] phasing out subchannel list %p (size %lu) in favor "
+ "of %p (size %lu)",
+ (void *)p, (void *)p->subchannel_list,
+ (unsigned long)p->subchannel_list->num_subchannels,
+ (void *)sd->subchannel_list,
+ (unsigned long)sd->subchannel_list->num_subchannels);
+ }
+ if (p->subchannel_list != NULL) {
+ // dispose of the current subchannel_list
+ rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
+ "sl_shutdown_rr_update_connectivity");
+ }
+ p->subchannel_list = sd->subchannel_list;
+ p->latest_pending_subchannel_list = NULL;
+ }
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
- GPR_ASSERT(next_ready_index < p->num_subchannels);
- subchannel_data *selected = &p->subchannels[next_ready_index];
+ GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels);
+ subchannel_data *selected =
+ &p->subchannel_list->subchannels[next_ready_index];
if (p->pending_picks != NULL) {
/* if the selected subchannel is going to be used for the pending
* picks, update the last picked pointer */
@@ -519,7 +633,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(pp);
}
}
- /* renew notification: reuses the "rr_connectivity" weak ref */
+ /* renew notification: reuses the "rr_connectivity" weak ref on the policy
+ * as well as the sd->subchannel_list ref. */
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->pending_connectivity_state_unsafe,
@@ -546,8 +661,9 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *closure) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
- if (next_ready_index < p->num_subchannels) {
- subchannel_data *selected = &p->subchannels[next_ready_index];
+ if (next_ready_index < p->subchannel_list->num_subchannels) {
+ subchannel_data *selected =
+ &p->subchannel_list->subchannels[next_ready_index];
grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected->subchannel),
"rr_picked");
@@ -559,52 +675,68 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
}
-static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
- rr_destroy,
- rr_shutdown_locked,
- rr_pick_locked,
- rr_cancel_pick_locked,
- rr_cancel_picks_locked,
- rr_ping_one_locked,
- rr_exit_idle_locked,
- rr_check_connectivity_locked,
- rr_notify_on_state_change_locked};
-
-static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
-
-static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {}
-
-static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy_factory *factory,
- grpc_lb_policy_args *args) {
- GPR_ASSERT(args->client_channel_factory != NULL);
-
- /* Find the number of backend addresses. We ignore balancer
- * addresses, since we don't know how to handle them. */
+static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const grpc_lb_policy_args *args) {
+ round_robin_lb_policy *p = (round_robin_lb_policy *)policy;
+ /* Find the number of backend addresses. We ignore balancer addresses, since
+ * we don't know how to handle them. */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
- return NULL;
+ if (p->subchannel_list == NULL) {
+ // If we don't have a current subchannel list, go into TRANSIENT FAILURE.
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
+ "rr_update_missing");
+ } else {
+ // otherwise, keep using the current subchannel list (ignore this update).
+ gpr_log(GPR_ERROR,
+ "No valid LB addresses channel arg for Round Robin %p update, "
+ "ignoring.",
+ (void *)p);
+ }
+ return;
}
grpc_lb_addresses *addresses = arg->value.pointer.p;
size_t num_addrs = 0;
for (size_t i = 0; i < addresses->num_addresses; i++) {
if (!addresses->addresses[i].is_balancer) ++num_addrs;
}
- if (num_addrs == 0) return NULL;
-
- round_robin_lb_policy *p = gpr_zalloc(sizeof(*p));
-
- p->num_addresses = num_addrs;
- p->subchannels = gpr_zalloc(sizeof(*p->subchannels) * num_addrs);
-
- grpc_subchannel_args sc_args;
+ if (num_addrs == 0) {
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
+ "rr_update_empty");
+ if (p->subchannel_list != NULL) {
+ rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
+ "sl_shutdown_rr_update");
+ p->subchannel_list = NULL;
+ }
+ return;
+ }
size_t subchannel_index = 0;
+ rr_subchannel_list *subchannel_list = gpr_zalloc(sizeof(*subchannel_list));
+ subchannel_list->policy = p;
+ subchannel_list->subchannels =
+ gpr_zalloc(sizeof(subchannel_data) * num_addrs);
+ subchannel_list->num_subchannels = num_addrs;
+ gpr_ref_init(&subchannel_list->refcount, 1);
+ p->latest_pending_subchannel_list = subchannel_list;
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG, "Created subchannel list %p for %lu subchannels",
+ (void *)subchannel_list, (unsigned long)num_addrs);
+ }
+ grpc_subchannel_args sc_args;
+ /* We need to remove the LB addresses in order to be able to compare the
+ * subchannel keys of subchannels from a different batch of addresses. */
+ static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
+ GRPC_ARG_LB_ADDRESSES};
+ /* Create subchannels for addresses in the update. */
for (size_t i = 0; i < addresses->num_addresses; i++) {
/* Skip balancer addresses, since we only know how to handle backends. */
if (addresses->addresses[i].is_balancer) continue;
-
- static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
+ GPR_ASSERT(i < num_addrs);
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
grpc_arg addr_arg =
grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
@@ -618,52 +750,84 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
char *address_uri =
grpc_sockaddr_to_uri(&addresses->addresses[i].address);
- gpr_log(GPR_DEBUG, "index %lu: Created subchannel %p for address uri %s",
- (unsigned long)subchannel_index, (void *)subchannel, address_uri);
+ gpr_log(GPR_DEBUG,
+ "index %lu: Created subchannel %p for address uri %s into "
+ "subchannel_list %p",
+ (unsigned long)subchannel_index, (void *)subchannel, address_uri,
+ (void *)subchannel_list);
gpr_free(address_uri);
}
grpc_channel_args_destroy(exec_ctx, new_args);
- if (subchannel != NULL) {
- subchannel_data *sd = &p->subchannels[subchannel_index];
- sd->policy = p;
- sd->subchannel = subchannel;
- /* use some sentinel value outside of the range of grpc_connectivity_state
- * to signal an undefined previous state. We won't be referring to this
- * value again and it'll be overwritten after the first call to
- * rr_connectivity_changed */
- sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
- sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
- sd->user_data_vtable = addresses->user_data_vtable;
- if (sd->user_data_vtable != NULL) {
- sd->user_data =
- sd->user_data_vtable->copy(addresses->addresses[i].user_data);
- }
- grpc_closure_init(&sd->connectivity_changed_closure,
- rr_connectivity_changed_locked, sd,
- grpc_combiner_scheduler(args->combiner, false));
- ++subchannel_index;
+ subchannel_data *sd = &subchannel_list->subchannels[subchannel_index++];
+ sd->subchannel_list = subchannel_list;
+ sd->subchannel = subchannel;
+ grpc_closure_init(&sd->connectivity_changed_closure,
+ rr_connectivity_changed_locked, sd,
+ grpc_combiner_scheduler(args->combiner, false));
+ /* use some sentinel value outside of the range of
+ * grpc_connectivity_state to signal an undefined previous state. We
+ * won't be referring to this value again and it'll be overwritten after
+ * the first call to rr_connectivity_changed_locked */
+ sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
+ sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
+ sd->user_data_vtable = addresses->user_data_vtable;
+ if (sd->user_data_vtable != NULL) {
+ sd->user_data =
+ sd->user_data_vtable->copy(addresses->addresses[i].user_data);
+ }
+ if (p->started_picking) {
+ rr_subchannel_list_ref(sd->subchannel_list, "update_started_picking");
+ GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity_update");
+ /* 2. Watch every new subchannel. A subchannel list becomes active the
+ * moment one of its subchannels is READY. At that moment, we swap
+ * p->subchannel_list for sd->subchannel_list, provided the subchannel
+ * list is still valid (ie, isn't shutting down) */
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->pending_connectivity_state_unsafe,
+ &sd->connectivity_changed_closure);
}
}
- if (subchannel_index == 0) {
- /* couldn't create any subchannel. Bail out */
- gpr_free(p->subchannels);
- gpr_free(p);
- return NULL;
+ if (!p->started_picking) {
+ // The policy isn't picking yet. Save the update for later, disposing of
+ // previous version if any.
+ if (p->subchannel_list != NULL) {
+ rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
+ "rr_update_before_started_picking");
+ }
+ p->subchannel_list = subchannel_list;
}
- p->num_subchannels = subchannel_index;
+}
+
+static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
+ rr_destroy,
+ rr_shutdown_locked,
+ rr_pick_locked,
+ rr_cancel_pick_locked,
+ rr_cancel_picks_locked,
+ rr_ping_one_locked,
+ rr_exit_idle_locked,
+ rr_check_connectivity_locked,
+ rr_notify_on_state_change_locked,
+ rr_update_locked};
+
+static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
- // Initialize the last pick index to the last subchannel, so that the
- // first pick will start at the beginning of the list.
- p->last_ready_subchannel_index = subchannel_index - 1;
+static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {}
+static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy_factory *factory,
+ grpc_lb_policy_args *args) {
+ GPR_ASSERT(args->client_channel_factory != NULL);
+ round_robin_lb_policy *p = gpr_zalloc(sizeof(*p));
+ rr_update_locked(exec_ctx, &p->base, args);
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin");
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels",
- (void *)p, (unsigned long)p->num_subchannels);
+ gpr_log(GPR_DEBUG, "Created Round Robin %p with %lu subchannels", (void *)p,
+ (unsigned long)p->subchannel_list->num_subchannels);
}
return &p->base;
}
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h
index 9d6c0fc139..5c6464bc87 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.h
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.h
@@ -118,11 +118,11 @@ grpc_lb_addresses *grpc_lb_addresses_find_channel_arg(
const grpc_channel_args *channel_args);
/** Arguments passed to LB policies. */
-typedef struct grpc_lb_policy_args {
+struct grpc_lb_policy_args {
grpc_client_channel_factory *client_channel_factory;
grpc_channel_args *args;
grpc_combiner *combiner;
-} grpc_lb_policy_args;
+};
struct grpc_lb_policy_factory_vtable {
void (*ref)(grpc_lb_policy_factory *factory);
diff --git a/test/core/end2end/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
index 736b224fd6..cb9b1f07c2 100644
--- a/test/core/end2end/fake_resolver.c
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
@@ -54,7 +54,7 @@
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
-#include "test/core/end2end/fake_resolver.h"
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
//
// fake_resolver
@@ -98,7 +98,7 @@ static void fake_resolver_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx,
fake_resolver* r) {
if (r->next_completion != NULL && r->next_results != NULL) {
*r->target_result =
- grpc_channel_args_merge(r->channel_args, r->next_results);
+ grpc_channel_args_union(r->next_results, r->channel_args);
grpc_channel_args_destroy(exec_ctx, r->next_results);
grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
r->next_completion = NULL;
@@ -243,11 +243,13 @@ static char* fake_resolver_get_default_authority(grpc_resolver_factory* factory,
static const grpc_resolver_factory_vtable fake_resolver_factory_vtable = {
fake_resolver_factory_ref, fake_resolver_factory_unref,
- fake_resolver_create, fake_resolver_get_default_authority, "test"};
+ fake_resolver_create, fake_resolver_get_default_authority, "fake"};
static grpc_resolver_factory fake_resolver_factory = {
&fake_resolver_factory_vtable};
-void grpc_fake_resolver_init(void) {
+void grpc_resolver_fake_init(void) {
grpc_register_resolver_type(&fake_resolver_factory);
}
+
+void grpc_resolver_fake_shutdown(void) {}
diff --git a/test/core/end2end/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
index d9668d0d11..373509b97f 100644
--- a/test/core/end2end/fake_resolver.h
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
@@ -29,19 +29,17 @@
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
-#ifndef GRPC_TEST_CORE_END2END_FAKE_RESOLVER_H
-#define GRPC_TEST_CORE_END2END_FAKE_RESOLVER_H
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_FAKE_FAKE_RESOLVER_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_FAKE_FAKE_RESOLVER_H
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
-#include "test/core/util/test_config.h"
-
#define GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR \
"grpc.fake_resolver.response_generator"
-void grpc_fake_resolver_init();
+void grpc_resolver_fake_init();
// Instances of \a grpc_fake_resolver_response_generator are passed to the
// fake resolver in a channel argument (see \a
@@ -73,4 +71,5 @@ grpc_fake_resolver_response_generator_ref(
void grpc_fake_resolver_response_generator_unref(
grpc_fake_resolver_response_generator* generator);
-#endif /* GRPC_TEST_CORE_END2END_FAKE_RESOLVER_H */
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_FAKE_FAKE_RESOLVER_H \
+ */
diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c
index dd14bf1d02..1007916b40 100644
--- a/src/core/ext/filters/client_channel/subchannel.c
+++ b/src/core/ext/filters/client_channel/subchannel.c
@@ -307,7 +307,7 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
grpc_connector *connector,
const grpc_subchannel_args *args) {
- grpc_subchannel_key *key = grpc_subchannel_key_create(connector, args);
+ grpc_subchannel_key *key = grpc_subchannel_key_create(args);
grpc_subchannel *c = grpc_subchannel_index_find(exec_ctx, key);
if (c) {
grpc_subchannel_key_destroy(exec_ctx, key);
@@ -770,6 +770,11 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
return GET_CONNECTED_SUBCHANNEL(c, acq);
}
+const grpc_subchannel_key *grpc_subchannel_get_key(
+ const grpc_subchannel *subchannel) {
+ return subchannel->key;
+}
+
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
const grpc_connected_subchannel_call_args *args,
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index e433c33e40..e284001fa2 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -50,6 +50,7 @@ typedef struct grpc_subchannel grpc_subchannel;
typedef struct grpc_connected_subchannel grpc_connected_subchannel;
typedef struct grpc_subchannel_call grpc_subchannel_call;
typedef struct grpc_subchannel_args grpc_subchannel_args;
+typedef struct grpc_subchannel_key grpc_subchannel_key;
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
#define GRPC_SUBCHANNEL_REF(p, r) \
@@ -155,6 +156,10 @@ void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
grpc_subchannel *subchannel);
+/** return the subchannel index key for \a subchannel */
+const grpc_subchannel_key *grpc_subchannel_get_key(
+ const grpc_subchannel *subchannel);
+
/** continue processing a transport op */
void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *subchannel_call,
diff --git a/src/core/ext/filters/client_channel/subchannel_index.c b/src/core/ext/filters/client_channel/subchannel_index.c
index b25dbfcf51..2a707353c0 100644
--- a/src/core/ext/filters/client_channel/subchannel_index.c
+++ b/src/core/ext/filters/client_channel/subchannel_index.c
@@ -50,7 +50,6 @@ static gpr_avl g_subchannel_index;
static gpr_mu g_mu;
struct grpc_subchannel_key {
- grpc_connector *connector;
grpc_subchannel_args args;
};
@@ -73,10 +72,9 @@ static grpc_exec_ctx *current_ctx() {
}
static grpc_subchannel_key *create_key(
- grpc_connector *connector, const grpc_subchannel_args *args,
+ const grpc_subchannel_args *args,
grpc_channel_args *(*copy_channel_args)(const grpc_channel_args *args)) {
grpc_subchannel_key *k = gpr_malloc(sizeof(*k));
- k->connector = grpc_connector_ref(connector);
k->args.filter_count = args->filter_count;
if (k->args.filter_count > 0) {
k->args.filters =
@@ -91,19 +89,17 @@ static grpc_subchannel_key *create_key(
}
grpc_subchannel_key *grpc_subchannel_key_create(
- grpc_connector *connector, const grpc_subchannel_args *args) {
- return create_key(connector, args, grpc_channel_args_normalize);
+ const grpc_subchannel_args *args) {
+ return create_key(args, grpc_channel_args_normalize);
}
static grpc_subchannel_key *subchannel_key_copy(grpc_subchannel_key *k) {
- return create_key(k->connector, &k->args, grpc_channel_args_copy);
+ return create_key(&k->args, grpc_channel_args_copy);
}
-static int subchannel_key_compare(grpc_subchannel_key *a,
- grpc_subchannel_key *b) {
- int c = GPR_ICMP(a->connector, b->connector);
- if (c != 0) return c;
- c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
+int grpc_subchannel_key_compare(const grpc_subchannel_key *a,
+ const grpc_subchannel_key *b) {
+ int c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
if (c != 0) return c;
if (a->args.filter_count > 0) {
c = memcmp(a->args.filters, b->args.filters,
@@ -115,7 +111,6 @@ static int subchannel_key_compare(grpc_subchannel_key *a,
void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *k) {
- grpc_connector_unref(exec_ctx, k->connector);
gpr_free((grpc_channel_args *)k->args.filters);
grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)k->args.args);
gpr_free(k);
@@ -128,7 +123,7 @@ static void sck_avl_destroy(void *p) {
static void *sck_avl_copy(void *p) { return subchannel_key_copy(p); }
static long sck_avl_compare(void *a, void *b) {
- return subchannel_key_compare(a, b);
+ return grpc_subchannel_key_compare(a, b);
}
static void scv_avl_destroy(void *p) {
diff --git a/src/core/ext/filters/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h
index f673ade378..55e90bb645 100644
--- a/src/core/ext/filters/client_channel/subchannel_index.h
+++ b/src/core/ext/filters/client_channel/subchannel_index.h
@@ -34,17 +34,14 @@
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H
-#include "src/core/ext/filters/client_channel/connector.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
/** \file Provides an index of active subchannels so that they can be
shared amongst channels */
-typedef struct grpc_subchannel_key grpc_subchannel_key;
-
/** Create a key that can be used to uniquely identify a subchannel */
grpc_subchannel_key *grpc_subchannel_key_create(
- grpc_connector *con, const grpc_subchannel_args *args);
+ const grpc_subchannel_args *args);
/** Destroy a subchannel key */
void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
@@ -69,6 +66,9 @@ void grpc_subchannel_index_unregister(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *key,
grpc_subchannel *constructed);
+int grpc_subchannel_key_compare(const grpc_subchannel_key *a,
+ const grpc_subchannel_key *b);
+
/** Initialize the subchannel index (global) */
void grpc_subchannel_index_init(void);
/** Shutdown the subchannel index (global) */
diff --git a/src/core/ext/filters/workarounds/workaround_utils.h b/src/core/ext/filters/workarounds/workaround_utils.h
index 7cd70c12d8..0608d1e937 100644
--- a/src/core/ext/filters/workarounds/workaround_utils.h
+++ b/src/core/ext/filters/workarounds/workaround_utils.h
@@ -49,4 +49,4 @@ typedef bool (*user_agent_parser)(grpc_mdelem);
void grpc_register_workaround(uint32_t id, user_agent_parser parser);
-#endif
+#endif /* GRPC_CORE_EXT_FILTERS_WORKAROUNDS_WORKAROUND_UTILS_H */
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c
index 247b134938..2df07c8ae6 100644
--- a/src/core/lib/channel/channel_args.c
+++ b/src/core/lib/channel/channel_args.c
@@ -129,9 +129,23 @@ grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src) {
return grpc_channel_args_copy_and_add(src, NULL, 0);
}
-grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
+grpc_channel_args *grpc_channel_args_union(const grpc_channel_args *a,
const grpc_channel_args *b) {
- return grpc_channel_args_copy_and_add(a, b->args, b->num_args);
+ const size_t max_out = (a->num_args + b->num_args);
+ grpc_arg *uniques = gpr_malloc(sizeof(*uniques) * max_out);
+ for (size_t i = 0; i < a->num_args; ++i) uniques[i] = a->args[i];
+
+ size_t uniques_idx = a->num_args;
+ for (size_t i = 0; i < b->num_args; ++i) {
+ const char *b_key = b->args[i].key;
+ if (grpc_channel_args_find(a, b_key) == NULL) { // not found
+ uniques[uniques_idx++] = b->args[i];
+ }
+ }
+ grpc_channel_args *result =
+ grpc_channel_args_copy_and_add(NULL, uniques, uniques_idx);
+ gpr_free(uniques);
+ return result;
}
static int cmp_arg(const grpc_arg *a, const grpc_arg *b) {
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index f0f603e251..128afd00d4 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -63,8 +63,8 @@ grpc_channel_args *grpc_channel_args_copy_and_add_and_remove(
const grpc_channel_args *src, const char **to_remove, size_t num_to_remove,
const grpc_arg *to_add, size_t num_to_add);
-/** Concatenate args from \a a and \a b into a new instance */
-grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
+/** Perform the union of \a a and \a b, prioritizing \a a entries */
+grpc_channel_args *grpc_channel_args_union(const grpc_channel_args *a,
const grpc_channel_args *b);
/** Destroy arguments created by \a grpc_channel_args_copy */
diff --git a/src/core/lib/iomgr/polling_entity.h b/src/core/lib/iomgr/polling_entity.h
index e81531053c..740e553b2a 100644
--- a/src/core/lib/iomgr/polling_entity.h
+++ b/src/core/lib/iomgr/polling_entity.h
@@ -38,9 +38,8 @@
#include "src/core/lib/iomgr/pollset_set.h"
/* A grpc_polling_entity is a pollset-or-pollset_set container. It allows
- * functions that
- * accept a pollset XOR a pollset_set to do so through an abstract interface.
- * No ownership is taken. */
+ * functions that accept a pollset XOR a pollset_set to do so through an
+ * abstract interface. No ownership is taken. */
typedef struct grpc_polling_entity {
union {
@@ -64,18 +63,14 @@ grpc_pollset_set *grpc_polling_entity_pollset_set(grpc_polling_entity *pollent);
bool grpc_polling_entity_is_empty(const grpc_polling_entity *pollent);
/** Add the pollset or pollset_set in \a pollent to the destination pollset_set
- * \a
- * pss_dst */
+ * \a * pss_dst */
void grpc_polling_entity_add_to_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_polling_entity *pollent,
grpc_pollset_set *pss_dst);
/** Delete the pollset or pollset_set in \a pollent from the destination
- * pollset_set \a
- * pss_dst */
+ * pollset_set \a * pss_dst */
void grpc_polling_entity_del_from_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_polling_entity *pollent,
grpc_pollset_set *pss_dst);
-/* pollset_set specific */
-
#endif /* GRPC_CORE_LIB_IOMGR_POLLING_ENTITY_H */
diff --git a/src/core/lib/security/transport/lb_targets_info.c b/src/core/lib/security/transport/lb_targets_info.c
index e73483c039..8bb06b1f61 100644
--- a/src/core/lib/security/transport/lb_targets_info.c
+++ b/src/core/lib/security/transport/lb_targets_info.c
@@ -44,7 +44,9 @@ static void *targets_info_copy(void *p) { return grpc_slice_hash_table_ref(p); }
static void targets_info_destroy(grpc_exec_ctx *exec_ctx, void *p) {
grpc_slice_hash_table_unref(exec_ctx, p);
}
-static int targets_info_cmp(void *a, void *b) { return GPR_ICMP(a, b); }
+static int targets_info_cmp(void *a, void *b) {
+ return grpc_slice_hash_table_cmp(a, b);
+}
static const grpc_arg_pointer_vtable server_to_balancer_names_vtable = {
targets_info_copy, targets_info_destroy, targets_info_cmp};
diff --git a/src/core/lib/slice/slice_hash_table.c b/src/core/lib/slice/slice_hash_table.c
index 444f22aa19..2d9ff61c95 100644
--- a/src/core/lib/slice/slice_hash_table.c
+++ b/src/core/lib/slice/slice_hash_table.c
@@ -43,6 +43,7 @@
struct grpc_slice_hash_table {
gpr_refcount refs;
void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value);
+ int (*value_cmp)(void* a, void* b);
size_t size;
size_t max_num_probes;
grpc_slice_hash_table_entry* entries;
@@ -72,10 +73,12 @@ static void grpc_slice_hash_table_add(grpc_slice_hash_table* table,
grpc_slice_hash_table* grpc_slice_hash_table_create(
size_t num_entries, grpc_slice_hash_table_entry* entries,
- void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value)) {
+ void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value),
+ int (*value_cmp)(void* a, void* b)) {
grpc_slice_hash_table* table = gpr_zalloc(sizeof(*table));
gpr_ref_init(&table->refs, 1);
table->destroy_value = destroy_value;
+ table->value_cmp = value_cmp;
// Keep load factor low to improve performance of lookups.
table->size = num_entries * 2;
const size_t entry_size = sizeof(grpc_slice_hash_table_entry) * table->size;
@@ -121,3 +124,37 @@ void* grpc_slice_hash_table_get(const grpc_slice_hash_table* table,
}
return NULL; // Not found.
}
+
+static int pointer_cmp(void* a, void* b) { return GPR_ICMP(a, b); }
+int grpc_slice_hash_table_cmp(const grpc_slice_hash_table* a,
+ const grpc_slice_hash_table* b) {
+ int (*const value_cmp_fn_a)(void* a, void* b) =
+ a->value_cmp != NULL ? a->value_cmp : pointer_cmp;
+ int (*const value_cmp_fn_b)(void* a, void* b) =
+ b->value_cmp != NULL ? b->value_cmp : pointer_cmp;
+ // Compare value_fns
+ const int value_fns_cmp =
+ GPR_ICMP((void*)value_cmp_fn_a, (void*)value_cmp_fn_b);
+ if (value_fns_cmp != 0) return value_fns_cmp;
+ // Compare sizes
+ if (a->size < b->size) return -1;
+ if (a->size > b->size) return 1;
+ // Compare rows.
+ for (size_t i = 0; i < a->size; ++i) {
+ if (is_empty(&a->entries[i])) {
+ if (!is_empty(&b->entries[i])) {
+ return -1; // a empty but b non-empty
+ }
+ continue; // both empty, no need to check key or value
+ } else if (is_empty(&b->entries[i])) {
+ return 1; // a non-empty but b empty
+ }
+ // neither entry is empty
+ const int key_cmp = grpc_slice_cmp(a->entries[i].key, b->entries[i].key);
+ if (key_cmp != 0) return key_cmp;
+ const int value_cmp =
+ value_cmp_fn_a(a->entries[i].value, b->entries[i].value);
+ if (value_cmp != 0) return value_cmp;
+ }
+ return 0;
+}
diff --git a/src/core/lib/slice/slice_hash_table.h b/src/core/lib/slice/slice_hash_table.h
index 1e61c5eb11..07988abff4 100644
--- a/src/core/lib/slice/slice_hash_table.h
+++ b/src/core/lib/slice/slice_hash_table.h
@@ -54,11 +54,15 @@ typedef struct grpc_slice_hash_table_entry {
} grpc_slice_hash_table_entry;
/** Creates a new hash table of containing \a entries, which is an array
- of length \a num_entries. Takes ownership of all keys and values in
- \a entries. Values will be cleaned up via \a destroy_value(). */
+ of length \a num_entries. Takes ownership of all keys and values in \a
+ entries. Values will be cleaned up via \a destroy_value(). If not NULL, \a
+ value_cmp will be used to compare values in the context of \a
+ grpc_slice_hash_table_cmp. If NULL, raw pointer (\a GPR_ICMP) comparison
+ will be used. */
grpc_slice_hash_table *grpc_slice_hash_table_create(
size_t num_entries, grpc_slice_hash_table_entry *entries,
- void (*destroy_value)(grpc_exec_ctx *exec_ctx, void *value));
+ void (*destroy_value)(grpc_exec_ctx *exec_ctx, void *value),
+ int (*value_cmp)(void *a, void *b));
grpc_slice_hash_table *grpc_slice_hash_table_ref(grpc_slice_hash_table *table);
void grpc_slice_hash_table_unref(grpc_exec_ctx *exec_ctx,
@@ -69,4 +73,13 @@ void grpc_slice_hash_table_unref(grpc_exec_ctx *exec_ctx,
void *grpc_slice_hash_table_get(const grpc_slice_hash_table *table,
const grpc_slice key);
+/** Compares \a a vs. \a b.
+ * A table is considered "smaller" (resp. "greater") if:
+ * - GPR_ICMP(a->value_cmp, b->value_cmp) < 1 (resp. > 1),
+ * - else, it contains fewer (resp. more) entries,
+ * - else, if strcmp(a_key, b_key) < 1 (resp. > 1),
+ * - else, if value_cmp(a_value, b_value) < 1 (resp. > 1). */
+int grpc_slice_hash_table_cmp(const grpc_slice_hash_table *a,
+ const grpc_slice_hash_table *b);
+
#endif /* GRPC_CORE_LIB_SLICE_SLICE_HASH_TABLE_H */
diff --git a/src/core/lib/transport/service_config.c b/src/core/lib/transport/service_config.c
index 6aecb7fa93..107818f4d1 100644
--- a/src/core/lib/transport/service_config.c
+++ b/src/core/lib/transport/service_config.c
@@ -229,7 +229,7 @@ grpc_slice_hash_table* grpc_service_config_create_method_config_table(
grpc_slice_hash_table* method_config_table = NULL;
if (entries != NULL) {
method_config_table =
- grpc_slice_hash_table_create(num_entries, entries, destroy_value);
+ grpc_slice_hash_table_create(num_entries, entries, destroy_value, NULL);
gpr_free(entries);
}
return method_config_table;
diff --git a/src/core/plugin_registry/grpc_plugin_registry.c b/src/core/plugin_registry/grpc_plugin_registry.c
index 510cf5d5a0..64e3c07510 100644
--- a/src/core/plugin_registry/grpc_plugin_registry.c
+++ b/src/core/plugin_registry/grpc_plugin_registry.c
@@ -41,6 +41,8 @@ extern void grpc_deadline_filter_init(void);
extern void grpc_deadline_filter_shutdown(void);
extern void grpc_client_channel_init(void);
extern void grpc_client_channel_shutdown(void);
+extern void grpc_resolver_fake_init(void);
+extern void grpc_resolver_fake_shutdown(void);
extern void grpc_lb_policy_grpclb_init(void);
extern void grpc_lb_policy_grpclb_shutdown(void);
extern void grpc_lb_policy_pick_first_init(void);
@@ -73,6 +75,8 @@ void grpc_register_built_in_plugins(void) {
grpc_deadline_filter_shutdown);
grpc_register_plugin(grpc_client_channel_init,
grpc_client_channel_shutdown);
+ grpc_register_plugin(grpc_resolver_fake_init,
+ grpc_resolver_fake_shutdown);
grpc_register_plugin(grpc_lb_policy_grpclb_init,
grpc_lb_policy_grpclb_shutdown);
grpc_register_plugin(grpc_lb_policy_pick_first_init,
diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c
index e5eb68f934..76b17ed06d 100644
--- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c
+++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c
@@ -47,6 +47,8 @@ extern void grpc_resolver_dns_native_init(void);
extern void grpc_resolver_dns_native_shutdown(void);
extern void grpc_resolver_sockaddr_init(void);
extern void grpc_resolver_sockaddr_shutdown(void);
+extern void grpc_resolver_fake_init(void);
+extern void grpc_resolver_fake_shutdown(void);
extern void grpc_load_reporting_plugin_init(void);
extern void grpc_load_reporting_plugin_shutdown(void);
extern void grpc_lb_policy_grpclb_init(void);
@@ -79,6 +81,8 @@ void grpc_register_built_in_plugins(void) {
grpc_resolver_dns_native_shutdown);
grpc_register_plugin(grpc_resolver_sockaddr_init,
grpc_resolver_sockaddr_shutdown);
+ grpc_register_plugin(grpc_resolver_fake_init,
+ grpc_resolver_fake_shutdown);
grpc_register_plugin(grpc_load_reporting_plugin_init,
grpc_load_reporting_plugin_shutdown);
grpc_register_plugin(grpc_lb_policy_grpclb_init,
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 5f8075467d..8a1a58bb86 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -296,6 +296,7 @@ CORE_SOURCE_FILES = [
'third_party/nanopb/pb_common.c',
'third_party/nanopb/pb_decode.c',
'third_party/nanopb/pb_encode.c',
+ 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c',
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c',
diff --git a/test/core/client_channel/resolvers/BUILD b/test/core/client_channel/resolvers/BUILD
index 80ca7d3ebb..030f6091b5 100644
--- a/test/core/client_channel/resolvers/BUILD
+++ b/test/core/client_channel/resolvers/BUILD
@@ -74,7 +74,7 @@ grpc_cc_test(
deps = [
"//:gpr",
"//:grpc",
- "//test/core/end2end:fake_resolver",
+ "//:grpc_resolver_fake",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
],
diff --git a/test/core/client_channel/resolvers/fake_resolver_test.c b/test/core/client_channel/resolvers/fake_resolver_test.c
index 861918fbd6..c211f26b76 100644
--- a/test/core/client_channel/resolvers/fake_resolver_test.c
+++ b/test/core/client_channel/resolvers/fake_resolver_test.c
@@ -39,18 +39,18 @@
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/security/credentials/fake/fake_credentials.h"
-#include "test/core/end2end/fake_resolver.h"
#include "test/core/util/test_config.h"
static grpc_resolver *build_fake_resolver(
grpc_exec_ctx *exec_ctx, grpc_combiner *combiner,
grpc_fake_resolver_response_generator *response_generator) {
- grpc_resolver_factory *factory = grpc_resolver_factory_lookup("test");
+ grpc_resolver_factory *factory = grpc_resolver_factory_lookup("fake");
grpc_arg generator_arg =
grpc_fake_resolver_response_generator_arg(response_generator);
grpc_resolver_args args;
@@ -177,7 +177,6 @@ static void test_fake_resolver() {
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
- grpc_fake_resolver_init(); // Registers the "test" scheme.
grpc_init();
test_fake_resolver();
diff --git a/test/core/end2end/BUILD b/test/core/end2end/BUILD
index cf387a93e8..7705f62a79 100644
--- a/test/core/end2end/BUILD
+++ b/test/core/end2end/BUILD
@@ -59,18 +59,6 @@ grpc_cc_library(
visibility = ["//test:__subpackages__"],
)
-grpc_cc_library(
- name = "fake_resolver",
- srcs = ["fake_resolver.c"],
- hdrs = ["fake_resolver.h"],
- language = "C",
- visibility = ["//test:__subpackages__"],
- deps = [
- "//:gpr",
- "//:grpc",
- "//test/core/util:grpc_test_util",
- ],
-)
grpc_cc_library(
name = "http_proxy",
diff --git a/test/core/end2end/generate_tests.bzl b/test/core/end2end/generate_tests.bzl
index 6865aefa3d..0adf7eb989 100755
--- a/test/core/end2end/generate_tests.bzl
+++ b/test/core/end2end/generate_tests.bzl
@@ -173,7 +173,6 @@ def grpc_end2end_tests():
deps = [
':cq_verifier',
':ssl_test_data',
- ':fake_resolver',
':http_proxy',
':proxy',
]
diff --git a/test/core/slice/slice_hash_table_test.c b/test/core/slice/slice_hash_table_test.c
index 67041b2d5c..16bfb424c3 100644
--- a/test/core/slice/slice_hash_table_test.c
+++ b/test/core/slice/slice_hash_table_test.c
@@ -77,6 +77,19 @@ static void destroy_string(grpc_exec_ctx* exec_ctx, void* value) {
gpr_free(value);
}
+static grpc_slice_hash_table* create_table_from_entries(
+ const test_entry* test_entries, size_t num_test_entries,
+ int (*value_cmp_fn)(void*, void*)) {
+ // Construct table.
+ grpc_slice_hash_table_entry* entries =
+ gpr_zalloc(sizeof(*entries) * num_test_entries);
+ populate_entries(test_entries, num_test_entries, entries);
+ grpc_slice_hash_table* table = grpc_slice_hash_table_create(
+ num_test_entries, entries, destroy_string, value_cmp_fn);
+ gpr_free(entries);
+ return table;
+}
+
static void test_slice_hash_table() {
const test_entry test_entries[] = {
{"key_0", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"},
@@ -115,13 +128,8 @@ static void test_slice_hash_table() {
{"key_99", "value_99"},
};
const size_t num_entries = GPR_ARRAY_SIZE(test_entries);
- // Construct table.
- grpc_slice_hash_table_entry* entries =
- gpr_zalloc(sizeof(*entries) * num_entries);
- populate_entries(test_entries, num_entries, entries);
grpc_slice_hash_table* table =
- grpc_slice_hash_table_create(num_entries, entries, destroy_string);
- gpr_free(entries);
+ create_table_from_entries(test_entries, num_entries, NULL);
// Check contents of table.
check_values(test_entries, num_entries, table);
check_non_existent_value("XX", table);
@@ -131,8 +139,118 @@ static void test_slice_hash_table() {
grpc_exec_ctx_finish(&exec_ctx);
}
+static int value_cmp_fn(void* a, void* b) {
+ const char* a_str = a;
+ const char* b_str = b;
+ return strcmp(a_str, b_str);
+}
+
+static int pointer_cmp_fn(void* a, void* b) { return GPR_ICMP(a, b); }
+
+static void test_slice_hash_table_eq() {
+ const test_entry test_entries_a[] = {
+ {"key_0", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"}};
+ const size_t num_entries_a = GPR_ARRAY_SIZE(test_entries_a);
+ grpc_slice_hash_table* table_a =
+ create_table_from_entries(test_entries_a, num_entries_a, value_cmp_fn);
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_a, table_a) == 0);
+
+ const test_entry test_entries_b[] = {
+ {"key_0", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"}};
+ const size_t num_entries_b = GPR_ARRAY_SIZE(test_entries_b);
+ grpc_slice_hash_table* table_b =
+ create_table_from_entries(test_entries_b, num_entries_b, value_cmp_fn);
+
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_a, table_b) == 0);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_slice_hash_table_unref(&exec_ctx, table_a);
+ grpc_slice_hash_table_unref(&exec_ctx, table_b);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_slice_hash_table_not_eq() {
+ const test_entry test_entries_a[] = {
+ {"key_0", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"}};
+ const size_t num_entries_a = GPR_ARRAY_SIZE(test_entries_a);
+ grpc_slice_hash_table* table_a =
+ create_table_from_entries(test_entries_a, num_entries_a, value_cmp_fn);
+
+ // Different sizes.
+ const test_entry test_entries_b_smaller[] = {{"key_0", "value_0"},
+ {"key_1", "value_1"}};
+ const size_t num_entries_b_smaller = GPR_ARRAY_SIZE(test_entries_b_smaller);
+ grpc_slice_hash_table* table_b_smaller = create_table_from_entries(
+ test_entries_b_smaller, num_entries_b_smaller, value_cmp_fn);
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_a, table_b_smaller) > 0);
+
+ const test_entry test_entries_b_larger[] = {{"key_0", "value_0"},
+ {"key_1", "value_1"},
+ {"key_2", "value_2"},
+ {"key_3", "value_3"}};
+ const size_t num_entries_b_larger = GPR_ARRAY_SIZE(test_entries_b_larger);
+ grpc_slice_hash_table* table_b_larger = create_table_from_entries(
+ test_entries_b_larger, num_entries_b_larger, value_cmp_fn);
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_a, table_b_larger) < 0);
+
+ // One key doesn't match and is lexicographically "smaller".
+ const test_entry test_entries_c[] = {
+ {"key_zz", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"}};
+ const size_t num_entries_c = GPR_ARRAY_SIZE(test_entries_c);
+ grpc_slice_hash_table* table_c =
+ create_table_from_entries(test_entries_c, num_entries_c, value_cmp_fn);
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_a, table_c) > 0);
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_c, table_a) < 0);
+
+ // One value doesn't match.
+ const test_entry test_entries_d[] = {
+ {"key_0", "value_z"}, {"key_1", "value_1"}, {"key_2", "value_2"}};
+ const size_t num_entries_d = GPR_ARRAY_SIZE(test_entries_d);
+ grpc_slice_hash_table* table_d =
+ create_table_from_entries(test_entries_d, num_entries_d, value_cmp_fn);
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_a, table_d) < 0);
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_d, table_a) > 0);
+
+ // Same values but different "equals" functions.
+ const test_entry test_entries_e[] = {
+ {"key_0", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"}};
+ const size_t num_entries_e = GPR_ARRAY_SIZE(test_entries_e);
+ grpc_slice_hash_table* table_e =
+ create_table_from_entries(test_entries_e, num_entries_e, value_cmp_fn);
+ const test_entry test_entries_f[] = {
+ {"key_0", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"}};
+ const size_t num_entries_f = GPR_ARRAY_SIZE(test_entries_f);
+ grpc_slice_hash_table* table_f =
+ create_table_from_entries(test_entries_f, num_entries_f, pointer_cmp_fn);
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_e, table_f) != 0);
+
+ // Same (empty) key, different values.
+ const test_entry test_entries_g[] = {{"", "value_0"}};
+ const size_t num_entries_g = GPR_ARRAY_SIZE(test_entries_g);
+ grpc_slice_hash_table* table_g =
+ create_table_from_entries(test_entries_g, num_entries_g, value_cmp_fn);
+ const test_entry test_entries_h[] = {{"", "value_1"}};
+ const size_t num_entries_h = GPR_ARRAY_SIZE(test_entries_h);
+ grpc_slice_hash_table* table_h =
+ create_table_from_entries(test_entries_h, num_entries_h, pointer_cmp_fn);
+ GPR_ASSERT(grpc_slice_hash_table_cmp(table_g, table_h) != 0);
+
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_slice_hash_table_unref(&exec_ctx, table_a);
+ grpc_slice_hash_table_unref(&exec_ctx, table_b_larger);
+ grpc_slice_hash_table_unref(&exec_ctx, table_b_smaller);
+ grpc_slice_hash_table_unref(&exec_ctx, table_c);
+ grpc_slice_hash_table_unref(&exec_ctx, table_d);
+ grpc_slice_hash_table_unref(&exec_ctx, table_e);
+ grpc_slice_hash_table_unref(&exec_ctx, table_f);
+ grpc_slice_hash_table_unref(&exec_ctx, table_g);
+ grpc_slice_hash_table_unref(&exec_ctx, table_h);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
test_slice_hash_table();
+ test_slice_hash_table_eq();
+ test_slice_hash_table_not_eq();
return 0;
}
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD
index 9b691a83e0..97558d70bb 100644
--- a/test/cpp/end2end/BUILD
+++ b/test/cpp/end2end/BUILD
@@ -212,8 +212,8 @@ grpc_cc_test(
)
grpc_cc_test(
- name = "round_robin_end2end_test",
- srcs = ["round_robin_end2end_test.cc"],
+ name = "client_lb_end2end_test",
+ srcs = ["client_lb_end2end_test.cc"],
deps = [
":test_service_impl",
"//:gpr",
@@ -243,7 +243,7 @@ grpc_cc_test(
"//src/proto/grpc/testing:echo_messages_proto",
"//src/proto/grpc/testing:echo_proto",
"//src/proto/grpc/testing/duplicate:echo_duplicate_proto",
- "//test/core/end2end:fake_resolver",
+ "//:grpc_resolver_fake",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_util",
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
new file mode 100644
index 0000000000..ff00225597
--- /dev/null
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -0,0 +1,485 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <algorithm>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include <grpc++/channel.h>
+#include <grpc++/client_context.h>
+#include <grpc++/create_channel.h>
+#include <grpc++/server.h>
+#include <grpc++/server_builder.h>
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+
+extern "C" {
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+}
+
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+#include "test/cpp/end2end/test_service_impl.h"
+
+using grpc::testing::EchoRequest;
+using grpc::testing::EchoResponse;
+using std::chrono::system_clock;
+
+namespace grpc {
+namespace testing {
+namespace {
+
+// Subclass of TestServiceImpl that increments a request counter for
+// every call to the Echo RPC.
+class MyTestServiceImpl : public TestServiceImpl {
+ public:
+ MyTestServiceImpl() : request_count_(0) {}
+
+ Status Echo(ServerContext* context, const EchoRequest* request,
+ EchoResponse* response) override {
+ {
+ std::unique_lock<std::mutex> lock(mu_);
+ ++request_count_;
+ }
+ return TestServiceImpl::Echo(context, request, response);
+ }
+
+ int request_count() {
+ std::unique_lock<std::mutex> lock(mu_);
+ return request_count_;
+ }
+
+ void ResetCounters() {
+ std::unique_lock<std::mutex> lock(mu_);
+ request_count_ = 0;
+ }
+
+ private:
+ std::mutex mu_;
+ int request_count_;
+};
+
+class ClientLbEnd2endTest : public ::testing::Test {
+ protected:
+ ClientLbEnd2endTest() : server_host_("localhost") {}
+
+ void SetUp() override {
+ response_generator_ = grpc_fake_resolver_response_generator_create();
+ }
+
+ void TearDown() override {
+ grpc_fake_resolver_response_generator_unref(response_generator_);
+ for (size_t i = 0; i < servers_.size(); ++i) {
+ servers_[i]->Shutdown();
+ }
+ }
+
+ void StartServers(int num_servers) {
+ for (int i = 0; i < num_servers; ++i) {
+ servers_.emplace_back(new ServerData(server_host_));
+ }
+ }
+
+ void SetNextResolution(const std::vector<int>& ports) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_lb_addresses* addresses = grpc_lb_addresses_create(ports.size(), NULL);
+ for (size_t i = 0; i < ports.size(); ++i) {
+ char* lb_uri_str;
+ gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", ports[i]);
+ grpc_uri* lb_uri = grpc_uri_parse(&exec_ctx, lb_uri_str, true);
+ GPR_ASSERT(lb_uri != NULL);
+ grpc_lb_addresses_set_address_from_uri(addresses, i, lb_uri,
+ false /* is balancer */,
+ "" /* balancer name */, NULL);
+ grpc_uri_destroy(lb_uri);
+ gpr_free(lb_uri_str);
+ }
+ const grpc_arg fake_addresses =
+ grpc_lb_addresses_create_channel_arg(addresses);
+ grpc_channel_args* fake_result =
+ grpc_channel_args_copy_and_add(NULL, &fake_addresses, 1);
+ grpc_fake_resolver_response_generator_set_response(
+ &exec_ctx, response_generator_, fake_result);
+ grpc_channel_args_destroy(&exec_ctx, fake_result);
+ grpc_lb_addresses_destroy(&exec_ctx, addresses);
+ grpc_exec_ctx_finish(&exec_ctx);
+ }
+
+ void ResetStub(const grpc::string& lb_policy_name = "") {
+ ChannelArguments args;
+ if (lb_policy_name.size() > 0) {
+ args.SetLoadBalancingPolicyName(lb_policy_name);
+ } // else, default to pick first
+ args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
+ response_generator_);
+ std::ostringstream uri;
+ uri << "fake:///";
+ for (size_t i = 0; i < servers_.size() - 1; ++i) {
+ uri << "127.0.0.1:" << servers_[i]->port_ << ",";
+ }
+ uri << "127.0.0.1:" << servers_[servers_.size() - 1]->port_;
+ channel_ =
+ CreateCustomChannel(uri.str(), InsecureChannelCredentials(), args);
+ stub_ = grpc::testing::EchoTestService::NewStub(channel_);
+ }
+
+ void SendRpc() {
+ EchoRequest request;
+ EchoResponse response;
+ request.set_message("Live long and prosper.");
+ ClientContext context;
+ Status status = stub_->Echo(&context, request, &response);
+ EXPECT_TRUE(status.ok());
+ EXPECT_EQ(response.message(), request.message());
+ }
+
+ struct ServerData {
+ int port_;
+ std::unique_ptr<Server> server_;
+ MyTestServiceImpl service_;
+ std::unique_ptr<std::thread> thread_;
+
+ explicit ServerData(const grpc::string& server_host) {
+ port_ = grpc_pick_unused_port_or_die();
+ gpr_log(GPR_INFO, "starting server on port %d", port_);
+ std::mutex mu;
+ std::condition_variable cond;
+ thread_.reset(new std::thread(
+ std::bind(&ServerData::Start, this, server_host, &mu, &cond)));
+ std::unique_lock<std::mutex> lock(mu);
+ cond.wait(lock);
+ gpr_log(GPR_INFO, "server startup complete");
+ }
+
+ void Start(const grpc::string& server_host, std::mutex* mu,
+ std::condition_variable* cond) {
+ std::ostringstream server_address;
+ server_address << server_host << ":" << port_;
+ ServerBuilder builder;
+ builder.AddListeningPort(server_address.str(),
+ InsecureServerCredentials());
+ builder.RegisterService(&service_);
+ server_ = builder.BuildAndStart();
+ std::lock_guard<std::mutex> lock(*mu);
+ cond->notify_one();
+ }
+
+ void Shutdown() {
+ server_->Shutdown();
+ thread_->join();
+ }
+ };
+
+ void ResetCounters() {
+ for (const auto& server : servers_) server->service_.ResetCounters();
+ }
+
+ void WaitForServer(size_t server_idx) {
+ do {
+ SendRpc();
+ } while (servers_[server_idx]->service_.request_count() == 0);
+ ResetCounters();
+ }
+
+ const grpc::string server_host_;
+ std::shared_ptr<Channel> channel_;
+ std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
+ std::vector<std::unique_ptr<ServerData>> servers_;
+ grpc_fake_resolver_response_generator* response_generator_;
+};
+
+TEST_F(ClientLbEnd2endTest, PickFirst) {
+ // Start servers and send one RPC per server.
+ const int kNumServers = 3;
+ StartServers(kNumServers);
+ ResetStub(); // implicit pick first
+ std::vector<int> ports;
+ for (size_t i = 0; i < servers_.size(); ++i) {
+ ports.emplace_back(servers_[i]->port_);
+ }
+ SetNextResolution(ports);
+ for (size_t i = 0; i < servers_.size(); ++i) {
+ SendRpc();
+ }
+ // All requests should have gone to a single server.
+ bool found = false;
+ for (size_t i = 0; i < servers_.size(); ++i) {
+ const int request_count = servers_[i]->service_.request_count();
+ if (request_count == kNumServers) {
+ found = true;
+ } else {
+ EXPECT_EQ(0, request_count);
+ }
+ }
+ EXPECT_TRUE(found);
+ // Check LB policy name for the channel.
+ EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName());
+}
+
+TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
+ // Start servers and send one RPC per server.
+ const int kNumServers = 3;
+ StartServers(kNumServers);
+ ResetStub(); // implicit pick first
+ std::vector<int> ports;
+
+ // Perform one RPC against the first server.
+ ports.emplace_back(servers_[0]->port_);
+ SetNextResolution(ports);
+ gpr_log(GPR_INFO, "****** SET [0] *******");
+ SendRpc();
+ EXPECT_EQ(servers_[0]->service_.request_count(), 1);
+
+ // An empty update will result in the channel going into TRANSIENT_FAILURE.
+ ports.clear();
+ SetNextResolution(ports);
+ gpr_log(GPR_INFO, "****** SET none *******");
+ grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT;
+ do {
+ channel_state = channel_->GetState(true /* try to connect */);
+ } while (channel_state == GRPC_CHANNEL_READY);
+ GPR_ASSERT(channel_state != GRPC_CHANNEL_READY);
+ servers_[0]->service_.ResetCounters();
+
+ // Next update introduces servers_[1], making the channel recover.
+ ports.clear();
+ ports.emplace_back(servers_[1]->port_);
+ SetNextResolution(ports);
+ gpr_log(GPR_INFO, "****** SET [1] *******");
+ WaitForServer(1);
+ EXPECT_EQ(servers_[0]->service_.request_count(), 0);
+
+ // And again for servers_[2]
+ ports.clear();
+ ports.emplace_back(servers_[2]->port_);
+ SetNextResolution(ports);
+ gpr_log(GPR_INFO, "****** SET [2] *******");
+ WaitForServer(2);
+ EXPECT_EQ(servers_[0]->service_.request_count(), 0);
+ EXPECT_EQ(servers_[1]->service_.request_count(), 0);
+
+ // Check LB policy name for the channel.
+ EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName());
+}
+
+TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) {
+ // Start servers and send one RPC per server.
+ const int kNumServers = 3;
+ StartServers(kNumServers);
+ ResetStub(); // implicit pick first
+ std::vector<int> ports;
+
+ // Perform one RPC against the first server.
+ ports.emplace_back(servers_[0]->port_);
+ SetNextResolution(ports);
+ gpr_log(GPR_INFO, "****** SET [0] *******");
+ SendRpc();
+ EXPECT_EQ(servers_[0]->service_.request_count(), 1);
+ servers_[0]->service_.ResetCounters();
+
+ // Send and superset update
+ ports.clear();
+ ports.emplace_back(servers_[1]->port_);
+ ports.emplace_back(servers_[0]->port_);
+ SetNextResolution(ports);
+ gpr_log(GPR_INFO, "****** SET superset *******");
+ SendRpc();
+ // We stick to the previously connected server.
+ WaitForServer(0);
+ EXPECT_EQ(0, servers_[1]->service_.request_count());
+
+ // Check LB policy name for the channel.
+ EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName());
+}
+
+TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) {
+ // Start servers and send one RPC per server.
+ const int kNumServers = 3;
+ StartServers(kNumServers);
+ ResetStub(); // implicit pick first
+ std::vector<int> ports;
+ for (size_t i = 0; i < servers_.size(); ++i) {
+ ports.emplace_back(servers_[i]->port_);
+ }
+ for (size_t i = 0; i < 1000; ++i) {
+ std::random_shuffle(ports.begin(), ports.end());
+ SetNextResolution(ports);
+ if (i % 10 == 0) SendRpc();
+ }
+ // Check LB policy name for the channel.
+ EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName());
+}
+
+TEST_F(ClientLbEnd2endTest, RoundRobin) {
+ // Start servers and send one RPC per server.
+ const int kNumServers = 3;
+ StartServers(kNumServers);
+ ResetStub("round_robin");
+ std::vector<int> ports;
+ for (const auto& server : servers_) {
+ ports.emplace_back(server->port_);
+ }
+ SetNextResolution(ports);
+ for (size_t i = 0; i < servers_.size(); ++i) {
+ SendRpc();
+ }
+ // One request should have gone to each server.
+ for (size_t i = 0; i < servers_.size(); ++i) {
+ EXPECT_EQ(1, servers_[i]->service_.request_count());
+ }
+ // Check LB policy name for the channel.
+ EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
+}
+
+TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
+ // Start servers and send one RPC per server.
+ const int kNumServers = 3;
+ StartServers(kNumServers);
+ ResetStub("round_robin");
+ std::vector<int> ports;
+
+ // Start with a single server.
+ ports.emplace_back(servers_[0]->port_);
+ SetNextResolution(ports);
+ WaitForServer(0);
+ // Send RPCs. They should all go servers_[0]
+ for (size_t i = 0; i < 10; ++i) SendRpc();
+ EXPECT_EQ(10, servers_[0]->service_.request_count());
+ EXPECT_EQ(0, servers_[1]->service_.request_count());
+ EXPECT_EQ(0, servers_[2]->service_.request_count());
+ servers_[0]->service_.ResetCounters();
+
+ // And now for the second server.
+ ports.clear();
+ ports.emplace_back(servers_[1]->port_);
+ SetNextResolution(ports);
+
+ // Wait until update has been processed, as signaled by the second backend
+ // receiving a request.
+ EXPECT_EQ(0, servers_[1]->service_.request_count());
+ WaitForServer(1);
+
+ for (size_t i = 0; i < 10; ++i) SendRpc();
+ EXPECT_EQ(0, servers_[0]->service_.request_count());
+ EXPECT_EQ(10, servers_[1]->service_.request_count());
+ EXPECT_EQ(0, servers_[2]->service_.request_count());
+ servers_[1]->service_.ResetCounters();
+
+ // ... and for the last server.
+ ports.clear();
+ ports.emplace_back(servers_[2]->port_);
+ SetNextResolution(ports);
+ WaitForServer(2);
+
+ for (size_t i = 0; i < 10; ++i) SendRpc();
+ EXPECT_EQ(0, servers_[0]->service_.request_count());
+ EXPECT_EQ(0, servers_[1]->service_.request_count());
+ EXPECT_EQ(10, servers_[2]->service_.request_count());
+ servers_[2]->service_.ResetCounters();
+
+ // Back to all servers.
+ ports.clear();
+ ports.emplace_back(servers_[0]->port_);
+ ports.emplace_back(servers_[1]->port_);
+ ports.emplace_back(servers_[2]->port_);
+ SetNextResolution(ports);
+ WaitForServer(0);
+ WaitForServer(1);
+ WaitForServer(2);
+
+ // Send three RPCs, one per server.
+ for (size_t i = 0; i < 3; ++i) SendRpc();
+ EXPECT_EQ(1, servers_[0]->service_.request_count());
+ EXPECT_EQ(1, servers_[1]->service_.request_count());
+ EXPECT_EQ(1, servers_[2]->service_.request_count());
+
+ // An empty update will result in the channel going into TRANSIENT_FAILURE.
+ ports.clear();
+ SetNextResolution(ports);
+ grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT;
+ do {
+ channel_state = channel_->GetState(true /* try to connect */);
+ } while (channel_state == GRPC_CHANNEL_READY);
+ GPR_ASSERT(channel_state != GRPC_CHANNEL_READY);
+ servers_[0]->service_.ResetCounters();
+
+ // Next update introduces servers_[1], making the channel recover.
+ ports.clear();
+ ports.emplace_back(servers_[1]->port_);
+ SetNextResolution(ports);
+ WaitForServer(1);
+ channel_state = channel_->GetState(false /* try to connect */);
+ GPR_ASSERT(channel_state == GRPC_CHANNEL_READY);
+
+ // Check LB policy name for the channel.
+ EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
+}
+
+TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
+ // Start servers and send one RPC per server.
+ const int kNumServers = 3;
+ StartServers(kNumServers);
+ ResetStub("round_robin");
+ std::vector<int> ports;
+ for (size_t i = 0; i < servers_.size(); ++i) {
+ ports.emplace_back(servers_[i]->port_);
+ }
+ for (size_t i = 0; i < 1000; ++i) {
+ std::random_shuffle(ports.begin(), ports.end());
+ SetNextResolution(ports);
+ if (i % 10 == 0) SendRpc();
+ }
+ // Check LB policy name for the channel.
+ EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
+}
+
+} // namespace
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ grpc_test_init(argc, argv);
+ grpc_init();
+ const auto result = RUN_ALL_TESTS();
+ grpc_shutdown();
+ return result;
+}
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index b0d4e2dadf..6ff3642adf 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -50,8 +50,8 @@
#include <gtest/gtest.h>
extern "C" {
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/lib/iomgr/sockaddr.h"
-#include "test/core/end2end/fake_resolver.h"
}
#include "test/core/util/port.h"
@@ -117,6 +117,12 @@ class CountedService : public ServiceType {
++request_count_;
}
+ void ResetCounters() {
+ std::unique_lock<std::mutex> lock(mu_);
+ request_count_ = 0;
+ response_count_ = 0;
+ }
+
protected:
std::mutex mu_;
@@ -181,6 +187,7 @@ class BalancerServiceImpl : public BalancerService {
shutdown_(false) {}
Status BalanceLoad(ServerContext* context, Stream* stream) override {
+ gpr_log(GPR_INFO, "LB: BalanceLoad");
LoadBalanceRequest request;
stream->Read(&request);
IncreaseRequestCount();
@@ -200,9 +207,16 @@ class BalancerServiceImpl : public BalancerService {
responses_and_delays = responses_and_delays_;
}
for (const auto& response_and_delay : responses_and_delays) {
- if (shutdown_) break;
+ {
+ std::unique_lock<std::mutex> lock(mu_);
+ if (shutdown_) break;
+ }
SendResponse(stream, response_and_delay.first, response_and_delay.second);
}
+ {
+ std::unique_lock<std::mutex> lock(mu_);
+ serverlist_cond_.wait(lock);
+ }
if (client_load_reporting_interval_seconds_ > 0) {
request.Clear();
@@ -226,9 +240,10 @@ class BalancerServiceImpl : public BalancerService {
client_stats_.num_calls_finished_known_received +=
request.client_stats().num_calls_finished_known_received();
std::lock_guard<std::mutex> lock(mu_);
- cond_.notify_one();
+ load_report_cond_.notify_one();
}
+ gpr_log(GPR_INFO, "LB: done");
return Status::OK;
}
@@ -237,9 +252,14 @@ class BalancerServiceImpl : public BalancerService {
responses_and_delays_.push_back(std::make_pair(response, send_after_ms));
}
- void Shutdown() {
+ // Returns true on its first invocation, false otherwise.
+ bool Shutdown() {
+ NotifyDoneWithServerlists();
std::unique_lock<std::mutex> lock(mu_);
+ const bool prev = !shutdown_;
shutdown_ = true;
+ gpr_log(GPR_INFO, "LB: shut down");
+ return prev;
}
static LoadBalanceResponse BuildResponseForBackends(
@@ -264,26 +284,35 @@ class BalancerServiceImpl : public BalancerService {
const ClientStats& WaitForLoadReport() {
std::unique_lock<std::mutex> lock(mu_);
- cond_.wait(lock);
+ load_report_cond_.wait(lock);
return client_stats_;
}
+ void NotifyDoneWithServerlists() {
+ std::lock_guard<std::mutex> lock(mu_);
+ serverlist_cond_.notify_one();
+ }
+
private:
void SendResponse(Stream* stream, const LoadBalanceResponse& response,
int delay_ms) {
gpr_log(GPR_INFO, "LB: sleeping for %d ms...", delay_ms);
- gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_millis(delay_ms, GPR_TIMESPAN)));
+ if (delay_ms > 0) {
+ gpr_sleep_until(
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(delay_ms, GPR_TIMESPAN)));
+ }
gpr_log(GPR_INFO, "LB: Woke up! Sending response '%s'",
response.DebugString().c_str());
- stream->Write(response);
IncreaseResponseCount();
+ stream->Write(response);
}
const int client_load_reporting_interval_seconds_;
std::vector<ResponseDelayPair> responses_and_delays_;
std::mutex mu_;
- std::condition_variable cond_;
+ std::condition_variable load_report_cond_;
+ std::condition_variable serverlist_cond_;
ClientStats client_stats_;
bool shutdown_;
};
@@ -326,8 +355,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
backend_servers_[i].Shutdown();
}
for (size_t i = 0; i < balancers_.size(); ++i) {
- balancers_[i]->Shutdown();
- balancer_servers_[i].Shutdown();
+ if (balancers_[i]->Shutdown()) balancer_servers_[i].Shutdown();
}
grpc_fake_resolver_response_generator_unref(response_generator_);
}
@@ -336,8 +364,10 @@ class GrpclbEnd2endTest : public ::testing::Test {
ChannelArguments args;
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator_);
- channel_ = CreateCustomChannel("test:///not_used",
- InsecureChannelCredentials(), args);
+ std::ostringstream uri;
+ uri << "fake:///servername_not_used";
+ channel_ =
+ CreateCustomChannel(uri.str(), InsecureChannelCredentials(), args);
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
}
@@ -497,6 +527,7 @@ TEST_F(SingleBalancerTest, Vanilla) {
EXPECT_EQ(kNumRpcsPerAddress,
backend_servers_[i].service_->request_count());
}
+ balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
// and sent a single response.
@@ -541,7 +572,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
<< " message=" << status.error_message();
EXPECT_EQ(response.message(), kMessage_);
}
-
+ balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
// and sent two responses.
@@ -608,13 +639,272 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) {
<< " message=" << status.error_message();
EXPECT_EQ(response.message(), kMessage_);
}
-
+ balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
// Check LB policy name for the channel.
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
+class UpdatesTest : public GrpclbEnd2endTest {
+ public:
+ UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {}
+};
+
+TEST_F(UpdatesTest, UpdateBalancers) {
+ const std::vector<int> first_backend{GetBackendPorts()[0]};
+ const std::vector<int> second_backend{GetBackendPorts()[1]};
+
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+ ScheduleResponseForBalancer(
+ 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
+ 0);
+
+ // Start servers and send 10 RPCs per server.
+ gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
+ auto statuses_and_responses = SendRpc(kMessage_, 10);
+ gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
+
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ // All 10 requests should have gone to the first backend.
+ EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
+
+ balancers_[0]->NotifyDoneWithServerlists();
+ balancers_[1]->NotifyDoneWithServerlists();
+ balancers_[2]->NotifyDoneWithServerlists();
+ // Balancer 0 got a single request.
+ EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+ // and sent a single response.
+ EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+ EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
+ EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
+
+ std::vector<AddressData> addresses;
+ addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
+ gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
+ SetNextResolution(addresses);
+ gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
+
+ // Wait until update has been processed, as signaled by the second backend
+ // receiving a request.
+ EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+ do {
+ auto statuses_and_responses = SendRpc(kMessage_, 1);
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ } while (backend_servers_[1].service_->request_count() == 0);
+
+ backend_servers_[1].service_->ResetCounters();
+ gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
+ statuses_and_responses = SendRpc(kMessage_, 10);
+ gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ // All 10 requests should have gone to the second backend.
+ EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
+
+ balancers_[0]->NotifyDoneWithServerlists();
+ balancers_[1]->NotifyDoneWithServerlists();
+ balancers_[2]->NotifyDoneWithServerlists();
+ EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+ EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+ EXPECT_EQ(1U, balancer_servers_[1].service_->request_count());
+ EXPECT_EQ(1U, balancer_servers_[1].service_->response_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
+ // Check LB policy name for the channel.
+ EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
+}
+
+// Send an update with the same set of LBs as the one in SetUp() in order to
+// verify that the LB channel inside grpclb keeps the initial connection (which
+// by definition is also present in the update).
+TEST_F(UpdatesTest, UpdateBalancersRepeated) {
+ const std::vector<int> first_backend{GetBackendPorts()[0]};
+ const std::vector<int> second_backend{GetBackendPorts()[0]};
+
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+ ScheduleResponseForBalancer(
+ 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
+ 0);
+
+ // Start servers and send 10 RPCs per server.
+ gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
+ auto statuses_and_responses = SendRpc(kMessage_, 10);
+ gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
+
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ // All 10 requests should have gone to the first backend.
+ EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
+
+ balancers_[0]->NotifyDoneWithServerlists();
+ // Balancer 0 got a single request.
+ EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+ // and sent a single response.
+ EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+ EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
+ EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
+
+ std::vector<AddressData> addresses;
+ addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
+ addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
+ addresses.emplace_back(AddressData{balancer_servers_[2].port_, true, ""});
+ gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
+ SetNextResolution(addresses);
+ gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
+
+ EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+ gpr_timespec deadline = gpr_time_add(
+ gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN));
+ // Send 10 seconds worth of RPCs
+ do {
+ statuses_and_responses = SendRpc(kMessage_, 1);
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
+ // grpclb continued using the original LB call to the first balancer, which
+ // doesn't assign the second backend.
+ EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+ balancers_[0]->NotifyDoneWithServerlists();
+
+ addresses.clear();
+ addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
+ addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
+ gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 2 ==========");
+ SetNextResolution(addresses);
+ gpr_log(GPR_INFO, "========= UPDATE 2 DONE ==========");
+
+ EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+ deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(10000, GPR_TIMESPAN));
+ // Send 10 seconds worth of RPCs
+ do {
+ statuses_and_responses = SendRpc(kMessage_, 1);
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
+ // grpclb continued using the original LB call to the first balancer, which
+ // doesn't assign the second backend.
+ EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+ balancers_[0]->NotifyDoneWithServerlists();
+
+ // Check LB policy name for the channel.
+ EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
+}
+
+TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
+ const std::vector<int> first_backend{GetBackendPorts()[0]};
+ const std::vector<int> second_backend{GetBackendPorts()[1]};
+
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+ ScheduleResponseForBalancer(
+ 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
+ 0);
+
+ // Start servers and send 10 RPCs per server.
+ gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
+ auto statuses_and_responses = SendRpc(kMessage_, 10);
+ gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ // All 10 requests should have gone to the first backend.
+ EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
+
+ // Kill balancer 0
+ gpr_log(GPR_INFO, "********** ABOUT TO KILL BALANCER 0 *************");
+ if (balancers_[0]->Shutdown()) balancer_servers_[0].Shutdown();
+ gpr_log(GPR_INFO, "********** KILLED BALANCER 0 *************");
+
+ // This is serviced by the existing RR policy
+ gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
+ statuses_and_responses = SendRpc(kMessage_, 10);
+ gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ // All 10 requests should again have gone to the first backend.
+ EXPECT_EQ(20U, backend_servers_[0].service_->request_count());
+ EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+
+ balancers_[0]->NotifyDoneWithServerlists();
+ balancers_[1]->NotifyDoneWithServerlists();
+ balancers_[2]->NotifyDoneWithServerlists();
+ // Balancer 0 got a single request.
+ EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+ // and sent a single response.
+ EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+ EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
+ EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
+
+ std::vector<AddressData> addresses;
+ addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
+ gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
+ SetNextResolution(addresses);
+ gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
+
+ // Wait until update has been processed, as signaled by the second backend
+ // receiving a request. In the meantime, the client continues to be serviced
+ // (by the first backend) without interruption.
+ EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+ do {
+ auto statuses_and_responses = SendRpc(kMessage_, 1);
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ } while (backend_servers_[1].service_->request_count() == 0);
+
+ // This is serviced by the existing RR policy
+ backend_servers_[1].service_->ResetCounters();
+ gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH ==========");
+ statuses_and_responses = SendRpc(kMessage_, 10);
+ gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH ==========");
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+ // All 10 requests should have gone to the second backend.
+ EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
+
+ balancers_[0]->NotifyDoneWithServerlists();
+ balancers_[1]->NotifyDoneWithServerlists();
+ balancers_[2]->NotifyDoneWithServerlists();
+ EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+ EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+ EXPECT_EQ(1U, balancer_servers_[1].service_->request_count());
+ EXPECT_EQ(1U, balancer_servers_[1].service_->response_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
+ EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
+ // Check LB policy name for the channel.
+ EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
+}
+
TEST_F(SingleBalancerTest, Drop) {
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
@@ -677,6 +967,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
EXPECT_EQ(kNumRpcsPerAddress,
backend_servers_[i].service_->request_count());
}
+ balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
// and sent a single response.
@@ -722,6 +1013,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
EXPECT_EQ(kNumRpcsPerAddress,
backend_servers_[i].service_->request_count());
}
+ balancers_[0]->NotifyDoneWithServerlists();
// The balancer got a single request.
EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
// and sent a single response.
@@ -748,7 +1040,6 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
int main(int argc, char** argv) {
grpc_init();
grpc_test_init(argc, argv);
- grpc_fake_resolver_init();
::testing::InitGoogleTest(&argc, argv);
const auto result = RUN_ALL_TESTS();
grpc_shutdown();
diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc
index a002c7f77d..ee5dfa7133 100644
--- a/test/cpp/grpclb/grpclb_test.cc
+++ b/test/cpp/grpclb/grpclb_test.cc
@@ -52,6 +52,7 @@
#include <grpc++/impl/codegen/config.h>
extern "C" {
#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/sockaddr.h"
@@ -61,7 +62,6 @@ extern "C" {
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/server.h"
#include "test/core/end2end/cq_verifier.h"
-#include "test/core/end2end/fake_resolver.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
}
@@ -591,7 +591,7 @@ static void setup_client(const server_fixture *lb_server,
grpc_uri_destroy(lb_uri);
gpr_free(lb_uri_str);
- gpr_asprintf(&cf->server_uri, "test:///%s", lb_server->servers_hostport);
+ gpr_asprintf(&cf->server_uri, "fake:///%s", lb_server->servers_hostport);
const grpc_arg fake_addresses =
grpc_lb_addresses_create_channel_arg(addresses);
grpc_channel_args *fake_result =
@@ -804,7 +804,6 @@ TEST(GrpclbTest, InvalidAddressInServerlist) {}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
- grpc_fake_resolver_init();
grpc_test_init(argc, argv);
grpc_init();
const auto result = RUN_ALL_TESTS();
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 15410dec01..98ed72a6ad 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -950,6 +950,8 @@ src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c \
src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h \
src/core/ext/filters/client_channel/resolver/dns/native/README.md \
src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c \
+src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c \
+src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h \
src/core/ext/filters/client_channel/resolver/sockaddr/README.md \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c \
src/core/ext/filters/client_channel/resolver_factory.c \
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 6c4462e9f2..8835118e3a 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -2913,6 +2913,25 @@
{
"deps": [
"gpr",
+ "gpr_test_util",
+ "grpc",
+ "grpc++",
+ "grpc++_test_util",
+ "grpc_test_util"
+ ],
+ "headers": [],
+ "is_filegroup": false,
+ "language": "c++",
+ "name": "client_lb_end2end_test",
+ "src": [
+ "test/cpp/end2end/client_lb_end2end_test.cc"
+ ],
+ "third_party": false,
+ "type": "target"
+ },
+ {
+ "deps": [
+ "gpr",
"grpc",
"grpc++",
"grpc++_codegen_base"
@@ -3782,25 +3801,6 @@
"headers": [],
"is_filegroup": false,
"language": "c++",
- "name": "round_robin_end2end_test",
- "src": [
- "test/cpp/end2end/round_robin_end2end_test.cc"
- ],
- "third_party": false,
- "type": "target"
- },
- {
- "deps": [
- "gpr",
- "gpr_test_util",
- "grpc",
- "grpc++",
- "grpc++_test_util",
- "grpc_test_util"
- ],
- "headers": [],
- "is_filegroup": false,
- "language": "c++",
"name": "secure_auth_context_test",
"src": [
"test/cpp/common/secure_auth_context_test.cc"
@@ -5832,6 +5832,7 @@
"grpc_message_size_filter",
"grpc_resolver_dns_ares",
"grpc_resolver_dns_native",
+ "grpc_resolver_fake",
"grpc_resolver_sockaddr",
"grpc_secure",
"grpc_server_backward_compatibility",
@@ -5939,6 +5940,7 @@
"grpc_message_size_filter",
"grpc_resolver_dns_ares",
"grpc_resolver_dns_native",
+ "grpc_resolver_fake",
"grpc_resolver_sockaddr",
"grpc_server_backward_compatibility",
"grpc_transport_chttp2_client_insecure",
@@ -8349,6 +8351,7 @@
"gpr",
"grpc_base",
"grpc_client_channel",
+ "grpc_resolver_fake",
"nanopb"
],
"headers": [
@@ -8384,6 +8387,7 @@
"gpr",
"grpc_base",
"grpc_client_channel",
+ "grpc_resolver_fake",
"grpc_secure",
"nanopb"
],
@@ -8549,6 +8553,25 @@
"grpc_base",
"grpc_client_channel"
],
+ "headers": [
+ "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+ ],
+ "is_filegroup": true,
+ "language": "c",
+ "name": "grpc_resolver_fake",
+ "src": [
+ "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c",
+ "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+ ],
+ "third_party": false,
+ "type": "filegroup"
+ },
+ {
+ "deps": [
+ "gpr",
+ "grpc_base",
+ "grpc_client_channel"
+ ],
"headers": [],
"is_filegroup": true,
"language": "c",
@@ -8664,8 +8687,8 @@
"grpc"
],
"headers": [
+ "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h",
"test/core/end2end/cq_verifier.h",
- "test/core/end2end/fake_resolver.h",
"test/core/end2end/fixtures/http_proxy_fixture.h",
"test/core/end2end/fixtures/proxy.h",
"test/core/iomgr/endpoint_tests.h",
@@ -8684,10 +8707,10 @@
"language": "c",
"name": "grpc_test_util_base",
"src": [
+ "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c",
+ "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h",
"test/core/end2end/cq_verifier.c",
"test/core/end2end/cq_verifier.h",
- "test/core/end2end/fake_resolver.c",
- "test/core/end2end/fake_resolver.h",
"test/core/end2end/fixtures/http_proxy_fixture.c",
"test/core/end2end/fixtures/http_proxy_fixture.h",
"test/core/end2end/fixtures/proxy.c",
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 1242025fc2..337cce829e 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -3142,6 +3142,32 @@
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
+ "excluded_poll_engines": [
+ "poll",
+ "poll-cv"
+ ],
+ "flaky": false,
+ "gtest": true,
+ "language": "c++",
+ "name": "client_lb_end2end_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ]
+ },
+ {
+ "args": [],
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
@@ -3452,6 +3478,10 @@
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
+ "excluded_poll_engines": [
+ "poll",
+ "poll-cv"
+ ],
"flaky": false,
"gtest": true,
"language": "c++",
@@ -3474,6 +3504,10 @@
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
+ "excluded_poll_engines": [
+ "poll",
+ "poll-cv"
+ ],
"flaky": false,
"gtest": false,
"language": "c++",
@@ -3693,28 +3727,6 @@
"flaky": false,
"gtest": true,
"language": "c++",
- "name": "round_robin_end2end_test",
- "platforms": [
- "linux",
- "mac",
- "posix",
- "windows"
- ]
- },
- {
- "args": [],
- "ci_platforms": [
- "linux",
- "mac",
- "posix",
- "windows"
- ],
- "cpu_cost": 1.0,
- "exclude_configs": [],
- "exclude_iomgrs": [],
- "flaky": false,
- "gtest": true,
- "language": "c++",
"name": "secure_auth_context_test",
"platforms": [
"linux",
diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj
index 1303366574..d4bdaa233e 100644
--- a/vsprojects/vcxproj/grpc/grpc.vcxproj
+++ b/vsprojects/vcxproj/grpc/grpc.vcxproj
@@ -493,6 +493,7 @@
<ClInclude Include="$(SolutionDir)\..\third_party\nanopb\pb_common.h" />
<ClInclude Include="$(SolutionDir)\..\third_party\nanopb\pb_decode.h" />
<ClInclude Include="$(SolutionDir)\..\third_party\nanopb\pb_encode.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\dns\c_ares\grpc_ares_ev_driver.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\dns\c_ares\grpc_ares_wrapper.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting.h" />
@@ -961,6 +962,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\third_party\nanopb\pb_encode.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\pick_first\pick_first.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\round_robin\round_robin.c">
diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
index 9f25a1c179..571837c3fd 100644
--- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
@@ -661,6 +661,9 @@
<ClCompile Include="$(SolutionDir)\..\third_party\nanopb\pb_encode.c">
<Filter>third_party\nanopb</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.c">
+ <Filter>src\core\ext\filters\client_channel\resolver\fake</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\pick_first\pick_first.c">
<Filter>src\core\ext\filters\client_channel\lb_policy\pick_first</Filter>
</ClCompile>
@@ -1424,6 +1427,9 @@
<ClInclude Include="$(SolutionDir)\..\third_party\nanopb\pb_encode.h">
<Filter>third_party\nanopb</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.h">
+ <Filter>src\core\ext\filters\client_channel\resolver\fake</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\dns\c_ares\grpc_ares_ev_driver.h">
<Filter>src\core\ext\filters\client_channel\resolver\dns\c_ares</Filter>
</ClInclude>
@@ -1577,6 +1583,9 @@
<Filter Include="src\core\ext\filters\client_channel\resolver\dns\native">
<UniqueIdentifier>{9b2d7e1f-b78a-2e7a-3000-944e46a5fab9}</UniqueIdentifier>
</Filter>
+ <Filter Include="src\core\ext\filters\client_channel\resolver\fake">
+ <UniqueIdentifier>{e75d1482-9a43-5fdf-03a5-e2b2833715fb}</UniqueIdentifier>
+ </Filter>
<Filter Include="src\core\ext\filters\client_channel\resolver\sockaddr">
<UniqueIdentifier>{bd317dd5-323e-5b27-4c05-d85786be36ab}</UniqueIdentifier>
</Filter>
diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
index 061fca6d28..a0890196bd 100644
--- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
+++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
@@ -181,8 +181,8 @@
<ItemGroup>
<ClInclude Include="$(SolutionDir)\..\test\core\end2end\data\ssl_test_data.h" />
<ClInclude Include="$(SolutionDir)\..\test\core\security\oauth2_utils.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.h" />
<ClInclude Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.h" />
- <ClInclude Include="$(SolutionDir)\..\test\core\end2end\fake_resolver.h" />
<ClInclude Include="$(SolutionDir)\..\test\core\end2end\fixtures\http_proxy_fixture.h" />
<ClInclude Include="$(SolutionDir)\..\test\core\end2end\fixtures\proxy.h" />
<ClInclude Include="$(SolutionDir)\..\test\core\iomgr\endpoint_tests.h" />
@@ -321,9 +321,9 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\security\oauth2_utils.c">
</ClCompile>
- <ClCompile Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.c">
+ <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.c">
</ClCompile>
- <ClCompile Include="$(SolutionDir)\..\test\core\end2end\fake_resolver.c">
+ <ClCompile Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\fixtures\http_proxy_fixture.c">
</ClCompile>
diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
index 35d0ea0cfd..d92688a9a3 100644
--- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
@@ -16,10 +16,10 @@
<ClCompile Include="$(SolutionDir)\..\test\core\security\oauth2_utils.c">
<Filter>test\core\security</Filter>
</ClCompile>
- <ClCompile Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.c">
- <Filter>test\core\end2end</Filter>
+ <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.c">
+ <Filter>src\core\ext\filters\client_channel\resolver\fake</Filter>
</ClCompile>
- <ClCompile Include="$(SolutionDir)\..\test\core\end2end\fake_resolver.c">
+ <ClCompile Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.c">
<Filter>test\core\end2end</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\fixtures\http_proxy_fixture.c">
@@ -542,10 +542,10 @@
<ClInclude Include="$(SolutionDir)\..\test\core\security\oauth2_utils.h">
<Filter>test\core\security</Filter>
</ClInclude>
- <ClInclude Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.h">
- <Filter>test\core\end2end</Filter>
+ <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.h">
+ <Filter>src\core\ext\filters\client_channel\resolver\fake</Filter>
</ClInclude>
- <ClInclude Include="$(SolutionDir)\..\test\core\end2end\fake_resolver.h">
+ <ClInclude Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.h">
<Filter>test\core\end2end</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\test\core\end2end\fixtures\http_proxy_fixture.h">
@@ -950,6 +950,21 @@
<Filter Include="src\core">
<UniqueIdentifier>{f7bfac91-5eb2-dea7-4601-6c63edbbf997}</UniqueIdentifier>
</Filter>
+ <Filter Include="src\core\ext">
+ <UniqueIdentifier>{5db70e06-741d-708c-bf0a-b59f8ca1f8bd}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="src\core\ext\filters">
+ <UniqueIdentifier>{f0f88514-c2d8-c4c9-c3bd-591682207751}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="src\core\ext\filters\client_channel">
+ <UniqueIdentifier>{5bb60a9e-156f-e1c8-3b9c-1b23e7992d7a}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="src\core\ext\filters\client_channel\resolver">
+ <UniqueIdentifier>{24a50975-435e-20a5-b0f2-71bc330d0378}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="src\core\ext\filters\client_channel\resolver\fake">
+ <UniqueIdentifier>{9e94ffec-fe00-d132-db50-c4a3c218f102}</UniqueIdentifier>
+ </Filter>
<Filter Include="src\core\lib">
<UniqueIdentifier>{f4e8c61e-1ca6-0fdd-7b5e-b7f9a30c9a21}</UniqueIdentifier>
</Filter>
diff --git a/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj b/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj
index af13acef45..d9977f7c67 100644
--- a/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj
+++ b/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj
@@ -147,8 +147,8 @@
</ItemDefinitionGroup>
<ItemGroup>
+ <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.h" />
<ClInclude Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.h" />
- <ClInclude Include="$(SolutionDir)\..\test\core\end2end\fake_resolver.h" />
<ClInclude Include="$(SolutionDir)\..\test\core\end2end\fixtures\http_proxy_fixture.h" />
<ClInclude Include="$(SolutionDir)\..\test\core\end2end\fixtures\proxy.h" />
<ClInclude Include="$(SolutionDir)\..\test\core\iomgr\endpoint_tests.h" />
@@ -164,9 +164,9 @@
<ClInclude Include="$(SolutionDir)\..\test\core\util\trickle_endpoint.h" />
</ItemGroup>
<ItemGroup>
- <ClCompile Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.c">
+ <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.c">
</ClCompile>
- <ClCompile Include="$(SolutionDir)\..\test\core\end2end\fake_resolver.c">
+ <ClCompile Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\fixtures\http_proxy_fixture.c">
</ClCompile>
diff --git a/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj.filters
index 4da043ea90..0a0590e9be 100644
--- a/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj.filters
@@ -1,10 +1,10 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup>
- <ClCompile Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.c">
- <Filter>test\core\end2end</Filter>
+ <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.c">
+ <Filter>src\core\ext\filters\client_channel\resolver\fake</Filter>
</ClCompile>
- <ClCompile Include="$(SolutionDir)\..\test\core\end2end\fake_resolver.c">
+ <ClCompile Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.c">
<Filter>test\core\end2end</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\fixtures\http_proxy_fixture.c">
@@ -48,10 +48,10 @@
</ClCompile>
</ItemGroup>
<ItemGroup>
- <ClInclude Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.h">
- <Filter>test\core\end2end</Filter>
+ <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.h">
+ <Filter>src\core\ext\filters\client_channel\resolver\fake</Filter>
</ClInclude>
- <ClInclude Include="$(SolutionDir)\..\test\core\end2end\fake_resolver.h">
+ <ClInclude Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.h">
<Filter>test\core\end2end</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\test\core\end2end\fixtures\http_proxy_fixture.h">
@@ -96,6 +96,27 @@
</ItemGroup>
<ItemGroup>
+ <Filter Include="src">
+ <UniqueIdentifier>{65483377-42fd-137e-3847-00dfd4675db3}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="src\core">
+ <UniqueIdentifier>{51a516dc-93e3-4dd5-d114-2d06f5df4ad7}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="src\core\ext">
+ <UniqueIdentifier>{a927155d-bcf6-0dd8-6d63-be48bcaf617f}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="src\core\ext\filters">
+ <UniqueIdentifier>{df16e935-149b-79bf-ecb3-dc3a6b628082}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="src\core\ext\filters\client_channel">
+ <UniqueIdentifier>{0fb7c1f0-5e3a-d1df-4c9d-96a677a7f3ee}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="src\core\ext\filters\client_channel\resolver">
+ <UniqueIdentifier>{f47477d5-cb4e-e726-04dd-182151e81c71}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="src\core\ext\filters\client_channel\resolver\fake">
+ <UniqueIdentifier>{2d280bd0-f4ee-d1f2-4d70-174147ac0dbc}</UniqueIdentifier>
+ </Filter>
<Filter Include="test">
<UniqueIdentifier>{037c7645-1698-cf2d-4163-525240323101}</UniqueIdentifier>
</Filter>
diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
index ac403a7c48..76770b0420 100644
--- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
+++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
@@ -450,6 +450,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\deadline\deadline_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\dns\c_ares\grpc_ares_ev_driver.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\dns\c_ares\grpc_ares_wrapper.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\client_load_reporting_filter.h" />
@@ -860,6 +861,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\sockaddr\sockaddr_resolver.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting_filter.c">
diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
index 9fee2ec22b..92d9e37b3a 100644
--- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
@@ -562,6 +562,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\sockaddr\sockaddr_resolver.c">
<Filter>src\core\ext\filters\client_channel\resolver\sockaddr</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.c">
+ <Filter>src\core\ext\filters\client_channel\resolver\fake</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting.c">
<Filter>src\core\ext\filters\load_reporting</Filter>
</ClCompile>
@@ -1235,6 +1238,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\dns\c_ares\grpc_ares_wrapper.h">
<Filter>src\core\ext\filters\client_channel\resolver\dns\c_ares</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.h">
+ <Filter>src\core\ext\filters\client_channel\resolver\fake</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting.h">
<Filter>src\core\ext\filters\load_reporting</Filter>
</ClInclude>
@@ -1412,6 +1418,9 @@
<Filter Include="src\core\ext\filters\client_channel\resolver\dns\native">
<UniqueIdentifier>{55f499bd-ae18-5210-81e1-385c85e60875}</UniqueIdentifier>
</Filter>
+ <Filter Include="src\core\ext\filters\client_channel\resolver\fake">
+ <UniqueIdentifier>{7f924133-4a98-87b0-f158-cb64ea91e71a}</UniqueIdentifier>
+ </Filter>
<Filter Include="src\core\ext\filters\client_channel\resolver\sockaddr">
<UniqueIdentifier>{99210f5e-b2a0-ecd1-024f-fc152db68a11}</UniqueIdentifier>
</Filter>
diff --git a/vsprojects/vcxproj/test/round_robin_end2end_test/round_robin_end2end_test.vcxproj b/vsprojects/vcxproj/test/client_lb_end2end_test/client_lb_end2end_test.vcxproj
index 55e16f188d..ff02417561 100644
--- a/vsprojects/vcxproj/test/round_robin_end2end_test/round_robin_end2end_test.vcxproj
+++ b/vsprojects/vcxproj/test/client_lb_end2end_test/client_lb_end2end_test.vcxproj
@@ -20,7 +20,7 @@
</ProjectConfiguration>
</ItemGroup>
<PropertyGroup Label="Globals">
- <ProjectGuid>{54B15DF6-42BA-5347-C9B8-2D7F1F2921C6}</ProjectGuid>
+ <ProjectGuid>{903C0B70-3C11-181F-F532-EA3F352BDABB}</ProjectGuid>
<IgnoreWarnIntDirInTempDetected>true</IgnoreWarnIntDirInTempDetected>
<IntDir>$(SolutionDir)IntDir\$(MSBuildProjectName)\</IntDir>
</PropertyGroup>
@@ -62,14 +62,14 @@
</ImportGroup>
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)'=='Debug'">
- <TargetName>round_robin_end2end_test</TargetName>
+ <TargetName>client_lb_end2end_test</TargetName>
<Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
<Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib>
<Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
<Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)'=='Release'">
- <TargetName>round_robin_end2end_test</TargetName>
+ <TargetName>client_lb_end2end_test</TargetName>
<Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
<Configuration-grpc_dependencies_zlib>Release</Configuration-grpc_dependencies_zlib>
<Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
@@ -160,7 +160,7 @@
</ItemDefinitionGroup>
<ItemGroup>
- <ClCompile Include="$(SolutionDir)\..\test\cpp\end2end\round_robin_end2end_test.cc">
+ <ClCompile Include="$(SolutionDir)\..\test\cpp\end2end\client_lb_end2end_test.cc">
</ClCompile>
</ItemGroup>
<ItemGroup>
diff --git a/vsprojects/vcxproj/test/round_robin_end2end_test/round_robin_end2end_test.vcxproj.filters b/vsprojects/vcxproj/test/client_lb_end2end_test/client_lb_end2end_test.vcxproj.filters
index 95a149953f..19c421a754 100644
--- a/vsprojects/vcxproj/test/round_robin_end2end_test/round_robin_end2end_test.vcxproj.filters
+++ b/vsprojects/vcxproj/test/client_lb_end2end_test/client_lb_end2end_test.vcxproj.filters
@@ -1,20 +1,20 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup>
- <ClCompile Include="$(SolutionDir)\..\test\cpp\end2end\round_robin_end2end_test.cc">
+ <ClCompile Include="$(SolutionDir)\..\test\cpp\end2end\client_lb_end2end_test.cc">
<Filter>test\cpp\end2end</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<Filter Include="test">
- <UniqueIdentifier>{e151f47d-6563-5ef9-fae6-70708f9f8ee6}</UniqueIdentifier>
+ <UniqueIdentifier>{5e3b0af8-1e9d-7d57-78f8-b695d18d56d2}</UniqueIdentifier>
</Filter>
<Filter Include="test\cpp">
- <UniqueIdentifier>{07958594-fd93-28f7-9388-c67c952701b8}</UniqueIdentifier>
+ <UniqueIdentifier>{b82591f8-69bc-ad7c-242a-63ef38a19c51}</UniqueIdentifier>
</Filter>
<Filter Include="test\cpp\end2end">
- <UniqueIdentifier>{7596a0dd-caa4-b365-a59f-f7ffef38b10d}</UniqueIdentifier>
+ <UniqueIdentifier>{33186a45-2f2e-0c6b-9b14-c1f096a81ac9}</UniqueIdentifier>
</Filter>
</ItemGroup>
</Project>