diff options
62 files changed, 2575 insertions, 854 deletions
@@ -921,6 +921,7 @@ grpc_cc_library( deps = [ "grpc_base", "grpc_client_channel", + "grpc_resolver_fake", ], ) @@ -950,6 +951,7 @@ grpc_cc_library( "grpc_base", "grpc_client_channel", "grpc_secure", + "grpc_resolver_fake", ], ) @@ -1039,6 +1041,18 @@ grpc_cc_library( ) grpc_cc_library( + name = "grpc_resolver_fake", + srcs = ["src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c"], + hdrs = ["src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"], + visibility = ["//test:__subpackages__"], + language = "c", + deps = [ + "grpc_base", + "grpc_client_channel", + ], +) + +grpc_cc_library( name = "grpc_secure", srcs = [ "src/core/lib/http/httpcli_security_connector.c", diff --git a/CMakeLists.txt b/CMakeLists.txt index 11c6b04788..faf8a683d0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -666,6 +666,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx client_crash_test) endif() add_dependencies(buildtests_cxx client_crash_test_server) +add_dependencies(buildtests_cxx client_lb_end2end_test) add_dependencies(buildtests_cxx codegen_test_full) add_dependencies(buildtests_cxx codegen_test_minimal) add_dependencies(buildtests_cxx credentials_test) @@ -716,7 +717,6 @@ endif() add_dependencies(buildtests_cxx qps_worker) add_dependencies(buildtests_cxx reconnect_interop_client) add_dependencies(buildtests_cxx reconnect_interop_server) -add_dependencies(buildtests_cxx round_robin_end2end_test) add_dependencies(buildtests_cxx secure_auth_context_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx secure_sync_unary_ping_pong_test) @@ -1142,6 +1142,7 @@ add_library(grpc third_party/nanopb/pb_common.c third_party/nanopb/pb_decode.c third_party/nanopb/pb_encode.c + src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c @@ -1566,8 +1567,8 @@ add_library(grpc_test_util test/core/end2end/data/server1_key.c test/core/end2end/data/test_root_cert.c test/core/security/oauth2_utils.c + src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c test/core/end2end/cq_verifier.c - test/core/end2end/fake_resolver.c test/core/end2end/fixtures/http_proxy_fixture.c test/core/end2end/fixtures/proxy.c test/core/iomgr/endpoint_tests.c @@ -1787,8 +1788,8 @@ endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) add_library(grpc_test_util_unsecure + src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c test/core/end2end/cq_verifier.c - test/core/end2end/fake_resolver.c test/core/end2end/fixtures/http_proxy_fixture.c test/core/end2end/fixtures/proxy.c test/core/iomgr/endpoint_tests.c @@ -2030,6 +2031,7 @@ add_library(grpc_unsecure src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c + src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c src/core/ext/filters/load_reporting/load_reporting.c src/core/ext/filters/load_reporting/load_reporting_filter.c src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c @@ -9666,6 +9668,48 @@ target_link_libraries(client_crash_test_server endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) +add_executable(client_lb_end2end_test + test/cpp/end2end/client_lb_end2end_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + + +target_include_directories(client_lb_end2end_test + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${BORINGSSL_ROOT_DIR}/include + PRIVATE ${PROTOBUF_ROOT_DIR}/src + PRIVATE ${BENCHMARK_ROOT_DIR}/include + PRIVATE ${ZLIB_ROOT_DIR} + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib + PRIVATE ${CARES_BUILD_INCLUDE_DIR} + PRIVATE ${CARES_INCLUDE_DIR} + PRIVATE ${CARES_PLATFORM_INCLUDE_DIR} + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include + PRIVATE third_party/googletest/googletest/include + PRIVATE third_party/googletest/googletest + PRIVATE third_party/googletest/googlemock/include + PRIVATE third_party/googletest/googlemock + PRIVATE ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(client_lb_end2end_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc++_test_util + grpc_test_util + grpc++ + grpc + gpr_test_util + gpr + ${_gRPC_GFLAGS_LIBRARIES} +) + +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) + add_executable(codegen_test_full ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/control.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/control.grpc.pb.cc @@ -11608,48 +11652,6 @@ target_link_libraries(reconnect_interop_server endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) -add_executable(round_robin_end2end_test - test/cpp/end2end/round_robin_end2end_test.cc - third_party/googletest/googletest/src/gtest-all.cc - third_party/googletest/googlemock/src/gmock-all.cc -) - - -target_include_directories(round_robin_end2end_test - PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} - PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include - PRIVATE ${BORINGSSL_ROOT_DIR}/include - PRIVATE ${PROTOBUF_ROOT_DIR}/src - PRIVATE ${BENCHMARK_ROOT_DIR}/include - PRIVATE ${ZLIB_ROOT_DIR} - PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib - PRIVATE ${CARES_BUILD_INCLUDE_DIR} - PRIVATE ${CARES_INCLUDE_DIR} - PRIVATE ${CARES_PLATFORM_INCLUDE_DIR} - PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares - PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include - PRIVATE third_party/googletest/googletest/include - PRIVATE third_party/googletest/googletest - PRIVATE third_party/googletest/googlemock/include - PRIVATE third_party/googletest/googlemock - PRIVATE ${_gRPC_PROTO_GENS_DIR} -) - -target_link_libraries(round_robin_end2end_test - ${_gRPC_PROTOBUF_LIBRARIES} - ${_gRPC_ALLTARGETS_LIBRARIES} - grpc++_test_util - grpc_test_util - grpc++ - grpc - gpr_test_util - gpr - ${_gRPC_GFLAGS_LIBRARIES} -) - -endif (gRPC_BUILD_TESTS) -if (gRPC_BUILD_TESTS) - add_executable(secure_auth_context_test test/cpp/common/secure_auth_context_test.cc third_party/googletest/googletest/src/gtest-all.cc @@ -1127,6 +1127,7 @@ channel_filter_test: $(BINDIR)/$(CONFIG)/channel_filter_test cli_call_test: $(BINDIR)/$(CONFIG)/cli_call_test client_crash_test: $(BINDIR)/$(CONFIG)/client_crash_test client_crash_test_server: $(BINDIR)/$(CONFIG)/client_crash_test_server +client_lb_end2end_test: $(BINDIR)/$(CONFIG)/client_lb_end2end_test codegen_test_full: $(BINDIR)/$(CONFIG)/codegen_test_full codegen_test_minimal: $(BINDIR)/$(CONFIG)/codegen_test_minimal credentials_test: $(BINDIR)/$(CONFIG)/credentials_test @@ -1170,7 +1171,6 @@ qps_openloop_test: $(BINDIR)/$(CONFIG)/qps_openloop_test qps_worker: $(BINDIR)/$(CONFIG)/qps_worker reconnect_interop_client: $(BINDIR)/$(CONFIG)/reconnect_interop_client reconnect_interop_server: $(BINDIR)/$(CONFIG)/reconnect_interop_server -round_robin_end2end_test: $(BINDIR)/$(CONFIG)/round_robin_end2end_test secure_auth_context_test: $(BINDIR)/$(CONFIG)/secure_auth_context_test secure_sync_unary_ping_pong_test: $(BINDIR)/$(CONFIG)/secure_sync_unary_ping_pong_test server_builder_plugin_test: $(BINDIR)/$(CONFIG)/server_builder_plugin_test @@ -1565,6 +1565,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/cli_call_test \ $(BINDIR)/$(CONFIG)/client_crash_test \ $(BINDIR)/$(CONFIG)/client_crash_test_server \ + $(BINDIR)/$(CONFIG)/client_lb_end2end_test \ $(BINDIR)/$(CONFIG)/codegen_test_full \ $(BINDIR)/$(CONFIG)/codegen_test_minimal \ $(BINDIR)/$(CONFIG)/credentials_test \ @@ -1601,7 +1602,6 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/qps_worker \ $(BINDIR)/$(CONFIG)/reconnect_interop_client \ $(BINDIR)/$(CONFIG)/reconnect_interop_server \ - $(BINDIR)/$(CONFIG)/round_robin_end2end_test \ $(BINDIR)/$(CONFIG)/secure_auth_context_test \ $(BINDIR)/$(CONFIG)/secure_sync_unary_ping_pong_test \ $(BINDIR)/$(CONFIG)/server_builder_plugin_test \ @@ -1687,6 +1687,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/cli_call_test \ $(BINDIR)/$(CONFIG)/client_crash_test \ $(BINDIR)/$(CONFIG)/client_crash_test_server \ + $(BINDIR)/$(CONFIG)/client_lb_end2end_test \ $(BINDIR)/$(CONFIG)/codegen_test_full \ $(BINDIR)/$(CONFIG)/codegen_test_minimal \ $(BINDIR)/$(CONFIG)/credentials_test \ @@ -1723,7 +1724,6 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/qps_worker \ $(BINDIR)/$(CONFIG)/reconnect_interop_client \ $(BINDIR)/$(CONFIG)/reconnect_interop_server \ - $(BINDIR)/$(CONFIG)/round_robin_end2end_test \ $(BINDIR)/$(CONFIG)/secure_auth_context_test \ $(BINDIR)/$(CONFIG)/secure_sync_unary_ping_pong_test \ $(BINDIR)/$(CONFIG)/server_builder_plugin_test \ @@ -2061,6 +2061,8 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/cli_call_test || ( echo test cli_call_test failed ; exit 1 ) $(E) "[RUN] Testing client_crash_test" $(Q) $(BINDIR)/$(CONFIG)/client_crash_test || ( echo test client_crash_test failed ; exit 1 ) + $(E) "[RUN] Testing client_lb_end2end_test" + $(Q) $(BINDIR)/$(CONFIG)/client_lb_end2end_test || ( echo test client_lb_end2end_test failed ; exit 1 ) $(E) "[RUN] Testing codegen_test_full" $(Q) $(BINDIR)/$(CONFIG)/codegen_test_full || ( echo test codegen_test_full failed ; exit 1 ) $(E) "[RUN] Testing codegen_test_minimal" @@ -2109,8 +2111,6 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/proto_utils_test || ( echo test proto_utils_test failed ; exit 1 ) $(E) "[RUN] Testing qps_openloop_test" $(Q) $(BINDIR)/$(CONFIG)/qps_openloop_test || ( echo test qps_openloop_test failed ; exit 1 ) - $(E) "[RUN] Testing round_robin_end2end_test" - $(Q) $(BINDIR)/$(CONFIG)/round_robin_end2end_test || ( echo test round_robin_end2end_test failed ; exit 1 ) $(E) "[RUN] Testing secure_auth_context_test" $(Q) $(BINDIR)/$(CONFIG)/secure_auth_context_test || ( echo test secure_auth_context_test failed ; exit 1 ) $(E) "[RUN] Testing secure_sync_unary_ping_pong_test" @@ -3125,6 +3125,7 @@ LIBGRPC_SRC = \ third_party/nanopb/pb_common.c \ third_party/nanopb/pb_decode.c \ third_party/nanopb/pb_encode.c \ + src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c \ src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c \ src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c \ @@ -3546,8 +3547,8 @@ LIBGRPC_TEST_UTIL_SRC = \ test/core/end2end/data/server1_key.c \ test/core/end2end/data/test_root_cert.c \ test/core/security/oauth2_utils.c \ + src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c \ test/core/end2end/cq_verifier.c \ - test/core/end2end/fake_resolver.c \ test/core/end2end/fixtures/http_proxy_fixture.c \ test/core/end2end/fixtures/proxy.c \ test/core/iomgr/endpoint_tests.c \ @@ -3756,8 +3757,8 @@ endif LIBGRPC_TEST_UTIL_UNSECURE_SRC = \ + src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c \ test/core/end2end/cq_verifier.c \ - test/core/end2end/fake_resolver.c \ test/core/end2end/fixtures/http_proxy_fixture.c \ test/core/end2end/fixtures/proxy.c \ test/core/iomgr/endpoint_tests.c \ @@ -3982,6 +3983,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c \ src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c \ src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c \ + src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c \ src/core/ext/filters/load_reporting/load_reporting.c \ src/core/ext/filters/load_reporting/load_reporting_filter.c \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c \ @@ -14074,6 +14076,49 @@ endif endif +CLIENT_LB_END2END_TEST_SRC = \ + test/cpp/end2end/client_lb_end2end_test.cc \ + +CLIENT_LB_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CLIENT_LB_END2END_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/client_lb_end2end_test: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+. + +$(BINDIR)/$(CONFIG)/client_lb_end2end_test: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/client_lb_end2end_test: $(PROTOBUF_DEP) $(CLIENT_LB_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(CLIENT_LB_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/client_lb_end2end_test + +endif + +endif + +$(OBJDIR)/$(CONFIG)/test/cpp/end2end/client_lb_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_client_lb_end2end_test: $(CLIENT_LB_END2END_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(CLIENT_LB_END2END_TEST_OBJS:.o=.dep) +endif +endif + + CODEGEN_TEST_FULL_SRC = \ $(GENDIR)/src/proto/grpc/testing/control.pb.cc $(GENDIR)/src/proto/grpc/testing/control.grpc.pb.cc \ $(GENDIR)/src/proto/grpc/testing/messages.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.grpc.pb.cc \ @@ -15891,49 +15936,6 @@ endif $(OBJDIR)/$(CONFIG)/test/cpp/interop/reconnect_interop_server.o: $(GENDIR)/src/proto/grpc/testing/empty.pb.cc $(GENDIR)/src/proto/grpc/testing/empty.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/test.pb.cc $(GENDIR)/src/proto/grpc/testing/test.grpc.pb.cc -ROUND_ROBIN_END2END_TEST_SRC = \ - test/cpp/end2end/round_robin_end2end_test.cc \ - -ROUND_ROBIN_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(ROUND_ROBIN_END2END_TEST_SRC)))) -ifeq ($(NO_SECURE),true) - -# You can't build secure targets if you don't have OpenSSL. - -$(BINDIR)/$(CONFIG)/round_robin_end2end_test: openssl_dep_error - -else - - - - -ifeq ($(NO_PROTOBUF),true) - -# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+. - -$(BINDIR)/$(CONFIG)/round_robin_end2end_test: protobuf_dep_error - -else - -$(BINDIR)/$(CONFIG)/round_robin_end2end_test: $(PROTOBUF_DEP) $(ROUND_ROBIN_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a - $(E) "[LD] Linking $@" - $(Q) mkdir -p `dirname $@` - $(Q) $(LDXX) $(LDFLAGS) $(ROUND_ROBIN_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/round_robin_end2end_test - -endif - -endif - -$(OBJDIR)/$(CONFIG)/test/cpp/end2end/round_robin_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a - -deps_round_robin_end2end_test: $(ROUND_ROBIN_END2END_TEST_OBJS:.o=.dep) - -ifneq ($(NO_SECURE),true) -ifneq ($(NO_DEPS),true) --include $(ROUND_ROBIN_END2END_TEST_OBJS:.o=.dep) -endif -endif - - SECURE_AUTH_CONTEXT_TEST_SRC = \ test/cpp/common/secure_auth_context_test.cc \ diff --git a/binding.gyp b/binding.gyp index e617485134..130f7cc169 100644 --- a/binding.gyp +++ b/binding.gyp @@ -873,6 +873,7 @@ 'third_party/nanopb/pb_common.c', 'third_party/nanopb/pb_decode.c', 'third_party/nanopb/pb_encode.c', + 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c', diff --git a/build.yaml b/build.yaml index d1e61256e6..c3d0d2364d 100644 --- a/build.yaml +++ b/build.yaml @@ -523,6 +523,7 @@ filegroups: - grpc_base - grpc_client_channel - nanopb + - grpc_resolver_fake - name: grpc_lb_policy_grpclb_secure headers: - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h @@ -544,6 +545,7 @@ filegroups: - grpc_secure - grpc_client_channel - nanopb + - grpc_resolver_fake - name: grpc_lb_policy_pick_first src: - src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c @@ -603,6 +605,15 @@ filegroups: uses: - grpc_base - grpc_client_channel +- name: grpc_resolver_fake + headers: + - src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h + src: + - src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c + plugin: grpc_resolver_fake + uses: + - grpc_base + - grpc_client_channel - name: grpc_resolver_sockaddr src: - src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c @@ -673,8 +684,8 @@ filegroups: - name: grpc_test_util_base build: test headers: + - src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h - test/core/end2end/cq_verifier.h - - test/core/end2end/fake_resolver.h - test/core/end2end/fixtures/http_proxy_fixture.h - test/core/end2end/fixtures/proxy.h - test/core/iomgr/endpoint_tests.h @@ -689,8 +700,8 @@ filegroups: - test/core/util/slice_splitter.h - test/core/util/trickle_endpoint.h src: + - src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c - test/core/end2end/cq_verifier.c - - test/core/end2end/fake_resolver.c - test/core/end2end/fixtures/http_proxy_fixture.c - test/core/end2end/fixtures/proxy.c - test/core/iomgr/endpoint_tests.c @@ -1069,6 +1080,7 @@ libs: - grpc_resolver_dns_ares - grpc_resolver_dns_native - grpc_resolver_sockaddr + - grpc_resolver_fake - grpc_load_reporting - grpc_secure - census @@ -1168,6 +1180,7 @@ libs: - grpc_resolver_dns_ares - grpc_resolver_dns_native - grpc_resolver_sockaddr + - grpc_resolver_fake - grpc_load_reporting - grpc_lb_policy_grpclb - grpc_lb_policy_pick_first @@ -3625,6 +3638,22 @@ targets: - grpc - gpr_test_util - gpr +- name: client_lb_end2end_test + gtest: true + build: test + language: c++ + src: + - test/cpp/end2end/client_lb_end2end_test.cc + deps: + - grpc++_test_util + - grpc_test_util + - grpc++ + - grpc + - gpr_test_util + - gpr + excluded_poll_engines: + - poll + - poll-cv - name: codegen_test_full gtest: true build: test @@ -3903,6 +3932,9 @@ targets: - grpc - gpr_test_util - gpr + excluded_poll_engines: + - poll + - poll-cv - name: grpclb_test gtest: false build: test @@ -3917,6 +3949,9 @@ targets: - grpc - gpr_test_util - gpr + excluded_poll_engines: + - poll + - poll-cv - name: health_service_end2end_test gtest: true build: test @@ -4216,19 +4251,6 @@ targets: - gpr_test_util - gpr - grpc++_test_config -- name: round_robin_end2end_test - gtest: true - build: test - language: c++ - src: - - test/cpp/end2end/round_robin_end2end_test.cc - deps: - - grpc++_test_util - - grpc_test_util - - grpc++ - - grpc - - gpr_test_util - - gpr - name: secure_auth_context_test gtest: true build: test @@ -305,6 +305,7 @@ if test "$PHP_GRPC" != "no"; then third_party/nanopb/pb_common.c \ third_party/nanopb/pb_decode.c \ third_party/nanopb/pb_encode.c \ + src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c \ src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c \ src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c \ @@ -654,6 +655,7 @@ if test "$PHP_GRPC" != "no"; then PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/round_robin) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/resolver/dns/c_ares) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/resolver/dns/native) + PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/resolver/fake) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/resolver/sockaddr) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/deadline) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http) diff --git a/config.w32 b/config.w32 index 7c407e848a..3fa30f1b53 100644 --- a/config.w32 +++ b/config.w32 @@ -282,6 +282,7 @@ if (PHP_GRPC != "no") { "third_party\\nanopb\\pb_common.c " + "third_party\\nanopb\\pb_decode.c " + "third_party\\nanopb\\pb_encode.c " + + "src\\core\\ext\\filters\\client_channel\\resolver\\fake\\fake_resolver.c " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\pick_first\\pick_first.c " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\round_robin\\round_robin.c " + "src\\core\\ext\\filters\\client_channel\\resolver\\dns\\c_ares\\dns_resolver_ares.c " + diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index e5e61df477..56cd8d2ee9 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -452,6 +452,7 @@ Pod::Spec.new do |s| 'third_party/nanopb/pb_common.h', 'third_party/nanopb/pb_decode.h', 'third_party/nanopb/pb_encode.h', + 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h', 'src/core/ext/filters/load_reporting/load_reporting.h', @@ -698,6 +699,7 @@ Pod::Spec.new do |s| 'third_party/nanopb/pb_common.c', 'third_party/nanopb/pb_decode.c', 'third_party/nanopb/pb_encode.c', + 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c', @@ -938,6 +940,7 @@ Pod::Spec.new do |s| 'third_party/nanopb/pb_common.h', 'third_party/nanopb/pb_decode.h', 'third_party/nanopb/pb_encode.h', + 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h', 'src/core/ext/filters/load_reporting/load_reporting.h', diff --git a/grpc.gemspec b/grpc.gemspec index b334efb0b5..9ec5589d8e 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -368,6 +368,7 @@ Gem::Specification.new do |s| s.files += %w( third_party/nanopb/pb_common.h ) s.files += %w( third_party/nanopb/pb_decode.h ) s.files += %w( third_party/nanopb/pb_encode.h ) + s.files += %w( src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h ) s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h ) s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h ) s.files += %w( src/core/ext/filters/load_reporting/load_reporting.h ) @@ -614,6 +615,7 @@ Gem::Specification.new do |s| s.files += %w( third_party/nanopb/pb_common.c ) s.files += %w( third_party/nanopb/pb_decode.c ) s.files += %w( third_party/nanopb/pb_encode.c ) + s.files += %w( src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c ) s.files += %w( src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c ) diff --git a/include/grpc/support/workaround_list.h b/include/grpc/support/workaround_list.h index ec4766510f..34011a7c9c 100644 --- a/include/grpc/support/workaround_list.h +++ b/include/grpc/support/workaround_list.h @@ -43,4 +43,4 @@ typedef enum { GRPC_MAX_WORKAROUND_ID } grpc_workaround_list; -#endif +#endif /* GRPC_SUPPORT_WORKAROUND_LIST_H */ diff --git a/package.xml b/package.xml index 817a0345d6..d37b9030f9 100644 --- a/package.xml +++ b/package.xml @@ -382,6 +382,7 @@ <file baseinstalldir="/" name="third_party/nanopb/pb_common.h" role="src" /> <file baseinstalldir="/" name="third_party/nanopb/pb_decode.h" role="src" /> <file baseinstalldir="/" name="third_party/nanopb/pb_encode.h" role="src" /> + <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/load_reporting/load_reporting.h" role="src" /> @@ -628,6 +629,7 @@ <file baseinstalldir="/" name="third_party/nanopb/pb_common.c" role="src" /> <file baseinstalldir="/" name="third_party/nanopb/pb_decode.c" role="src" /> <file baseinstalldir="/" name="third_party/nanopb/pb_encode.c" role="src" /> + <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c" role="src" /> diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c index 04666edbec..aa464f3ab7 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.c +++ b/src/core/ext/filters/client_channel/channel_connectivity.c @@ -126,9 +126,10 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, } else { grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( grpc_channel_get_channel_stack(w->channel)); - grpc_client_channel_watch_connectivity_state(exec_ctx, client_channel_elem, - grpc_cq_pollset(w->cq), NULL, - &w->on_complete, NULL); + grpc_client_channel_watch_connectivity_state( + exec_ctx, client_channel_elem, + grpc_polling_entity_create_from_pollset(grpc_cq_pollset(w->cq)), NULL, + &w->on_complete, NULL); } gpr_mu_lock(&w->mu); @@ -245,7 +246,8 @@ void grpc_channel_watch_connectivity_state( if (client_channel_elem->filter == &grpc_client_channel_filter) { GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity"); grpc_client_channel_watch_connectivity_state( - &exec_ctx, client_channel_elem, grpc_cq_pollset(cq), &w->state, + &exec_ctx, client_channel_elem, + grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &w->state, &w->on_complete, &w->watcher_timer_init); } else { abort(); diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index 8cebbe9eca..ab71467d73 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -389,7 +389,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, channel_data *chand = arg; char *lb_policy_name = NULL; grpc_lb_policy *lb_policy = NULL; - grpc_lb_policy *old_lb_policy; + grpc_lb_policy *old_lb_policy = NULL; grpc_slice_hash_table *method_params_table = NULL; grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; bool exit_idle = false; @@ -399,6 +399,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, service_config_parsing_state parsing_state; memset(&parsing_state, 0, sizeof(parsing_state)); + bool lb_policy_updated = false; if (chand->resolver_result != NULL) { // Find LB policy name. const grpc_arg *channel_arg = @@ -438,14 +439,27 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, lb_policy_args.args = chand->resolver_result; lb_policy_args.client_channel_factory = chand->client_channel_factory; lb_policy_args.combiner = chand->combiner; - lb_policy = - grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args); - if (lb_policy != NULL) { - GRPC_LB_POLICY_REF(lb_policy, "config_change"); - GRPC_ERROR_UNREF(state_error); - state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy, - &state_error); + + const bool lb_policy_type_changed = + (chand->info_lb_policy_name == NULL) || + (strcmp(chand->info_lb_policy_name, lb_policy_name) != 0); + if (chand->lb_policy != NULL && !lb_policy_type_changed) { + // update + lb_policy_updated = true; + grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args); + } else { + lb_policy = + grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args); + if (lb_policy != NULL) { + GRPC_LB_POLICY_REF(lb_policy, "config_change"); + GRPC_ERROR_UNREF(state_error); + state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy, + &state_error); + old_lb_policy = chand->lb_policy; + chand->lb_policy = lb_policy; + } } + // Find service config. channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG); @@ -492,8 +506,6 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, gpr_free(chand->info_lb_policy_name); chand->info_lb_policy_name = lb_policy_name; } - old_lb_policy = chand->lb_policy; - chand->lb_policy = lb_policy; if (service_config_json != NULL) { gpr_free(chand->info_service_config_json); chand->info_service_config_json = service_config_json; @@ -516,17 +528,21 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, "Channel disconnected", &error, 1)); grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures); } - if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { + if (!lb_policy_updated && lb_policy != NULL && + chand->exit_idle_when_lb_policy_arrives) { GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); exit_idle = true; chand->exit_idle_when_lb_policy_arrives = false; } if (error == GRPC_ERROR_NONE && chand->resolver) { - set_channel_connectivity_state_locked( - exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver"); - if (lb_policy != NULL) { - watch_lb_policy_locked(exec_ctx, chand, lb_policy, state); + if (!lb_policy_updated) { + set_channel_connectivity_state_locked(exec_ctx, chand, state, + GRPC_ERROR_REF(state_error), + "new_lb+resolver"); + if (lb_policy != NULL) { + watch_lb_policy_locked(exec_ctx, chand, lb_policy, state); + } } GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); grpc_resolver_next_locked(exec_ctx, chand->resolver, @@ -546,7 +562,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, "resolver_gone"); } - if (exit_idle) { + if (!lb_policy_updated && lb_policy != NULL && exit_idle) { grpc_lb_policy_exit_idle_locked(exec_ctx, lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle"); } @@ -555,9 +571,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_pollset_set_del_pollset_set( exec_ctx, old_lb_policy->interested_parties, chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); + old_lb_policy = NULL; } - if (lb_policy != NULL) { + if (!lb_policy_updated && lb_policy != NULL) { GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change"); } @@ -1447,7 +1464,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( typedef struct external_connectivity_watcher { channel_data *chand; - grpc_pollset *pollset; + grpc_polling_entity pollent; grpc_closure *on_complete; grpc_closure *watcher_timer_init; grpc_connectivity_state *state; @@ -1522,8 +1539,8 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { external_connectivity_watcher *w = arg; grpc_closure *follow_up = w->on_complete; - grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties, - w->pollset); + grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent, + w->chand->interested_parties); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); external_connectivity_watcher_list_remove(w->chand, w); @@ -1550,8 +1567,8 @@ static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_notify_on_state_change( exec_ctx, &found->chand->state_tracker, NULL, &found->my_closure); } - grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties, - w->pollset); + grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent, + w->chand->interested_parties); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); gpr_free(w); @@ -1559,18 +1576,18 @@ static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, } void grpc_client_channel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, - grpc_connectivity_state *state, grpc_closure *closure, - grpc_closure *watcher_timer_init) { + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_polling_entity pollent, grpc_connectivity_state *state, + grpc_closure *closure, grpc_closure *watcher_timer_init) { channel_data *chand = elem->channel_data; external_connectivity_watcher *w = gpr_zalloc(sizeof(*w)); w->chand = chand; - w->pollset = pollset; + w->pollent = pollent; w->on_complete = closure; w->state = state; w->watcher_timer_init = watcher_timer_init; - - grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); + grpc_polling_entity_add_to_pollset_set(exec_ctx, &w->pollent, + chand->interested_parties); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); grpc_closure_sched( diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 356a7ab0c1..4f8987f0da 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -57,9 +57,9 @@ int grpc_client_channel_num_external_connectivity_watchers( grpc_channel_element *elem); void grpc_client_channel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, - grpc_connectivity_state *state, grpc_closure *on_complete, - grpc_closure *watcher_timer_init); + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_polling_entity pollent, grpc_connectivity_state *state, + grpc_closure *on_complete, grpc_closure *watcher_timer_init); /* Debug helper: pull the subchannel call from a call stack element */ grpc_subchannel_call *grpc_client_channel_get_subchannel_call( diff --git a/src/core/ext/filters/client_channel/lb_policy.c b/src/core/ext/filters/client_channel/lb_policy.c index 112ba40658..7ac2cb1775 100644 --- a/src/core/ext/filters/client_channel/lb_policy.c +++ b/src/core/ext/filters/client_channel/lb_policy.c @@ -166,3 +166,9 @@ grpc_connectivity_state grpc_lb_policy_check_connectivity_locked( return policy->vtable->check_connectivity_locked(exec_ctx, policy, connectivity_error); } + +void grpc_lb_policy_update_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + const grpc_lb_policy_args *lb_policy_args) { + policy->vtable->update_locked(exec_ctx, policy, lb_policy_args); +} diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index fb4aa084a6..07e7953009 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -42,6 +42,7 @@ is expected to be extended to contain some parameters) */ typedef struct grpc_lb_policy grpc_lb_policy; typedef struct grpc_lb_policy_vtable grpc_lb_policy_vtable; +typedef struct grpc_lb_policy_args grpc_lb_policy_args; struct grpc_lb_policy { const grpc_lb_policy_vtable *vtable; @@ -105,9 +106,12 @@ struct grpc_lb_policy_vtable { grpc_lb_policy *policy, grpc_connectivity_state *state, grpc_closure *closure); + + void (*update_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const grpc_lb_policy_args *args); }; -/*#define GRPC_LB_POLICY_REFCOUNT_DEBUG*/ +//#define GRPC_LB_POLICY_REFCOUNT_DEBUG #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG /* Strong references: the policy will shutdown when they reach zero */ @@ -207,4 +211,9 @@ grpc_connectivity_state grpc_lb_policy_check_connectivity_locked( grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_error **connectivity_error); +/** Update \a policy with \a lb_policy_args. */ +void grpc_lb_policy_update_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + const grpc_lb_policy_args *lb_policy_args); + #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index d2a2856a18..6eb0a9ef21 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -115,6 +115,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/parse_address.h" +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/combiner.h" @@ -315,6 +316,9 @@ typedef struct glb_lb_policy { /** for communicating with the LB server */ grpc_channel *lb_channel; + /** response generator to inject address updates into \a lb_channel */ + grpc_fake_resolver_response_generator *response_generator; + /** the RR policy to use of the backend servers returned by the LB server */ grpc_lb_policy *rr_policy; @@ -323,6 +327,9 @@ typedef struct glb_lb_policy { /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker; + /** connectivity state of the LB channel */ + grpc_connectivity_state lb_channel_connectivity; + /** stores the deserialized response from the LB. May be NULL until one such * response has arrived. */ grpc_grpclb_serverlist *serverlist; @@ -340,10 +347,27 @@ typedef struct glb_lb_policy { bool shutting_down; + /** are we currently updating lb_call? */ + bool updating_lb_call; + + /** are we currently updating lb_channel? */ + bool updating_lb_channel; + + /** are we already watching the LB channel's connectivity? */ + bool watching_lb_channel; + + /** is \a lb_call_retry_timer active? */ + bool retry_timer_active; + + /** called upon changes to the LB channel's connectivity. */ + grpc_closure lb_channel_on_connectivity_changed; + + /** args from the latest update received while already updating, or NULL */ + grpc_lb_policy_args *pending_update_args; + /************************************************************/ /* client data associated with the LB server communication */ /************************************************************/ - /* Finished sending initial request. */ grpc_closure lb_on_sent_initial_request; @@ -533,10 +557,9 @@ static grpc_lb_addresses *process_serverlist_locked( return lb_addresses; } -/* returns true if the new RR policy should replace the current one, if any */ -static bool update_lb_connectivity_status_locked( +static void update_lb_connectivity_status_locked( grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, - grpc_connectivity_state new_rr_state, grpc_error *new_rr_state_error) { + grpc_connectivity_state rr_state, grpc_error *rr_state_error) { const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check(&glb_policy->state_tracker); @@ -570,28 +593,26 @@ static bool update_lb_connectivity_status_locked( * (*) This function mustn't be called during shutting down. */ GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN); - switch (new_rr_state) { + switch (rr_state) { case GRPC_CHANNEL_TRANSIENT_FAILURE: case GRPC_CHANNEL_SHUTDOWN: - GPR_ASSERT(new_rr_state_error != GRPC_ERROR_NONE); - return false; /* don't replace the RR policy */ + GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE); + break; case GRPC_CHANNEL_INIT: case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_READY: - GPR_ASSERT(new_rr_state_error == GRPC_ERROR_NONE); + GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE); } if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, - "Setting grpclb's state to %s from new RR policy %p state.", - grpc_connectivity_state_name(new_rr_state), - (void *)glb_policy->rr_policy); + gpr_log( + GPR_INFO, "Setting grpclb's state to %s from new RR policy %p state.", + grpc_connectivity_state_name(rr_state), (void *)glb_policy->rr_policy); } - grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - new_rr_state, GRPC_ERROR_REF(new_rr_state_error), + grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_state, + GRPC_ERROR_REF(rr_state_error), "update_lb_connectivity_status_locked"); - return true; } /* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return @@ -670,45 +691,38 @@ static bool pick_from_internal_rr_locked( return pick_done; } -static grpc_lb_policy *create_rr_locked( - grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist, - glb_lb_policy *glb_policy) { - GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0); - - grpc_lb_policy_args args; - memset(&args, 0, sizeof(args)); - args.client_channel_factory = glb_policy->cc_factory; - args.combiner = glb_policy->base.combiner; +static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { + grpc_lb_policy_args *args = gpr_zalloc(sizeof(*args)); + args->client_channel_factory = glb_policy->cc_factory; + args->combiner = glb_policy->base.combiner; grpc_lb_addresses *addresses = - process_serverlist_locked(exec_ctx, serverlist); - + process_serverlist_locked(exec_ctx, glb_policy->serverlist); // Replace the LB addresses in the channel args that we pass down to // the subchannel. static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses); - args.args = grpc_channel_args_copy_and_add_and_remove( + args->args = grpc_channel_args_copy_and_add_and_remove( glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg, 1); - - grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); - GPR_ASSERT(rr != NULL); grpc_lb_addresses_destroy(exec_ctx, addresses); - grpc_channel_args_destroy(exec_ctx, args.args); - return rr; + return args; +} + +static void lb_policy_args_destroy(grpc_exec_ctx *exec_ctx, + grpc_lb_policy_args *args) { + grpc_channel_args_destroy(exec_ctx, args->args); + gpr_free(args); } static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -/* glb_policy->rr_policy may be NULL (initial handover) */ -static void rr_handover_locked(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy) { - GPR_ASSERT(glb_policy->serverlist != NULL && - glb_policy->serverlist->num_servers > 0); - - if (glb_policy->shutting_down) return; +static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, + grpc_lb_policy_args *args) { + GPR_ASSERT(glb_policy->rr_policy == NULL); grpc_lb_policy *new_rr_policy = - create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy); + grpc_lb_policy_create(exec_ctx, "round_robin", args); if (new_rr_policy == NULL) { gpr_log(GPR_ERROR, "Failure creating a RoundRobin policy for serverlist update with " @@ -719,41 +733,16 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, (void *)glb_policy->rr_policy); return; } - - grpc_error *new_rr_state_error = NULL; - const grpc_connectivity_state new_rr_state = - grpc_lb_policy_check_connectivity_locked(exec_ctx, new_rr_policy, - &new_rr_state_error); - /* Connectivity state is a function of the new RR policy just created */ - const bool replace_old_rr = update_lb_connectivity_status_locked( - exec_ctx, glb_policy, new_rr_state, new_rr_state_error); - - if (!replace_old_rr) { - /* dispose of the new RR policy that won't be used after all */ - GRPC_LB_POLICY_UNREF(exec_ctx, new_rr_policy, "rr_handover_no_replace"); - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, - "Keeping old RR policy (%p) despite new serverlist: new RR " - "policy was in %s connectivity state.", - (void *)glb_policy->rr_policy, - grpc_connectivity_state_name(new_rr_state)); - } - return; - } - - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Created RR policy (%p) to replace old RR (%p)", - (void *)new_rr_policy, (void *)glb_policy->rr_policy); - } - - if (glb_policy->rr_policy != NULL) { - /* if we are phasing out an existing RR instance, unref it. */ - GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover"); - } - - /* Finally update the RR policy to the newly created one */ glb_policy->rr_policy = new_rr_policy; + grpc_error *rr_state_error = NULL; + const grpc_connectivity_state rr_state = + grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy, + &rr_state_error); + /* Connectivity state is a function of the RR policy updated/created */ + update_lb_connectivity_status_locked(exec_ctx, glb_policy, rr_state, + rr_state_error); + /* Add the gRPC LB's interested_parties pollset_set to that of the newly * created RR policy. This will make the RR policy progress upon activity on * gRPC LB, which in turn is tied to the application's call */ @@ -769,10 +758,10 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_rr_connectivity_changed_locked, rr_connectivity, grpc_combiner_scheduler(glb_policy->base.combiner, false)); rr_connectivity->glb_policy = glb_policy; - rr_connectivity->state = new_rr_state; + rr_connectivity->state = rr_state; /* Subscribe to changes to the connectivity of the new RR */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb"); + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_sched"); grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy, &rr_connectivity->state, &rr_connectivity->on_change); @@ -809,6 +798,31 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, } } +/* glb_policy->rr_policy may be NULL (initial handover) */ +static void rr_handover_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { + GPR_ASSERT(glb_policy->serverlist != NULL && + glb_policy->serverlist->num_servers > 0); + + if (glb_policy->shutting_down) return; + + grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy); + if (glb_policy->rr_policy != NULL) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_DEBUG, "Updating Round Robin policy (%p)", + (void *)glb_policy->rr_policy); + } + grpc_lb_policy_update_locked(exec_ctx, glb_policy->rr_policy, args); + } else { + create_rr_locked(exec_ctx, glb_policy, args); + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_DEBUG, "Created new Round Robin policy (%p)", + (void *)glb_policy->rr_policy); + } + } + lb_policy_args_destroy(exec_ctx, args); +} + static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { rr_connectivity_data *rr_connectivity = arg; @@ -854,18 +868,24 @@ static grpc_slice_hash_table_entry targets_info_entry_create( return entry; } -/* Returns the target URI for the LB service whose addresses are in \a - * addresses. Using this URI, a bidirectional streaming channel will be created - * for the reception of load balancing updates. +static int balancer_name_cmp_fn(void *a, void *b) { + const char *a_str = a; + const char *b_str = b; + return strcmp(a_str, b_str); +} + +/* Returns the channel args for the LB channel, used to create a bidirectional + * stream for the reception of load balancing updates. * - * The output argument \a targets_info will be updated to contain a mapping of - * "LB server address" to "balancer name", as reported by the naming system. - * This mapping will be propagated via the channel arguments of the - * aforementioned LB streaming channel, to be used by the security connector for - * secure naming checks. The user is responsible for freeing \a targets_info. */ -static char *get_lb_uri_target_addresses(grpc_exec_ctx *exec_ctx, - const grpc_lb_addresses *addresses, - grpc_slice_hash_table **targets_info) { + * Inputs: + * - \a addresses: corresponding to the balancers. + * - \a response_generator: in order to propagate updates from the resolver + * above the grpclb policy. + * - \a args: other args inherited from the grpclb policy. */ +static grpc_channel_args *build_lb_channel_args( + grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses, + grpc_fake_resolver_response_generator *response_generator, + const grpc_channel_args *args) { size_t num_grpclb_addrs = 0; for (size_t i = 0; i < addresses->num_addresses; ++i) { if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; @@ -874,53 +894,54 @@ static char *get_lb_uri_target_addresses(grpc_exec_ctx *exec_ctx, * It's the resolver's responsibility to make sure this policy is only * instantiated and used in that case. Otherwise, something has gone wrong. */ GPR_ASSERT(num_grpclb_addrs > 0); - + grpc_lb_addresses *lb_addresses = + grpc_lb_addresses_create(num_grpclb_addrs, NULL); grpc_slice_hash_table_entry *targets_info_entries = - gpr_malloc(sizeof(*targets_info_entries) * num_grpclb_addrs); + gpr_zalloc(sizeof(*targets_info_entries) * num_grpclb_addrs); - /* construct a target ipvX://ip1:port1,ip2:port2,... from the addresses in \a - * addresses */ - /* TODO(dgq): support mixed ip version */ - char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs); - size_t addr_index = 0; - - for (size_t i = 0; i < addresses->num_addresses; i++) { + size_t lb_addresses_idx = 0; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (!addresses->addresses[i].is_balancer) continue; if (addresses->addresses[i].user_data != NULL) { gpr_log(GPR_ERROR, "This LB policy doesn't support user data. It will be ignored"); } - if (addresses->addresses[i].is_balancer) { - char *addr_str; - GPR_ASSERT(grpc_sockaddr_to_string( - &addr_str, &addresses->addresses[i].address, true) > 0); - targets_info_entries[addr_index] = targets_info_entry_create( - addr_str, addresses->addresses[i].balancer_name); - addr_strs[addr_index++] = addr_str; - } + char *addr_str; + GPR_ASSERT(grpc_sockaddr_to_string( + &addr_str, &addresses->addresses[i].address, true) > 0); + targets_info_entries[lb_addresses_idx] = targets_info_entry_create( + addr_str, addresses->addresses[i].balancer_name); + gpr_free(addr_str); + + grpc_lb_addresses_set_address( + lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr, + addresses->addresses[i].address.len, false /* is balancer */, + addresses->addresses[i].balancer_name, NULL /* user data */); } - GPR_ASSERT(addr_index == num_grpclb_addrs); - - size_t uri_path_len; - char *uri_path = gpr_strjoin_sep((const char **)addr_strs, num_grpclb_addrs, - ",", &uri_path_len); - for (size_t i = 0; i < num_grpclb_addrs; i++) gpr_free(addr_strs[i]); - gpr_free(addr_strs); - - char *target_uri_str = NULL; - /* TODO(dgq): Don't assume all addresses will share the scheme of the first - * one */ - gpr_asprintf(&target_uri_str, "%s:%s", - grpc_sockaddr_get_uri_scheme(&addresses->addresses[0].address), - uri_path); - gpr_free(uri_path); - - *targets_info = grpc_slice_hash_table_create( - num_grpclb_addrs, targets_info_entries, destroy_balancer_name); + GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx); + grpc_slice_hash_table *targets_info = + grpc_slice_hash_table_create(num_grpclb_addrs, targets_info_entries, + destroy_balancer_name, balancer_name_cmp_fn); gpr_free(targets_info_entries); - return target_uri_str; + grpc_channel_args *lb_channel_args = + grpc_lb_policy_grpclb_build_lb_channel_args(exec_ctx, targets_info, + response_generator, args); + + grpc_arg lb_channel_addresses_arg = + grpc_lb_addresses_create_channel_arg(lb_addresses); + + grpc_channel_args *result = grpc_channel_args_copy_and_add( + lb_channel_args, &lb_channel_addresses_arg, 1); + grpc_slice_hash_table_unref(exec_ctx, targets_info); + grpc_channel_args_destroy(exec_ctx, lb_channel_args); + grpc_lb_addresses_destroy(exec_ctx, lb_addresses); + return result; } +static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error); static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { @@ -976,24 +997,31 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, glb_policy->args = grpc_channel_args_copy_and_add_and_remove( args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); - grpc_slice_hash_table *targets_info = NULL; /* Create a client channel over them to communicate with a LB service */ - char *lb_service_target_addresses = - get_lb_uri_target_addresses(exec_ctx, addresses, &targets_info); - grpc_channel_args *lb_channel_args = - get_lb_channel_args(exec_ctx, targets_info, args->args); + glb_policy->response_generator = + grpc_fake_resolver_response_generator_create(); + grpc_channel_args *lb_channel_args = build_lb_channel_args( + exec_ctx, addresses, glb_policy->response_generator, args->args); + char *uri_str; + gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name); glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel( - exec_ctx, lb_service_target_addresses, args->client_channel_factory, - lb_channel_args); - grpc_slice_hash_table_unref(exec_ctx, targets_info); + exec_ctx, uri_str, args->client_channel_factory, lb_channel_args); + + /* Propagate initial resolution */ + grpc_fake_resolver_response_generator_set_response( + exec_ctx, glb_policy->response_generator, lb_channel_args); grpc_channel_args_destroy(exec_ctx, lb_channel_args); - gpr_free(lb_service_target_addresses); + gpr_free(uri_str); if (glb_policy->lb_channel == NULL) { gpr_free((void *)glb_policy->server_name); grpc_channel_args_destroy(exec_ctx, glb_policy->args); gpr_free(glb_policy); return NULL; } + + grpc_closure_init(&glb_policy->lb_channel_on_connectivity_changed, + glb_lb_channel_on_connectivity_changed_cb, glb_policy, + grpc_combiner_scheduler(args->combiner, false)); grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE, "grpclb"); @@ -1009,12 +1037,15 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (glb_policy->client_stats != NULL) { grpc_grpclb_client_stats_unref(glb_policy->client_stats); } - grpc_channel_destroy(glb_policy->lb_channel); - glb_policy->lb_channel = NULL; grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker); if (glb_policy->serverlist != NULL) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } + grpc_fake_resolver_response_generator_unref(glb_policy->response_generator); + if (glb_policy->pending_update_args != NULL) { + grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args); + gpr_free(glb_policy->pending_update_args); + } gpr_free(glb_policy); } @@ -1022,16 +1053,6 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; glb_policy->shutting_down = true; - pending_pick *pp = glb_policy->pending_picks; - glb_policy->pending_picks = NULL; - pending_ping *pping = glb_policy->pending_pings; - glb_policy->pending_pings = NULL; - if (glb_policy->rr_policy) { - GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); - } - grpc_connectivity_state_set( - exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown"); /* We need a copy of the lb_call pointer because we can't cancell the call * while holding glb_policy->mu: lb_on_server_status_received, invoked due to * the cancel, needs to acquire that same lock */ @@ -1045,6 +1066,30 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_call_cancel(lb_call, NULL); /* lb_on_server_status_received will pick up the cancel and clean up */ } + if (glb_policy->retry_timer_active) { + grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer); + glb_policy->retry_timer_active = false; + } + + pending_pick *pp = glb_policy->pending_picks; + glb_policy->pending_picks = NULL; + pending_ping *pping = glb_policy->pending_pings; + glb_policy->pending_pings = NULL; + if (glb_policy->rr_policy) { + GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); + } + // We destroy the LB channel here because + // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy + // instance. Destroying the lb channel in glb_destroy would likely result in + // a callback invocation without a valid glb_policy arg. + if (glb_policy->lb_channel != NULL) { + grpc_channel_destroy(glb_policy->lb_channel); + glb_policy->lb_channel = NULL; + } + grpc_connectivity_state_set( + exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown"); + while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; @@ -1318,6 +1363,7 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->server_name != NULL); GPR_ASSERT(glb_policy->server_name[0] != '\0'); + GPR_ASSERT(glb_policy->lb_call == NULL); GPR_ASSERT(!glb_policy->shutting_down); /* Note the following LB call progresses every time there's activity in \a @@ -1403,8 +1449,10 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, lb_call_init_locked(exec_ctx, glb_policy); if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)", - (void *)glb_policy, (void *)glb_policy->lb_call); + gpr_log(GPR_INFO, + "Query for backends (grpclb: %p, lb_channel: %p, lb_call: %p)", + (void *)glb_policy, (void *)glb_policy->lb_channel, + (void *)glb_policy->lb_call); } GPR_ASSERT(glb_policy->lb_call != NULL); @@ -1608,8 +1656,8 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { glb_lb_policy *glb_policy = arg; - - if (!glb_policy->shutting_down) { + glb_policy->retry_timer_active = false; + if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)", (void *)glb_policy); @@ -1617,31 +1665,32 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(glb_policy->lb_call == NULL); query_for_backends_locked(exec_ctx, glb_policy); } - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "grpclb_on_retry_timer"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer"); } static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { glb_lb_policy *glb_policy = arg; - GPR_ASSERT(glb_policy->lb_call != NULL); - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { char *status_details = grpc_slice_to_c_string(glb_policy->lb_call_status_details); - gpr_log(GPR_DEBUG, + gpr_log(GPR_INFO, "Status from LB server received. Status = %d, Details = '%s', " - "(call: %p)", + "(call: %p), error %p", glb_policy->lb_call_status, status_details, - (void *)glb_policy->lb_call); + (void *)glb_policy->lb_call, (void *)error); gpr_free(status_details); } - /* We need to perform cleanups no matter what. */ lb_call_destroy_locked(exec_ctx, glb_policy); - - if (!glb_policy->shutting_down) { + if (glb_policy->started_picking && glb_policy->updating_lb_call) { + if (glb_policy->retry_timer_active) { + grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer); + } + if (!glb_policy->shutting_down) start_picking_locked(exec_ctx, glb_policy); + glb_policy->updating_lb_call = false; + } else if (!glb_policy->shutting_down) { /* if we aren't shutting down, restart the LB client call after some time */ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec next_try = @@ -1651,16 +1700,18 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, (void *)glb_policy); gpr_timespec timeout = gpr_time_sub(next_try, now); if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) { - gpr_log(GPR_DEBUG, "... retrying in %" PRId64 ".%09d seconds.", + gpr_log(GPR_DEBUG, + "... retry_timer_active in %" PRId64 ".%09d seconds.", timeout.tv_sec, timeout.tv_nsec); } else { - gpr_log(GPR_DEBUG, "... retrying immediately."); + gpr_log(GPR_DEBUG, "... retry_timer_active immediately."); } } GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); grpc_closure_init( &glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false)); + glb_policy->retry_timer_active = true; grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try, &glb_policy->lb_on_call_retry, now); } @@ -1668,6 +1719,138 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, "lb_on_server_status_received"); } +static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const grpc_lb_policy_args *args) { + glb_lb_policy *glb_policy = (glb_lb_policy *)policy; + + if (glb_policy->updating_lb_channel) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, + "Update already in progress for grpclb %p. Deferring update.", + (void *)glb_policy); + } + if (glb_policy->pending_update_args != NULL) { + grpc_channel_args_destroy(exec_ctx, + glb_policy->pending_update_args->args); + gpr_free(glb_policy->pending_update_args); + } + glb_policy->pending_update_args = + gpr_zalloc(sizeof(*glb_policy->pending_update_args)); + glb_policy->pending_update_args->client_channel_factory = + args->client_channel_factory; + glb_policy->pending_update_args->args = grpc_channel_args_copy(args->args); + glb_policy->pending_update_args->combiner = args->combiner; + return; + } + + glb_policy->updating_lb_channel = true; + // Propagate update to lb_channel (pick first). + const grpc_arg *arg = + grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); + if (arg == NULL || arg->type != GRPC_ARG_POINTER) { + if (glb_policy->lb_channel == NULL) { + // If we don't have a current channel to the LB, go into TRANSIENT + // FAILURE. + grpc_connectivity_state_set( + exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), + "glb_update_missing"); + } else { + // otherwise, keep using the current LB channel (ignore this update). + gpr_log(GPR_ERROR, + "No valid LB addresses channel arg for grpclb %p update, " + "ignoring.", + (void *)glb_policy); + } + } + const grpc_lb_addresses *addresses = arg->value.pointer.p; + GPR_ASSERT(glb_policy->lb_channel != NULL); + grpc_channel_args *lb_channel_args = build_lb_channel_args( + exec_ctx, addresses, glb_policy->response_generator, args->args); + /* Propagate updates to the LB channel through the fake resolver */ + grpc_fake_resolver_response_generator_set_response( + exec_ctx, glb_policy->response_generator, lb_channel_args); + grpc_channel_args_destroy(exec_ctx, lb_channel_args); + + if (!glb_policy->watching_lb_channel) { + // Watch the LB channel connectivity for connection. + glb_policy->lb_channel_connectivity = GRPC_CHANNEL_INIT; + grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(glb_policy->lb_channel)); + GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); + glb_policy->watching_lb_channel = true; + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity"); + grpc_client_channel_watch_connectivity_state( + exec_ctx, client_channel_elem, + grpc_polling_entity_create_from_pollset_set( + glb_policy->base.interested_parties), + &glb_policy->lb_channel_connectivity, + &glb_policy->lb_channel_on_connectivity_changed, NULL); + } +} + +// Invoked as part of the update process. It continues watching the LB channel +// until it shuts down or becomes READY. It's invoked even if the LB channel +// stayed READY throughout the update (for example if the update is identical). +static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = arg; + if (glb_policy->shutting_down) goto done; + // Re-initialize the lb_call. This should also take care of updating the + // embedded RR policy. Note that the current RR policy, if any, will stay in + // effect until an update from the new lb_call is received. + switch (glb_policy->lb_channel_connectivity) { + case GRPC_CHANNEL_INIT: + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_TRANSIENT_FAILURE: { + /* resub. */ + grpc_channel_element *client_channel_elem = + grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(glb_policy->lb_channel)); + GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); + grpc_client_channel_watch_connectivity_state( + exec_ctx, client_channel_elem, + grpc_polling_entity_create_from_pollset_set( + glb_policy->base.interested_parties), + &glb_policy->lb_channel_connectivity, + &glb_policy->lb_channel_on_connectivity_changed, NULL); + break; + } + case GRPC_CHANNEL_IDLE: + // lb channel inactive (probably shutdown prior to update). Restart lb + // call to kick the lb channel into gear. + GPR_ASSERT(glb_policy->lb_call == NULL); + /* fallthrough */ + case GRPC_CHANNEL_READY: + if (glb_policy->lb_call != NULL) { + glb_policy->updating_lb_channel = false; + glb_policy->updating_lb_call = true; + grpc_call_cancel(glb_policy->lb_call, NULL); + // lb_on_server_status_received will pick up the cancel and reinit + // lb_call. + if (glb_policy->pending_update_args != NULL) { + const grpc_lb_policy_args *args = glb_policy->pending_update_args; + glb_policy->pending_update_args = NULL; + glb_update_locked(exec_ctx, &glb_policy->base, args); + } + } else if (glb_policy->started_picking && !glb_policy->shutting_down) { + if (glb_policy->retry_timer_active) { + grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer); + glb_policy->retry_timer_active = false; + } + start_picking_locked(exec_ctx, glb_policy); + } + /* fallthrough */ + case GRPC_CHANNEL_SHUTDOWN: + done: + glb_policy->watching_lb_channel = false; + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "watch_lb_channel_connectivity_cb_shutdown"); + break; + } +} + /* Code wiring the policy with the rest of the core */ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_destroy, @@ -1678,7 +1861,8 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_ping_one_locked, glb_exit_idle_locked, glb_check_connectivity_locked, - glb_notify_on_state_change_locked}; + glb_notify_on_state_change_locked, + glb_update_locked}; static void glb_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c index d6201f2387..a190c2075d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c @@ -50,28 +50,37 @@ grpc_channel *grpc_lb_policy_grpclb_create_lb_channel( return lb_channel; } -grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx, - grpc_slice_hash_table *targets_info, - const grpc_channel_args *args) { - /* We strip out the channel arg for the LB policy name, since we want - * to use the default (pick_first) in this case. +grpc_channel_args *grpc_lb_policy_grpclb_build_lb_channel_args( + grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *targets_info, + grpc_fake_resolver_response_generator *response_generator, + const grpc_channel_args *args) { + const grpc_arg to_add[] = { + grpc_fake_resolver_response_generator_arg(response_generator)}; + /* We remove: * - * We also strip out the channel arg for the resolved addresses, since - * that will be generated by the name resolver used in the LB channel. - * Note that the LB channel will use the sockaddr resolver, so this - * won't actually generate a query to DNS (or some other name service). - * However, the addresses returned by the sockaddr resolver will have - * is_balancer=false, whereas our own addresses have is_balancer=true. - * We need the LB channel to return addresses with is_balancer=false - * so that it does not wind up recursively using the grpclb LB policy, - * as per the special case logic in client_channel.c. + * - The channel arg for the LB policy name, since we want to use the default + * (pick_first) in this case. * - * Lastly, we also strip out the channel arg for the server URI, - * since that will be different for the LB channel than for the parent - * channel (the client channel factory will re-add this arg with - * the right value). */ + * - The channel arg for the resolved addresses, since that will be generated + * by the name resolver used in the LB channel. Note that the LB channel + * will use the fake resolver, so this won't actually generate a query + * to DNS (or some other name service). However, the addresses returned by + * the fake resolver will have is_balancer=false, whereas our own + * addresses have is_balancer=true. We need the LB channel to return + * addresses with is_balancer=false so that it does not wind up recursively + * using the grpclb LB policy, as per the special case logic in + * client_channel.c. + * + * - The channel arg for the server URI, since that will be different for the + * LB channel than for the parent channel (the client channel factory will + * re-add this arg with the right value). + * + * - The fake resolver generator, because we are replacing it with the one + * from the grpclb policy, used to propagate updates to the LB channel. */ static const char *keys_to_remove[] = { - GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI}; - return grpc_channel_args_copy_and_remove(args, keys_to_remove, - GPR_ARRAY_SIZE(keys_to_remove)); + GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI, + GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR}; + return grpc_channel_args_copy_and_add_and_remove( + args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), to_add, + GPR_ARRAY_SIZE(to_add)); } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h index 9730c971d9..dfb682ad16 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h @@ -35,6 +35,7 @@ #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H #include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/lib/slice/slice_hash_table.h" /** Create the channel used for communicating with an LB service. @@ -49,9 +50,10 @@ grpc_channel *grpc_lb_policy_grpclb_create_lb_channel( grpc_client_channel_factory *client_channel_factory, grpc_channel_args *args); -grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx, - grpc_slice_hash_table *targets_info, - const grpc_channel_args *args); +grpc_channel_args *grpc_lb_policy_grpclb_build_lb_channel_args( + grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *targets_info, + grpc_fake_resolver_response_generator *response_generator, + const grpc_channel_args *args); #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H \ */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c index a145cba63c..21effd75e3 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c @@ -76,32 +76,39 @@ grpc_channel *grpc_lb_policy_grpclb_create_lb_channel( return lb_channel; } -grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx, - grpc_slice_hash_table *targets_info, - const grpc_channel_args *args) { - const grpc_arg targets_info_arg = - grpc_lb_targets_info_create_channel_arg(targets_info); - /* We strip out the channel arg for the LB policy name, since we want - * to use the default (pick_first) in this case. +grpc_channel_args *grpc_lb_policy_grpclb_build_lb_channel_args( + grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *targets_info, + grpc_fake_resolver_response_generator *response_generator, + const grpc_channel_args *args) { + const grpc_arg to_add[] = { + grpc_lb_targets_info_create_channel_arg(targets_info), + grpc_fake_resolver_response_generator_arg(response_generator)}; + /* We remove: * - * We also strip out the channel arg for the resolved addresses, since - * that will be generated by the name resolver used in the LB channel. - * Note that the LB channel will use the sockaddr resolver, so this - * won't actually generate a query to DNS (or some other name service). - * However, the addresses returned by the sockaddr resolver will have - * is_balancer=false, whereas our own addresses have is_balancer=true. - * We need the LB channel to return addresses with is_balancer=false - * so that it does not wind up recursively using the grpclb LB policy, - * as per the special case logic in client_channel.c. + * - The channel arg for the LB policy name, since we want to use the default + * (pick_first) in this case. * - * Lastly, we also strip out the channel arg for the server URI, - * since that will be different for the LB channel than for the parent - * channel (the client channel factory will re-add this arg with - * the right value). */ + * - The channel arg for the resolved addresses, since that will be generated + * by the name resolver used in the LB channel. Note that the LB channel + * will use the fake resolver, so this won't actually generate a query + * to DNS (or some other name service). However, the addresses returned by + * the fake resolver will have is_balancer=false, whereas our own + * addresses have is_balancer=true. We need the LB channel to return + * addresses with is_balancer=false so that it does not wind up recursively + * using the grpclb LB policy, as per the special case logic in + * client_channel.c. + * + * - The channel arg for the server URI, since that will be different for the + * LB channel than for the parent channel (the client channel factory will + * re-add this arg with the right value). + * + * - The fake resolver generator, because we are replacing it with the one + * from the grpclb policy, used to propagate updates to the LB channel. */ static const char *keys_to_remove[] = { - GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI}; + GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI, + GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR}; /* Add the targets info table to be used for secure naming */ return grpc_channel_args_copy_and_add_and_remove( - args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &targets_info_arg, - 1); + args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), to_add, + GPR_ARRAY_SIZE(to_add)); } diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c index b1c5dfc61c..b6f86255df 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c @@ -37,11 +37,14 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" +grpc_tracer_flag grpc_lb_pick_first_trace = GRPC_TRACER_INITIALIZER(false); + typedef struct pending_pick { struct pending_pick *next; uint32_t initial_metadata_flags; @@ -54,7 +57,9 @@ typedef struct { grpc_lb_policy base; /** all our subchannels */ grpc_subchannel **subchannels; + grpc_subchannel **new_subchannels; size_t num_subchannels; + size_t num_new_subchannels; grpc_closure connectivity_changed; @@ -63,10 +68,19 @@ typedef struct { /** the selected channel */ grpc_connected_subchannel *selected; + /** the subchannel key for \a selected, or NULL if \a selected not set */ + const grpc_subchannel_key *selected_key; + /** have we started picking? */ - int started_picking; + bool started_picking; /** are we shut down? */ - int shutdown; + bool shutdown; + /** are we updating the selected subchannel? */ + bool updating_selected; + /** are we updating the subchannel candidates? */ + bool updating_subchannels; + /** args from the latest update received while already updating, or NULL */ + grpc_lb_policy_args *pending_update_args; /** which subchannel are we watching? */ size_t checking_subchannel; /** what is the connectivity of that channel? */ @@ -80,23 +94,28 @@ typedef struct { static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - size_t i; GPR_ASSERT(p->pending_picks == NULL); - for (i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first"); + for (size_t i = 0; i < p->num_subchannels; i++) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first_destroy"); } if (p->selected != NULL) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first"); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, + "picked_first_destroy"); } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); + if (p->pending_update_args != NULL) { + grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args); + gpr_free(p->pending_update_args); + } gpr_free(p->subchannels); + gpr_free(p->new_subchannels); gpr_free(p); } static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; - p->shutdown = 1; + p->shutdown = true; pp = p->pending_picks; p->pending_picks = NULL; grpc_connectivity_state_set( @@ -106,7 +125,7 @@ static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (p->selected != NULL) { grpc_connected_subchannel_notify_on_state_change( exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed); - } else if (p->num_subchannels > 0) { + } else if (p->num_subchannels > 0 && p->started_picking) { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, &p->connectivity_changed); @@ -169,21 +188,25 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_ERROR_UNREF(error); } -static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { - p->started_picking = 1; - p->checking_subchannel = 0; - p->checking_connectivity = GRPC_CHANNEL_IDLE; - GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); +static void start_picking_locked(grpc_exec_ctx *exec_ctx, + pick_first_lb_policy *p) { + p->started_picking = true; + if (p->subchannels != NULL) { + GPR_ASSERT(p->num_subchannels > 0); + p->checking_subchannel = 0; + p->checking_connectivity = GRPC_CHANNEL_IDLE; + GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[p->checking_subchannel], + p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); + } } static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; if (!p->started_picking) { - start_picking(exec_ctx, p); + start_picking_locked(exec_ctx, p); } } @@ -203,7 +226,7 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, /* No subchannel selected yet, so try again */ if (!p->started_picking) { - start_picking(exec_ctx, p); + start_picking_locked(exec_ctx, p); } pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; @@ -216,30 +239,290 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { - size_t i; size_t num_subchannels = p->num_subchannels; - grpc_subchannel **subchannels; + grpc_subchannel **subchannels = p->subchannels; - subchannels = p->subchannels; p->num_subchannels = 0; p->subchannels = NULL; GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels"); - for (i = 0; i < num_subchannels; i++) { + for (size_t i = 0; i < num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first"); } - gpr_free(subchannels); } +static grpc_connectivity_state pf_check_connectivity_locked( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + return grpc_connectivity_state_get(&p->state_tracker, error); +} + +static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *pol, + grpc_connectivity_state *current, + grpc_closure *notify) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, + current, notify); +} + +static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_closure *closure) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + if (p->selected) { + grpc_connected_subchannel_ping(exec_ctx, p->selected, closure); + } else { + grpc_closure_sched(exec_ctx, closure, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); + } +} + +/* unsubscribe all subchannels */ +static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx, + pick_first_lb_policy *p) { + if (p->num_subchannels > 0) { + GPR_ASSERT(p->selected == NULL); + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, + &p->connectivity_changed); + p->updating_subchannels = true; + } else if (p->selected != NULL) { + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed); + p->updating_selected = true; + } +} + +/* true upon success */ +static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const grpc_lb_policy_args *args) { + pick_first_lb_policy *p = (pick_first_lb_policy *)policy; + /* Find the number of backend addresses. We ignore balancer + * addresses, since we don't know how to handle them. */ + const grpc_arg *arg = + grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); + if (arg == NULL || arg->type != GRPC_ARG_POINTER) { + if (p->subchannels == NULL) { + // If we don't have a current subchannel list, go into TRANSIENT FAILURE. + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), + "pf_update_missing"); + } else { + // otherwise, keep using the current subchannel list (ignore this update). + gpr_log(GPR_ERROR, + "No valid LB addresses channel arg for Pick First %p update, " + "ignoring.", + (void *)p); + } + return; + } + const grpc_lb_addresses *addresses = arg->value.pointer.p; + size_t num_addrs = 0; + for (size_t i = 0; i < addresses->num_addresses; i++) { + if (!addresses->addresses[i].is_balancer) ++num_addrs; + } + if (num_addrs == 0) { + // Empty update. Unsubscribe from all current subchannels and put the + // channel in TRANSIENT_FAILURE. + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), + "pf_update_empty"); + stop_connectivity_watchers(exec_ctx, p); + return; + } + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses", + (void *)p, (unsigned long)num_addrs); + } + grpc_subchannel_args *sc_args = gpr_zalloc(sizeof(*sc_args) * num_addrs); + /* We remove the following keys in order for subchannel keys belonging to + * subchannels point to the same address to match. */ + static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, + GRPC_ARG_LB_ADDRESSES}; + size_t sc_args_count = 0; + + /* Create list of subchannel args for new addresses in \a args. */ + for (size_t i = 0; i < addresses->num_addresses; i++) { + if (addresses->addresses[i].is_balancer) continue; + if (addresses->addresses[i].user_data != NULL) { + gpr_log(GPR_ERROR, + "This LB policy doesn't support user data. It will be ignored"); + } + grpc_arg addr_arg = + grpc_create_subchannel_address_arg(&addresses->addresses[i].address); + grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove( + args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, + 1); + gpr_free(addr_arg.value.string); + sc_args[sc_args_count++].args = new_args; + } + + /* Check if p->selected is amongst them. If so, we are done. */ + if (p->selected != NULL) { + GPR_ASSERT(p->selected_key != NULL); + for (size_t i = 0; i < sc_args_count; i++) { + grpc_subchannel_key *ith_sc_key = grpc_subchannel_key_create(&sc_args[i]); + const bool found_selected = + grpc_subchannel_key_compare(p->selected_key, ith_sc_key) == 0; + grpc_subchannel_key_destroy(exec_ctx, ith_sc_key); + if (found_selected) { + // The currently selected subchannel is in the update: we are done. + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, + "Pick First %p found already selected subchannel %p amongst " + "updates. Update done.", + (void *)p, (void *)p->selected); + } + for (size_t j = 0; j < sc_args_count; j++) { + grpc_channel_args_destroy(exec_ctx, + (grpc_channel_args *)sc_args[j].args); + } + gpr_free(sc_args); + return; + } + } + } + // We only check for already running updates here because if the previous + // steps were successful, the update can be considered done without any + // interference (ie, no callbacks were scheduled). + if (p->updating_selected || p->updating_subchannels) { + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, + "Update already in progress for pick first %p. Deferring update.", + (void *)p); + } + if (p->pending_update_args != NULL) { + grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args); + gpr_free(p->pending_update_args); + } + p->pending_update_args = gpr_zalloc(sizeof(*p->pending_update_args)); + p->pending_update_args->client_channel_factory = + args->client_channel_factory; + p->pending_update_args->args = grpc_channel_args_copy(args->args); + p->pending_update_args->combiner = args->combiner; + return; + } + /* Create the subchannels for the new subchannel args/addresses. */ + grpc_subchannel **new_subchannels = + gpr_zalloc(sizeof(*new_subchannels) * sc_args_count); + size_t num_new_subchannels = 0; + for (size_t i = 0; i < sc_args_count; i++) { + grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( + exec_ctx, args->client_channel_factory, &sc_args[i]); + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + char *address_uri = + grpc_sockaddr_to_uri(&addresses->addresses[i].address); + gpr_log(GPR_INFO, + "Pick First %p created subchannel %p for address uri %s", + (void *)p, (void *)subchannel, address_uri); + gpr_free(address_uri); + } + grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)sc_args[i].args); + if (subchannel != NULL) new_subchannels[num_new_subchannels++] = subchannel; + } + gpr_free(sc_args); + if (num_new_subchannels == 0) { + gpr_free(new_subchannels); + // Empty update. Unsubscribe from all current subchannels and put the + // channel in TRANSIENT_FAILURE. + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("No valid addresses in update"), + "pf_update_no_valid_addresses"); + stop_connectivity_watchers(exec_ctx, p); + return; + } + + /* Destroy the current subchannels. Repurpose pf_shutdown/destroy. */ + stop_connectivity_watchers(exec_ctx, p); + + /* Save new subchannels. The switch over will happen in + * pf_connectivity_changed_locked */ + if (p->updating_selected || p->updating_subchannels) { + p->num_new_subchannels = num_new_subchannels; + p->new_subchannels = new_subchannels; + } else { /* nothing is updating. Get things moving from here */ + p->num_subchannels = num_new_subchannels; + p->subchannels = new_subchannels; + p->new_subchannels = NULL; + p->num_new_subchannels = 0; + if (p->started_picking) { + p->checking_subchannel = 0; + p->checking_connectivity = GRPC_CHANNEL_IDLE; + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[p->checking_subchannel], + p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); + } + } +} + static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { pick_first_lb_policy *p = arg; grpc_subchannel *selected_subchannel; pending_pick *pp; + bool restart = false; + if (p->updating_selected && error == GRPC_ERROR_CANCELLED) { + /* Captured the unsubscription for p->selected */ + GPR_ASSERT(p->selected != NULL); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, + "pf_update_connectivity"); + p->updating_selected = false; + if (p->num_new_subchannels == 0) { + p->selected = NULL; + return; + } + restart = true; + } + if (p->updating_subchannels && error == GRPC_ERROR_CANCELLED) { + /* Captured the unsubscription for the checking subchannel */ + GPR_ASSERT(p->selected == NULL); + for (size_t i = 0; i < p->num_subchannels; i++) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], + "pf_update_connectivity"); + } + gpr_free(p->subchannels); + p->subchannels = NULL; + p->num_subchannels = 0; + p->updating_subchannels = false; + if (p->num_new_subchannels == 0) return; + restart = true; + } + if (restart) { + p->selected = NULL; + p->selected_key = NULL; + + GPR_ASSERT(p->new_subchannels != NULL); + GPR_ASSERT(p->num_new_subchannels > 0); + p->num_subchannels = p->num_new_subchannels; + p->subchannels = p->new_subchannels; + p->num_new_subchannels = 0; + p->new_subchannels = NULL; + + if (p->started_picking) { + /* If we were picking, continue to do so over the new subchannels, + * starting from the 0th index. */ + p->checking_subchannel = 0; + p->checking_connectivity = GRPC_CHANNEL_IDLE; + /* reuses the weak ref from start_picking_locked */ + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[p->checking_subchannel], + p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); + } + if (p->pending_update_args != NULL) { + const grpc_lb_policy_args *args = p->pending_update_args; + p->pending_update_args = NULL; + pf_update_locked(exec_ctx, &p->base, args); + } + return; + } GRPC_ERROR_REF(error); - if (p->shutdown) { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); GRPC_ERROR_UNREF(error); @@ -272,6 +555,11 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, p->selected = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected_subchannel), "picked_first"); + + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, "Selected subchannel %p", (void *)p->selected); + } + p->selected_key = grpc_subchannel_get_key(selected_subchannel); /* drop the pick list: we are connected now */ GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); destroy_subchannels_locked(exec_ctx, p); @@ -279,6 +567,11 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked"); + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, + "Servicing pending pick with selected subchannel %p", + (void *)p->selected); + } grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } @@ -353,32 +646,6 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_UNREF(error); } -static grpc_connectivity_state pf_check_connectivity_locked( - grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - return grpc_connectivity_state_get(&p->state_tracker, error); -} - -static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *pol, - grpc_connectivity_state *current, - grpc_closure *notify) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, - current, notify); -} - -static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_closure *closure) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - if (p->selected) { - grpc_connected_subchannel_ping(exec_ctx, p->selected, closure); - } else { - grpc_closure_sched(exec_ctx, closure, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); - } -} - static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_destroy, pf_shutdown_locked, @@ -388,7 +655,8 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_ping_one_locked, pf_exit_idle_locked, pf_check_connectivity_locked, - pf_notify_on_state_change_locked}; + pf_notify_on_state_change_locked, + pf_update_locked}; static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} @@ -398,59 +666,8 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { GPR_ASSERT(args->client_channel_factory != NULL); - - /* Find the number of backend addresses. We ignore balancer - * addresses, since we don't know how to handle them. */ - const grpc_arg *arg = - grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); - if (arg == NULL || arg->type != GRPC_ARG_POINTER) { - return NULL; - } - grpc_lb_addresses *addresses = arg->value.pointer.p; - size_t num_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; i++) { - if (!addresses->addresses[i].is_balancer) ++num_addrs; - } - if (num_addrs == 0) return NULL; - pick_first_lb_policy *p = gpr_zalloc(sizeof(*p)); - - p->subchannels = gpr_zalloc(sizeof(grpc_subchannel *) * num_addrs); - grpc_subchannel_args sc_args; - size_t subchannel_idx = 0; - for (size_t i = 0; i < addresses->num_addresses; i++) { - /* Skip balancer addresses, since we only know how to handle backends. */ - if (addresses->addresses[i].is_balancer) continue; - - if (addresses->addresses[i].user_data != NULL) { - gpr_log(GPR_ERROR, - "This LB policy doesn't support user data. It will be ignored"); - } - - static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; - memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - grpc_arg addr_arg = - grpc_create_subchannel_address_arg(&addresses->addresses[i].address); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove( - args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, - 1); - gpr_free(addr_arg.value.string); - sc_args.args = new_args; - grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( - exec_ctx, args->client_channel_factory, &sc_args); - grpc_channel_args_destroy(exec_ctx, new_args); - - if (subchannel != NULL) { - p->subchannels[subchannel_idx++] = subchannel; - } - } - if (subchannel_idx == 0) { - gpr_free(p->subchannels); - gpr_free(p); - return NULL; - } - p->num_subchannels = subchannel_idx; - + pf_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p, grpc_combiner_scheduler(args->combiner, false)); @@ -472,6 +689,7 @@ static grpc_lb_policy_factory *pick_first_lb_factory_create() { void grpc_lb_policy_pick_first_init() { grpc_register_lb_policy(pick_first_lb_factory_create()); + grpc_register_tracer("pick_first", &grpc_lb_pick_first_trace); } void grpc_lb_policy_pick_first_shutdown() {} diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index 7ee6ffb787..c9758eef88 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -33,31 +33,11 @@ /** Round Robin Policy. * - * This policy keeps: - * - A circular list of ready (connected) subchannels, the *readylist*. An empty - * readylist consists solely of its root (dummy) node. - * - A pointer to the last element picked from the readylist, the *lastpick*. - * Initially set to point to the readylist's root. - * - * Behavior: - * - When a subchannel connects, it's *prepended* to the readylist's root node. - * Ie, if readylist = A <-> B <-> ROOT <-> C - * ^ ^ - * |____________________| - * and subchannel D becomes connected, the addition of D to the readylist - * results in readylist = A <-> B <-> D <-> ROOT <-> C - * ^ ^ - * |__________________________| - * - When a subchannel disconnects, it's removed from the readylist. If the - * subchannel being removed was the most recently picked, the *lastpick* - * pointer moves to the removed node's previous element. Note that if the - * readylist only had one element, this is still legal, as the lastpick would - * point to the dummy root node, for an empty readylist. - * - Upon picking, *lastpick* is updated to point to the returned (connected) - * subchannel. Note that it's possible that the selected subchannel becomes - * disconnected in the interim between the selection and the actual usage of - * the subchannel by the caller. - */ + * Before every pick, the \a get_next_ready_subchannel_index_locked function + * returns the p->subchannel_list->subchannels index for next subchannel, + * respecting the relative + * order of the addresses provided upon creation or updates. Note however that + * updates will start picking from the beginning of the updated list. */ #include <string.h> @@ -72,8 +52,6 @@ #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/static_metadata.h" -typedef struct round_robin_lb_policy round_robin_lb_policy; - grpc_tracer_flag grpc_lb_round_robin_trace = GRPC_TRACER_INITIALIZER(false); /** List of entities waiting for a pick. @@ -99,9 +77,37 @@ typedef struct pending_pick { grpc_closure *on_complete; } pending_pick; +typedef struct rr_subchannel_list rr_subchannel_list; +typedef struct round_robin_lb_policy { + /** base policy: must be first */ + grpc_lb_policy base; + + rr_subchannel_list *subchannel_list; + + /** have we started picking? */ + bool started_picking; + /** are we shutting down? */ + bool shutdown; + /** List of picks that are waiting on connectivity */ + pending_pick *pending_picks; + + /** our connectivity state tracker */ + grpc_connectivity_state_tracker state_tracker; + + /** Index into subchannels for last pick. */ + size_t last_ready_subchannel_index; + + /** Latest version of the subchannel list. + * Subchannel connectivity callbacks will only promote updated subchannel + * lists if they equal \a latest_pending_subchannel_list. In other words, + * racing callbacks that reference outdated subchannel lists won't perform any + * update. */ + rr_subchannel_list *latest_pending_subchannel_list; +} round_robin_lb_policy; + typedef struct { - /** backpointer to owning policy */ - round_robin_lb_policy *policy; + /** backpointer to owning subchannel list */ + rr_subchannel_list *subchannel_list; /** subchannel itself */ grpc_subchannel *subchannel; /** notification that connectivity has changed on subchannel */ @@ -123,12 +129,9 @@ typedef struct { const grpc_lb_user_data_vtable *user_data_vtable; } subchannel_data; -struct round_robin_lb_policy { - /** base policy: must be first */ - grpc_lb_policy base; - - /** total number of addresses received at creation time */ - size_t num_addresses; +struct rr_subchannel_list { + /** backpointer to owning policy */ + round_robin_lb_policy *policy; /** all our subchannels */ size_t num_subchannels; @@ -141,67 +144,143 @@ struct round_robin_lb_policy { /** how many subchannels are in state IDLE */ size_t num_idle; - /** have we started picking? */ - bool started_picking; - /** are we shutting down? */ - bool shutdown; - /** List of picks that are waiting on connectivity */ - pending_pick *pending_picks; - - /** our connectivity state tracker */ - grpc_connectivity_state_tracker state_tracker; + /** There will be one ref for each entry in subchannels for which there is a + * pending connectivity state watcher callback. */ + gpr_refcount refcount; - // Index into subchannels for last pick. - size_t last_ready_subchannel_index; + /** Is this list shutting down? This may be true due to the shutdown of the + * policy itself or because a newer update has arrived while this one hadn't + * finished processing. */ + bool shutting_down; }; -/** Returns the index into p->subchannels of the next subchannel in - * READY state, or p->num_subchannels if no subchannel is READY. +static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx, + rr_subchannel_list *subchannel_list) { + GPR_ASSERT(subchannel_list->shutting_down); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, "[RR %p] Destroying subchannel_list %p", + (void *)subchannel_list->policy, (void *)subchannel_list); + } + for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { + subchannel_data *sd = &subchannel_list->subchannels[i]; + if (sd->subchannel != NULL) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, + "rr_subchannel_list_destroy"); + } + sd->subchannel = NULL; + if (sd->user_data != NULL) { + GPR_ASSERT(sd->user_data_vtable != NULL); + sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + } + } + gpr_free(subchannel_list->subchannels); + gpr_free(subchannel_list); +} + +static void rr_subchannel_list_ref(rr_subchannel_list *subchannel_list, + const char *reason) { + gpr_ref_non_zero(&subchannel_list->refcount); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); + gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu", + (void *)subchannel_list->policy, (void *)subchannel_list, + (unsigned long)(count - 1), (unsigned long)count); + } +} + +static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx, + rr_subchannel_list *subchannel_list, + const char *reason) { + const bool done = gpr_unref(&subchannel_list->refcount); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); + gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu", + (void *)subchannel_list->policy, (void *)subchannel_list, + (unsigned long)(count + 1), (unsigned long)count); + } + if (done) { + rr_subchannel_list_destroy(exec_ctx, subchannel_list); + } +} + +/** Mark \a subchannel_list as discarded. Unsubscribes all its subchannels. The + * watcher's callback will ultimately unref \a subchannel_list. */ +static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx, + rr_subchannel_list *subchannel_list, + const char *reason) { + GPR_ASSERT(!subchannel_list->shutting_down); + subchannel_list->shutting_down = true; + for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { + subchannel_data *sd = &subchannel_list->subchannels[i]; + if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe. + grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, + NULL, + &sd->connectivity_changed_closure); + } + } + rr_subchannel_list_unref(exec_ctx, subchannel_list, reason); +} + +/** Returns the index into p->subchannel_list->subchannels of the next + * subchannel in READY state, or p->subchannel_list->num_subchannels if no + * subchannel is READY. * * Note that this function does *not* update p->last_ready_subchannel_index. * The caller must do that if it returns a pick. */ static size_t get_next_ready_subchannel_index_locked( const round_robin_lb_policy *p) { + GPR_ASSERT(p->subchannel_list != NULL); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, - "[RR: %p] getting next ready subchannel, " + "[RR %p] getting next ready subchannel (out of %lu), " "last_ready_subchannel_index=%lu", - p, (unsigned long)p->last_ready_subchannel_index); + (void *)p, (unsigned long)p->subchannel_list->num_subchannels, + (unsigned long)p->last_ready_subchannel_index); } - for (size_t i = 0; i < p->num_subchannels; ++i) { - const size_t index = - (i + p->last_ready_subchannel_index + 1) % p->num_subchannels; + for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) { + const size_t index = (i + p->last_ready_subchannel_index + 1) % + p->subchannel_list->num_subchannels; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] checking index %lu: state=%d", p, - (unsigned long)index, - p->subchannels[index].curr_connectivity_state); + gpr_log(GPR_DEBUG, + "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: " + "state=%d", + (void *)p, + (void *)p->subchannel_list->subchannels[index].subchannel, + (void *)p->subchannel_list, (unsigned long)index, + p->subchannel_list->subchannels[index].curr_connectivity_state); } - if (p->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) { + if (p->subchannel_list->subchannels[index].curr_connectivity_state == + GRPC_CHANNEL_READY) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] found next ready subchannel at index %lu", - p, (unsigned long)index); + gpr_log(GPR_DEBUG, + "[RR %p] found next ready subchannel (%p) at index %lu of " + "subchannel_list %p", + (void *)p, + (void *)p->subchannel_list->subchannels[index].subchannel, + (unsigned long)index, (void *)p->subchannel_list); } return index; } } if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", p); + gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", (void *)p); } - return p->num_subchannels; + return p->subchannel_list->num_subchannels; } // Sets p->last_ready_subchannel_index to last_ready_index. static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p, size_t last_ready_index) { - GPR_ASSERT(last_ready_index < p->num_subchannels); + GPR_ASSERT(last_ready_index < p->subchannel_list->num_subchannels); p->last_ready_subchannel_index = last_ready_index; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[RR: %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", - (void *)p, (unsigned long)last_ready_index, - (void *)p->subchannels[last_ready_index].subchannel, - (void *)grpc_subchannel_get_connected_subchannel( - p->subchannels[last_ready_index].subchannel)); + gpr_log( + GPR_DEBUG, + "[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", + (void *)p, (unsigned long)last_ready_index, + (void *)p->subchannel_list->subchannels[last_ready_index].subchannel, + (void *)grpc_subchannel_get_connected_subchannel( + p->subchannel_list->subchannels[last_ready_index].subchannel)); } } @@ -210,18 +289,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol); } - for (size_t i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = &p->subchannels[i]; - if (sd->subchannel != NULL) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy"); - if (sd->user_data != NULL) { - GPR_ASSERT(sd->user_data_vtable != NULL); - sd->user_data_vtable->destroy(exec_ctx, sd->user_data); - } - } - } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); - gpr_free(p->subchannels); gpr_free(p); } @@ -243,14 +311,9 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown"); - for (size_t i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = &p->subchannels[i]; - if (sd->subchannel != NULL) { - grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, - NULL, - &sd->connectivity_changed_closure); - } - } + rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, + "sl_shutdown_rr_shutdown"); + p->subchannel_list = NULL; } static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, @@ -304,15 +367,14 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void start_picking_locked(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { p->started_picking = true; - for (size_t i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = &p->subchannels[i]; - if (sd->subchannel != NULL) { - GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->pending_connectivity_state_unsafe, - &sd->connectivity_changed_closure); - } + for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { + subchannel_data *sd = &p->subchannel_list->subchannels[i]; + GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); + rr_subchannel_list_ref(sd->subchannel_list, "start_picking"); + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->pending_connectivity_state_unsafe, + &sd->connectivity_changed_closure); } } @@ -332,63 +394,70 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol); } - const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); - if (next_ready_index < p->num_subchannels) { - /* readily available, report right away */ - subchannel_data *sd = &p->subchannels[next_ready_index]; - *target = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(sd->subchannel), "rr_picked"); - if (user_data != NULL) { - *user_data = sd->user_data; - } - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (INDEX %lu)", - (void *)*target, (unsigned long)next_ready_index); - } - /* only advance the last picked pointer if the selection was used */ - update_last_ready_subchannel_index_locked(p, next_ready_index); - return 1; - } else { - /* no pick currently available. Save for later in list of pending picks */ - if (!p->started_picking) { - start_picking_locked(exec_ctx, p); + if (p->subchannel_list != NULL) { + const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); + if (next_ready_index < p->subchannel_list->num_subchannels) { + /* readily available, report right away */ + subchannel_data *sd = &p->subchannel_list->subchannels[next_ready_index]; + *target = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(sd->subchannel), + "rr_picked"); + if (user_data != NULL) { + *user_data = sd->user_data; + } + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log( + GPR_DEBUG, + "[RR %p] PICKED TARGET <-- SUBCHANNEL %p (CONNECTED %p) (SL %p, " + "INDEX %lu)", + (void *)p, (void *)sd->subchannel, (void *)*target, + (void *)sd->subchannel_list, (unsigned long)next_ready_index); + } + /* only advance the last picked pointer if the selection was used */ + update_last_ready_subchannel_index_locked(p, next_ready_index); + return 1; } - pending_pick *pp = gpr_malloc(sizeof(*pp)); - pp->next = p->pending_picks; - pp->target = target; - pp->on_complete = on_complete; - pp->initial_metadata_flags = pick_args->initial_metadata_flags; - pp->user_data = user_data; - p->pending_picks = pp; - return 0; } + /* no pick currently available. Save for later in list of pending picks */ + if (!p->started_picking) { + start_picking_locked(exec_ctx, p); + } + pending_pick *pp = gpr_malloc(sizeof(*pp)); + pp->next = p->pending_picks; + pp->target = target; + pp->on_complete = on_complete; + pp->initial_metadata_flags = pick_args->initial_metadata_flags; + pp->user_data = user_data; + p->pending_picks = pp; + return 0; } static void update_state_counters_locked(subchannel_data *sd) { - round_robin_lb_policy *p = sd->policy; + rr_subchannel_list *subchannel_list = sd->subchannel_list; if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { - GPR_ASSERT(p->num_ready > 0); - --p->num_ready; + GPR_ASSERT(subchannel_list->num_ready > 0); + --subchannel_list->num_ready; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - GPR_ASSERT(p->num_transient_failures > 0); - --p->num_transient_failures; + GPR_ASSERT(subchannel_list->num_transient_failures > 0); + --subchannel_list->num_transient_failures; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { - GPR_ASSERT(p->num_idle > 0); - --p->num_idle; + GPR_ASSERT(subchannel_list->num_idle > 0); + --subchannel_list->num_idle; } if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { - ++p->num_ready; + ++subchannel_list->num_ready; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - ++p->num_transient_failures; + ++subchannel_list->num_transient_failures; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) { - ++p->num_idle; + ++subchannel_list->num_idle; } } -/* sd is the subchannel_data associted with the updated subchannel. - * shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE - * or SHUTDOWN */ +/** Sets the policy's connectivity status based on that of the passed-in \a sd + * (the subchannel_data associted with the updated subchannel) and the + * subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be + * used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the + * connectivity status set. */ static grpc_connectivity_state update_lb_connectivity_status_locked( grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) { /* In priority order. The first rule to match terminates the search (ie, if we @@ -401,17 +470,18 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( * CHECK: sd->curr_connectivity_state == CONNECTING. * * 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN. - * CHECK: p->num_subchannels = 0. + * CHECK: p->subchannel_list->num_subchannels = 0. * * 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is * TRANSIENT_FAILURE. - * CHECK: p->num_transient_failures == p->num_subchannels. + * CHECK: p->num_transient_failures == p->subchannel_list->num_subchannels. * * 5) RULE: ALL subchannels are IDLE => policy is IDLE. - * CHECK: p->num_idle == p->num_subchannels. + * CHECK: p->num_idle == p->subchannel_list->num_subchannels. */ - round_robin_lb_policy *p = sd->policy; - if (p->num_ready > 0) { /* 1) READY */ + rr_subchannel_list *subchannel_list = sd->subchannel_list; + round_robin_lb_policy *p = subchannel_list->policy; + if (subchannel_list->num_ready > 0) { /* 1) READY */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); return GRPC_CHANNEL_READY; @@ -421,18 +491,19 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "rr_connecting"); return GRPC_CHANNEL_CONNECTING; - } else if (p->num_subchannels == 0) { /* 3) SHUTDOWN */ + } else if (p->subchannel_list->num_subchannels == 0) { /* 3) SHUTDOWN */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "rr_shutdown"); return GRPC_CHANNEL_SHUTDOWN; - } else if (p->num_transient_failures == - p->num_subchannels) { /* 4) TRANSIENT_FAILURE */ + } else if (subchannel_list->num_transient_failures == + p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "rr_transient_failure"); return GRPC_CHANNEL_TRANSIENT_FAILURE; - } else if (p->num_idle == p->num_subchannels) { /* 5) IDLE */ + } else if (subchannel_list->num_idle == + p->subchannel_list->num_subchannels) { /* 5) IDLE */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, "rr_idle"); return GRPC_CHANNEL_IDLE; @@ -444,7 +515,28 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { subchannel_data *sd = arg; - round_robin_lb_policy *p = sd->policy; + round_robin_lb_policy *p = sd->subchannel_list->policy; + // If the policy is shutting down, unref and return. + if (p->shutdown) { + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "pol_shutdown"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown"); + return; + } + if (sd->subchannel_list->shutting_down) { + // the subchannel list associated with sd has been discarded. This callback + // corresponds to the unsubscription. + GPR_ASSERT(error == GRPC_ERROR_CANCELLED); + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sl_shutdown"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown"); + return; + } + // Dispose of outdated subchannel lists. + if (sd->subchannel_list != p->subchannel_list && + sd->subchannel_list != p->latest_pending_subchannel_list) { + // sd belongs to an outdated subchannel_list: get rid of it. + rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_oudated"); + return; + } // Now that we're inside the combiner, copy the pending connectivity // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. @@ -453,21 +545,16 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_log(GPR_DEBUG, "[RR %p] connectivity changed for subchannel %p: " "prev_state=%d new_state=%d", - p, sd->subchannel, sd->prev_connectivity_state, + (void *)p, (void *)sd->subchannel, sd->prev_connectivity_state, sd->curr_connectivity_state); } - // If we're shutting down, unref and return. - if (p->shutdown) { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); - return; - } // Update state counters and determine new overall state. update_state_counters_locked(sd); sd->prev_connectivity_state = sd->curr_connectivity_state; - grpc_connectivity_state new_connectivity_state = + const grpc_connectivity_state new_policy_connectivity_state = update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error)); - // If the new state is SHUTDOWN, unref the subchannel, and if the new - // overall state is SHUTDOWN, clean up. + // If the sd's new state is SHUTDOWN, unref the subchannel, and if the new + // policy's state is SHUTDOWN, clean up. if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown"); sd->subchannel = NULL; @@ -475,7 +562,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(sd->user_data_vtable != NULL); sd->user_data_vtable->destroy(exec_ctx, sd->user_data); } - if (new_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { /* the policy is shutting down. Flush all the pending picks... */ pending_pick *pp; while ((pp = p->pending_picks)) { @@ -486,15 +573,42 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, } } /* unref the "rr_connectivity" weak ref from start_picking */ - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); - } else { + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sd_shutdown"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, + "rr_connectivity_sd_shutdown"); + } else { // sd not in SHUTDOWN if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { + if (sd->subchannel_list != p->subchannel_list) { + // promote sd->subchannel_list to p->subchannel_list. + // sd->subchannel_list must be equal to + // p->latest_pending_subchannel_list because we have already filtered + // for sds belonging to outdated subchannel lists. + GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list); + GPR_ASSERT(!sd->subchannel_list->shutting_down); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, + "[RR %p] phasing out subchannel list %p (size %lu) in favor " + "of %p (size %lu)", + (void *)p, (void *)p->subchannel_list, + (unsigned long)p->subchannel_list->num_subchannels, + (void *)sd->subchannel_list, + (unsigned long)sd->subchannel_list->num_subchannels); + } + if (p->subchannel_list != NULL) { + // dispose of the current subchannel_list + rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, + "sl_shutdown_rr_update_connectivity"); + } + p->subchannel_list = sd->subchannel_list; + p->latest_pending_subchannel_list = NULL; + } /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); - GPR_ASSERT(next_ready_index < p->num_subchannels); - subchannel_data *selected = &p->subchannels[next_ready_index]; + GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels); + subchannel_data *selected = + &p->subchannel_list->subchannels[next_ready_index]; if (p->pending_picks != NULL) { /* if the selected subchannel is going to be used for the pending * picks, update the last picked pointer */ @@ -519,7 +633,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(pp); } } - /* renew notification: reuses the "rr_connectivity" weak ref */ + /* renew notification: reuses the "rr_connectivity" weak ref on the policy + * as well as the sd->subchannel_list ref. */ grpc_subchannel_notify_on_state_change( exec_ctx, sd->subchannel, p->base.interested_parties, &sd->pending_connectivity_state_unsafe, @@ -546,8 +661,9 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure *closure) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); - if (next_ready_index < p->num_subchannels) { - subchannel_data *selected = &p->subchannels[next_ready_index]; + if (next_ready_index < p->subchannel_list->num_subchannels) { + subchannel_data *selected = + &p->subchannel_list->subchannels[next_ready_index]; grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected->subchannel), "rr_picked"); @@ -559,52 +675,68 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } } -static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { - rr_destroy, - rr_shutdown_locked, - rr_pick_locked, - rr_cancel_pick_locked, - rr_cancel_picks_locked, - rr_ping_one_locked, - rr_exit_idle_locked, - rr_check_connectivity_locked, - rr_notify_on_state_change_locked}; - -static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} - -static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} - -static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, - grpc_lb_policy_factory *factory, - grpc_lb_policy_args *args) { - GPR_ASSERT(args->client_channel_factory != NULL); - - /* Find the number of backend addresses. We ignore balancer - * addresses, since we don't know how to handle them. */ +static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const grpc_lb_policy_args *args) { + round_robin_lb_policy *p = (round_robin_lb_policy *)policy; + /* Find the number of backend addresses. We ignore balancer addresses, since + * we don't know how to handle them. */ const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); if (arg == NULL || arg->type != GRPC_ARG_POINTER) { - return NULL; + if (p->subchannel_list == NULL) { + // If we don't have a current subchannel list, go into TRANSIENT FAILURE. + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), + "rr_update_missing"); + } else { + // otherwise, keep using the current subchannel list (ignore this update). + gpr_log(GPR_ERROR, + "No valid LB addresses channel arg for Round Robin %p update, " + "ignoring.", + (void *)p); + } + return; } grpc_lb_addresses *addresses = arg->value.pointer.p; size_t num_addrs = 0; for (size_t i = 0; i < addresses->num_addresses; i++) { if (!addresses->addresses[i].is_balancer) ++num_addrs; } - if (num_addrs == 0) return NULL; - - round_robin_lb_policy *p = gpr_zalloc(sizeof(*p)); - - p->num_addresses = num_addrs; - p->subchannels = gpr_zalloc(sizeof(*p->subchannels) * num_addrs); - - grpc_subchannel_args sc_args; + if (num_addrs == 0) { + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), + "rr_update_empty"); + if (p->subchannel_list != NULL) { + rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, + "sl_shutdown_rr_update"); + p->subchannel_list = NULL; + } + return; + } size_t subchannel_index = 0; + rr_subchannel_list *subchannel_list = gpr_zalloc(sizeof(*subchannel_list)); + subchannel_list->policy = p; + subchannel_list->subchannels = + gpr_zalloc(sizeof(subchannel_data) * num_addrs); + subchannel_list->num_subchannels = num_addrs; + gpr_ref_init(&subchannel_list->refcount, 1); + p->latest_pending_subchannel_list = subchannel_list; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "Created subchannel list %p for %lu subchannels", + (void *)subchannel_list, (unsigned long)num_addrs); + } + grpc_subchannel_args sc_args; + /* We need to remove the LB addresses in order to be able to compare the + * subchannel keys of subchannels from a different batch of addresses. */ + static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, + GRPC_ARG_LB_ADDRESSES}; + /* Create subchannels for addresses in the update. */ for (size_t i = 0; i < addresses->num_addresses; i++) { /* Skip balancer addresses, since we only know how to handle backends. */ if (addresses->addresses[i].is_balancer) continue; - - static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; + GPR_ASSERT(i < num_addrs); memset(&sc_args, 0, sizeof(grpc_subchannel_args)); grpc_arg addr_arg = grpc_create_subchannel_address_arg(&addresses->addresses[i].address); @@ -618,52 +750,84 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { char *address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_DEBUG, "index %lu: Created subchannel %p for address uri %s", - (unsigned long)subchannel_index, (void *)subchannel, address_uri); + gpr_log(GPR_DEBUG, + "index %lu: Created subchannel %p for address uri %s into " + "subchannel_list %p", + (unsigned long)subchannel_index, (void *)subchannel, address_uri, + (void *)subchannel_list); gpr_free(address_uri); } grpc_channel_args_destroy(exec_ctx, new_args); - if (subchannel != NULL) { - subchannel_data *sd = &p->subchannels[subchannel_index]; - sd->policy = p; - sd->subchannel = subchannel; - /* use some sentinel value outside of the range of grpc_connectivity_state - * to signal an undefined previous state. We won't be referring to this - * value again and it'll be overwritten after the first call to - * rr_connectivity_changed */ - sd->prev_connectivity_state = GRPC_CHANNEL_INIT; - sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; - sd->user_data_vtable = addresses->user_data_vtable; - if (sd->user_data_vtable != NULL) { - sd->user_data = - sd->user_data_vtable->copy(addresses->addresses[i].user_data); - } - grpc_closure_init(&sd->connectivity_changed_closure, - rr_connectivity_changed_locked, sd, - grpc_combiner_scheduler(args->combiner, false)); - ++subchannel_index; + subchannel_data *sd = &subchannel_list->subchannels[subchannel_index++]; + sd->subchannel_list = subchannel_list; + sd->subchannel = subchannel; + grpc_closure_init(&sd->connectivity_changed_closure, + rr_connectivity_changed_locked, sd, + grpc_combiner_scheduler(args->combiner, false)); + /* use some sentinel value outside of the range of + * grpc_connectivity_state to signal an undefined previous state. We + * won't be referring to this value again and it'll be overwritten after + * the first call to rr_connectivity_changed_locked */ + sd->prev_connectivity_state = GRPC_CHANNEL_INIT; + sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; + sd->user_data_vtable = addresses->user_data_vtable; + if (sd->user_data_vtable != NULL) { + sd->user_data = + sd->user_data_vtable->copy(addresses->addresses[i].user_data); + } + if (p->started_picking) { + rr_subchannel_list_ref(sd->subchannel_list, "update_started_picking"); + GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity_update"); + /* 2. Watch every new subchannel. A subchannel list becomes active the + * moment one of its subchannels is READY. At that moment, we swap + * p->subchannel_list for sd->subchannel_list, provided the subchannel + * list is still valid (ie, isn't shutting down) */ + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->pending_connectivity_state_unsafe, + &sd->connectivity_changed_closure); } } - if (subchannel_index == 0) { - /* couldn't create any subchannel. Bail out */ - gpr_free(p->subchannels); - gpr_free(p); - return NULL; + if (!p->started_picking) { + // The policy isn't picking yet. Save the update for later, disposing of + // previous version if any. + if (p->subchannel_list != NULL) { + rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, + "rr_update_before_started_picking"); + } + p->subchannel_list = subchannel_list; } - p->num_subchannels = subchannel_index; +} + +static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { + rr_destroy, + rr_shutdown_locked, + rr_pick_locked, + rr_cancel_pick_locked, + rr_cancel_picks_locked, + rr_ping_one_locked, + rr_exit_idle_locked, + rr_check_connectivity_locked, + rr_notify_on_state_change_locked, + rr_update_locked}; + +static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} - // Initialize the last pick index to the last subchannel, so that the - // first pick will start at the beginning of the list. - p->last_ready_subchannel_index = subchannel_index - 1; +static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} +static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, + grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args) { + GPR_ASSERT(args->client_channel_factory != NULL); + round_robin_lb_policy *p = gpr_zalloc(sizeof(*p)); + rr_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels", - (void *)p, (unsigned long)p->num_subchannels); + gpr_log(GPR_DEBUG, "Created Round Robin %p with %lu subchannels", (void *)p, + (unsigned long)p->subchannel_list->num_subchannels); } return &p->base; } diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h index 9d6c0fc139..5c6464bc87 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.h +++ b/src/core/ext/filters/client_channel/lb_policy_factory.h @@ -118,11 +118,11 @@ grpc_lb_addresses *grpc_lb_addresses_find_channel_arg( const grpc_channel_args *channel_args); /** Arguments passed to LB policies. */ -typedef struct grpc_lb_policy_args { +struct grpc_lb_policy_args { grpc_client_channel_factory *client_channel_factory; grpc_channel_args *args; grpc_combiner *combiner; -} grpc_lb_policy_args; +}; struct grpc_lb_policy_factory_vtable { void (*ref)(grpc_lb_policy_factory *factory); diff --git a/test/core/end2end/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c index 736b224fd6..cb9b1f07c2 100644 --- a/test/core/end2end/fake_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c @@ -54,7 +54,7 @@ #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" -#include "test/core/end2end/fake_resolver.h" +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" // // fake_resolver @@ -98,7 +98,7 @@ static void fake_resolver_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx, fake_resolver* r) { if (r->next_completion != NULL && r->next_results != NULL) { *r->target_result = - grpc_channel_args_merge(r->channel_args, r->next_results); + grpc_channel_args_union(r->next_results, r->channel_args); grpc_channel_args_destroy(exec_ctx, r->next_results); grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = NULL; @@ -243,11 +243,13 @@ static char* fake_resolver_get_default_authority(grpc_resolver_factory* factory, static const grpc_resolver_factory_vtable fake_resolver_factory_vtable = { fake_resolver_factory_ref, fake_resolver_factory_unref, - fake_resolver_create, fake_resolver_get_default_authority, "test"}; + fake_resolver_create, fake_resolver_get_default_authority, "fake"}; static grpc_resolver_factory fake_resolver_factory = { &fake_resolver_factory_vtable}; -void grpc_fake_resolver_init(void) { +void grpc_resolver_fake_init(void) { grpc_register_resolver_type(&fake_resolver_factory); } + +void grpc_resolver_fake_shutdown(void) {} diff --git a/test/core/end2end/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h index d9668d0d11..373509b97f 100644 --- a/test/core/end2end/fake_resolver.h +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h @@ -29,19 +29,17 @@ // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // -#ifndef GRPC_TEST_CORE_END2END_FAKE_RESOLVER_H -#define GRPC_TEST_CORE_END2END_FAKE_RESOLVER_H +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_FAKE_FAKE_RESOLVER_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_FAKE_FAKE_RESOLVER_H #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/uri_parser.h" #include "src/core/lib/channel/channel_args.h" -#include "test/core/util/test_config.h" - #define GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR \ "grpc.fake_resolver.response_generator" -void grpc_fake_resolver_init(); +void grpc_resolver_fake_init(); // Instances of \a grpc_fake_resolver_response_generator are passed to the // fake resolver in a channel argument (see \a @@ -73,4 +71,5 @@ grpc_fake_resolver_response_generator_ref( void grpc_fake_resolver_response_generator_unref( grpc_fake_resolver_response_generator* generator); -#endif /* GRPC_TEST_CORE_END2END_FAKE_RESOLVER_H */ +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_FAKE_FAKE_RESOLVER_H \ + */ diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c index dd14bf1d02..1007916b40 100644 --- a/src/core/ext/filters/client_channel/subchannel.c +++ b/src/core/ext/filters/client_channel/subchannel.c @@ -307,7 +307,7 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, grpc_connector *connector, const grpc_subchannel_args *args) { - grpc_subchannel_key *key = grpc_subchannel_key_create(connector, args); + grpc_subchannel_key *key = grpc_subchannel_key_create(args); grpc_subchannel *c = grpc_subchannel_index_find(exec_ctx, key); if (c) { grpc_subchannel_key_destroy(exec_ctx, key); @@ -770,6 +770,11 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( return GET_CONNECTED_SUBCHANNEL(c, acq); } +const grpc_subchannel_key *grpc_subchannel_get_key( + const grpc_subchannel *subchannel) { + return subchannel->key; +} + grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, const grpc_connected_subchannel_call_args *args, diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index e433c33e40..e284001fa2 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -50,6 +50,7 @@ typedef struct grpc_subchannel grpc_subchannel; typedef struct grpc_connected_subchannel grpc_connected_subchannel; typedef struct grpc_subchannel_call grpc_subchannel_call; typedef struct grpc_subchannel_args grpc_subchannel_args; +typedef struct grpc_subchannel_key grpc_subchannel_key; #ifdef GRPC_STREAM_REFCOUNT_DEBUG #define GRPC_SUBCHANNEL_REF(p, r) \ @@ -155,6 +156,10 @@ void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( grpc_subchannel *subchannel); +/** return the subchannel index key for \a subchannel */ +const grpc_subchannel_key *grpc_subchannel_get_key( + const grpc_subchannel *subchannel); + /** continue processing a transport op */ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *subchannel_call, diff --git a/src/core/ext/filters/client_channel/subchannel_index.c b/src/core/ext/filters/client_channel/subchannel_index.c index b25dbfcf51..2a707353c0 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.c +++ b/src/core/ext/filters/client_channel/subchannel_index.c @@ -50,7 +50,6 @@ static gpr_avl g_subchannel_index; static gpr_mu g_mu; struct grpc_subchannel_key { - grpc_connector *connector; grpc_subchannel_args args; }; @@ -73,10 +72,9 @@ static grpc_exec_ctx *current_ctx() { } static grpc_subchannel_key *create_key( - grpc_connector *connector, const grpc_subchannel_args *args, + const grpc_subchannel_args *args, grpc_channel_args *(*copy_channel_args)(const grpc_channel_args *args)) { grpc_subchannel_key *k = gpr_malloc(sizeof(*k)); - k->connector = grpc_connector_ref(connector); k->args.filter_count = args->filter_count; if (k->args.filter_count > 0) { k->args.filters = @@ -91,19 +89,17 @@ static grpc_subchannel_key *create_key( } grpc_subchannel_key *grpc_subchannel_key_create( - grpc_connector *connector, const grpc_subchannel_args *args) { - return create_key(connector, args, grpc_channel_args_normalize); + const grpc_subchannel_args *args) { + return create_key(args, grpc_channel_args_normalize); } static grpc_subchannel_key *subchannel_key_copy(grpc_subchannel_key *k) { - return create_key(k->connector, &k->args, grpc_channel_args_copy); + return create_key(&k->args, grpc_channel_args_copy); } -static int subchannel_key_compare(grpc_subchannel_key *a, - grpc_subchannel_key *b) { - int c = GPR_ICMP(a->connector, b->connector); - if (c != 0) return c; - c = GPR_ICMP(a->args.filter_count, b->args.filter_count); +int grpc_subchannel_key_compare(const grpc_subchannel_key *a, + const grpc_subchannel_key *b) { + int c = GPR_ICMP(a->args.filter_count, b->args.filter_count); if (c != 0) return c; if (a->args.filter_count > 0) { c = memcmp(a->args.filters, b->args.filters, @@ -115,7 +111,6 @@ static int subchannel_key_compare(grpc_subchannel_key *a, void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *k) { - grpc_connector_unref(exec_ctx, k->connector); gpr_free((grpc_channel_args *)k->args.filters); grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)k->args.args); gpr_free(k); @@ -128,7 +123,7 @@ static void sck_avl_destroy(void *p) { static void *sck_avl_copy(void *p) { return subchannel_key_copy(p); } static long sck_avl_compare(void *a, void *b) { - return subchannel_key_compare(a, b); + return grpc_subchannel_key_compare(a, b); } static void scv_avl_destroy(void *p) { diff --git a/src/core/ext/filters/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h index f673ade378..55e90bb645 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.h +++ b/src/core/ext/filters/client_channel/subchannel_index.h @@ -34,17 +34,14 @@ #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H -#include "src/core/ext/filters/client_channel/connector.h" #include "src/core/ext/filters/client_channel/subchannel.h" /** \file Provides an index of active subchannels so that they can be shared amongst channels */ -typedef struct grpc_subchannel_key grpc_subchannel_key; - /** Create a key that can be used to uniquely identify a subchannel */ grpc_subchannel_key *grpc_subchannel_key_create( - grpc_connector *con, const grpc_subchannel_args *args); + const grpc_subchannel_args *args); /** Destroy a subchannel key */ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, @@ -69,6 +66,9 @@ void grpc_subchannel_index_unregister(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key, grpc_subchannel *constructed); +int grpc_subchannel_key_compare(const grpc_subchannel_key *a, + const grpc_subchannel_key *b); + /** Initialize the subchannel index (global) */ void grpc_subchannel_index_init(void); /** Shutdown the subchannel index (global) */ diff --git a/src/core/ext/filters/workarounds/workaround_utils.h b/src/core/ext/filters/workarounds/workaround_utils.h index 7cd70c12d8..0608d1e937 100644 --- a/src/core/ext/filters/workarounds/workaround_utils.h +++ b/src/core/ext/filters/workarounds/workaround_utils.h @@ -49,4 +49,4 @@ typedef bool (*user_agent_parser)(grpc_mdelem); void grpc_register_workaround(uint32_t id, user_agent_parser parser); -#endif +#endif /* GRPC_CORE_EXT_FILTERS_WORKAROUNDS_WORKAROUND_UTILS_H */ diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c index 247b134938..2df07c8ae6 100644 --- a/src/core/lib/channel/channel_args.c +++ b/src/core/lib/channel/channel_args.c @@ -129,9 +129,23 @@ grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src) { return grpc_channel_args_copy_and_add(src, NULL, 0); } -grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, +grpc_channel_args *grpc_channel_args_union(const grpc_channel_args *a, const grpc_channel_args *b) { - return grpc_channel_args_copy_and_add(a, b->args, b->num_args); + const size_t max_out = (a->num_args + b->num_args); + grpc_arg *uniques = gpr_malloc(sizeof(*uniques) * max_out); + for (size_t i = 0; i < a->num_args; ++i) uniques[i] = a->args[i]; + + size_t uniques_idx = a->num_args; + for (size_t i = 0; i < b->num_args; ++i) { + const char *b_key = b->args[i].key; + if (grpc_channel_args_find(a, b_key) == NULL) { // not found + uniques[uniques_idx++] = b->args[i]; + } + } + grpc_channel_args *result = + grpc_channel_args_copy_and_add(NULL, uniques, uniques_idx); + gpr_free(uniques); + return result; } static int cmp_arg(const grpc_arg *a, const grpc_arg *b) { diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index f0f603e251..128afd00d4 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -63,8 +63,8 @@ grpc_channel_args *grpc_channel_args_copy_and_add_and_remove( const grpc_channel_args *src, const char **to_remove, size_t num_to_remove, const grpc_arg *to_add, size_t num_to_add); -/** Concatenate args from \a a and \a b into a new instance */ -grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, +/** Perform the union of \a a and \a b, prioritizing \a a entries */ +grpc_channel_args *grpc_channel_args_union(const grpc_channel_args *a, const grpc_channel_args *b); /** Destroy arguments created by \a grpc_channel_args_copy */ diff --git a/src/core/lib/iomgr/polling_entity.h b/src/core/lib/iomgr/polling_entity.h index e81531053c..740e553b2a 100644 --- a/src/core/lib/iomgr/polling_entity.h +++ b/src/core/lib/iomgr/polling_entity.h @@ -38,9 +38,8 @@ #include "src/core/lib/iomgr/pollset_set.h" /* A grpc_polling_entity is a pollset-or-pollset_set container. It allows - * functions that - * accept a pollset XOR a pollset_set to do so through an abstract interface. - * No ownership is taken. */ + * functions that accept a pollset XOR a pollset_set to do so through an + * abstract interface. No ownership is taken. */ typedef struct grpc_polling_entity { union { @@ -64,18 +63,14 @@ grpc_pollset_set *grpc_polling_entity_pollset_set(grpc_polling_entity *pollent); bool grpc_polling_entity_is_empty(const grpc_polling_entity *pollent); /** Add the pollset or pollset_set in \a pollent to the destination pollset_set - * \a - * pss_dst */ + * \a * pss_dst */ void grpc_polling_entity_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_polling_entity *pollent, grpc_pollset_set *pss_dst); /** Delete the pollset or pollset_set in \a pollent from the destination - * pollset_set \a - * pss_dst */ + * pollset_set \a * pss_dst */ void grpc_polling_entity_del_from_pollset_set(grpc_exec_ctx *exec_ctx, grpc_polling_entity *pollent, grpc_pollset_set *pss_dst); -/* pollset_set specific */ - #endif /* GRPC_CORE_LIB_IOMGR_POLLING_ENTITY_H */ diff --git a/src/core/lib/security/transport/lb_targets_info.c b/src/core/lib/security/transport/lb_targets_info.c index e73483c039..8bb06b1f61 100644 --- a/src/core/lib/security/transport/lb_targets_info.c +++ b/src/core/lib/security/transport/lb_targets_info.c @@ -44,7 +44,9 @@ static void *targets_info_copy(void *p) { return grpc_slice_hash_table_ref(p); } static void targets_info_destroy(grpc_exec_ctx *exec_ctx, void *p) { grpc_slice_hash_table_unref(exec_ctx, p); } -static int targets_info_cmp(void *a, void *b) { return GPR_ICMP(a, b); } +static int targets_info_cmp(void *a, void *b) { + return grpc_slice_hash_table_cmp(a, b); +} static const grpc_arg_pointer_vtable server_to_balancer_names_vtable = { targets_info_copy, targets_info_destroy, targets_info_cmp}; diff --git a/src/core/lib/slice/slice_hash_table.c b/src/core/lib/slice/slice_hash_table.c index 444f22aa19..2d9ff61c95 100644 --- a/src/core/lib/slice/slice_hash_table.c +++ b/src/core/lib/slice/slice_hash_table.c @@ -43,6 +43,7 @@ struct grpc_slice_hash_table { gpr_refcount refs; void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value); + int (*value_cmp)(void* a, void* b); size_t size; size_t max_num_probes; grpc_slice_hash_table_entry* entries; @@ -72,10 +73,12 @@ static void grpc_slice_hash_table_add(grpc_slice_hash_table* table, grpc_slice_hash_table* grpc_slice_hash_table_create( size_t num_entries, grpc_slice_hash_table_entry* entries, - void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value)) { + void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value), + int (*value_cmp)(void* a, void* b)) { grpc_slice_hash_table* table = gpr_zalloc(sizeof(*table)); gpr_ref_init(&table->refs, 1); table->destroy_value = destroy_value; + table->value_cmp = value_cmp; // Keep load factor low to improve performance of lookups. table->size = num_entries * 2; const size_t entry_size = sizeof(grpc_slice_hash_table_entry) * table->size; @@ -121,3 +124,37 @@ void* grpc_slice_hash_table_get(const grpc_slice_hash_table* table, } return NULL; // Not found. } + +static int pointer_cmp(void* a, void* b) { return GPR_ICMP(a, b); } +int grpc_slice_hash_table_cmp(const grpc_slice_hash_table* a, + const grpc_slice_hash_table* b) { + int (*const value_cmp_fn_a)(void* a, void* b) = + a->value_cmp != NULL ? a->value_cmp : pointer_cmp; + int (*const value_cmp_fn_b)(void* a, void* b) = + b->value_cmp != NULL ? b->value_cmp : pointer_cmp; + // Compare value_fns + const int value_fns_cmp = + GPR_ICMP((void*)value_cmp_fn_a, (void*)value_cmp_fn_b); + if (value_fns_cmp != 0) return value_fns_cmp; + // Compare sizes + if (a->size < b->size) return -1; + if (a->size > b->size) return 1; + // Compare rows. + for (size_t i = 0; i < a->size; ++i) { + if (is_empty(&a->entries[i])) { + if (!is_empty(&b->entries[i])) { + return -1; // a empty but b non-empty + } + continue; // both empty, no need to check key or value + } else if (is_empty(&b->entries[i])) { + return 1; // a non-empty but b empty + } + // neither entry is empty + const int key_cmp = grpc_slice_cmp(a->entries[i].key, b->entries[i].key); + if (key_cmp != 0) return key_cmp; + const int value_cmp = + value_cmp_fn_a(a->entries[i].value, b->entries[i].value); + if (value_cmp != 0) return value_cmp; + } + return 0; +} diff --git a/src/core/lib/slice/slice_hash_table.h b/src/core/lib/slice/slice_hash_table.h index 1e61c5eb11..07988abff4 100644 --- a/src/core/lib/slice/slice_hash_table.h +++ b/src/core/lib/slice/slice_hash_table.h @@ -54,11 +54,15 @@ typedef struct grpc_slice_hash_table_entry { } grpc_slice_hash_table_entry; /** Creates a new hash table of containing \a entries, which is an array - of length \a num_entries. Takes ownership of all keys and values in - \a entries. Values will be cleaned up via \a destroy_value(). */ + of length \a num_entries. Takes ownership of all keys and values in \a + entries. Values will be cleaned up via \a destroy_value(). If not NULL, \a + value_cmp will be used to compare values in the context of \a + grpc_slice_hash_table_cmp. If NULL, raw pointer (\a GPR_ICMP) comparison + will be used. */ grpc_slice_hash_table *grpc_slice_hash_table_create( size_t num_entries, grpc_slice_hash_table_entry *entries, - void (*destroy_value)(grpc_exec_ctx *exec_ctx, void *value)); + void (*destroy_value)(grpc_exec_ctx *exec_ctx, void *value), + int (*value_cmp)(void *a, void *b)); grpc_slice_hash_table *grpc_slice_hash_table_ref(grpc_slice_hash_table *table); void grpc_slice_hash_table_unref(grpc_exec_ctx *exec_ctx, @@ -69,4 +73,13 @@ void grpc_slice_hash_table_unref(grpc_exec_ctx *exec_ctx, void *grpc_slice_hash_table_get(const grpc_slice_hash_table *table, const grpc_slice key); +/** Compares \a a vs. \a b. + * A table is considered "smaller" (resp. "greater") if: + * - GPR_ICMP(a->value_cmp, b->value_cmp) < 1 (resp. > 1), + * - else, it contains fewer (resp. more) entries, + * - else, if strcmp(a_key, b_key) < 1 (resp. > 1), + * - else, if value_cmp(a_value, b_value) < 1 (resp. > 1). */ +int grpc_slice_hash_table_cmp(const grpc_slice_hash_table *a, + const grpc_slice_hash_table *b); + #endif /* GRPC_CORE_LIB_SLICE_SLICE_HASH_TABLE_H */ diff --git a/src/core/lib/transport/service_config.c b/src/core/lib/transport/service_config.c index 6aecb7fa93..107818f4d1 100644 --- a/src/core/lib/transport/service_config.c +++ b/src/core/lib/transport/service_config.c @@ -229,7 +229,7 @@ grpc_slice_hash_table* grpc_service_config_create_method_config_table( grpc_slice_hash_table* method_config_table = NULL; if (entries != NULL) { method_config_table = - grpc_slice_hash_table_create(num_entries, entries, destroy_value); + grpc_slice_hash_table_create(num_entries, entries, destroy_value, NULL); gpr_free(entries); } return method_config_table; diff --git a/src/core/plugin_registry/grpc_plugin_registry.c b/src/core/plugin_registry/grpc_plugin_registry.c index 510cf5d5a0..64e3c07510 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.c +++ b/src/core/plugin_registry/grpc_plugin_registry.c @@ -41,6 +41,8 @@ extern void grpc_deadline_filter_init(void); extern void grpc_deadline_filter_shutdown(void); extern void grpc_client_channel_init(void); extern void grpc_client_channel_shutdown(void); +extern void grpc_resolver_fake_init(void); +extern void grpc_resolver_fake_shutdown(void); extern void grpc_lb_policy_grpclb_init(void); extern void grpc_lb_policy_grpclb_shutdown(void); extern void grpc_lb_policy_pick_first_init(void); @@ -73,6 +75,8 @@ void grpc_register_built_in_plugins(void) { grpc_deadline_filter_shutdown); grpc_register_plugin(grpc_client_channel_init, grpc_client_channel_shutdown); + grpc_register_plugin(grpc_resolver_fake_init, + grpc_resolver_fake_shutdown); grpc_register_plugin(grpc_lb_policy_grpclb_init, grpc_lb_policy_grpclb_shutdown); grpc_register_plugin(grpc_lb_policy_pick_first_init, diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c index e5eb68f934..76b17ed06d 100644 --- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c +++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c @@ -47,6 +47,8 @@ extern void grpc_resolver_dns_native_init(void); extern void grpc_resolver_dns_native_shutdown(void); extern void grpc_resolver_sockaddr_init(void); extern void grpc_resolver_sockaddr_shutdown(void); +extern void grpc_resolver_fake_init(void); +extern void grpc_resolver_fake_shutdown(void); extern void grpc_load_reporting_plugin_init(void); extern void grpc_load_reporting_plugin_shutdown(void); extern void grpc_lb_policy_grpclb_init(void); @@ -79,6 +81,8 @@ void grpc_register_built_in_plugins(void) { grpc_resolver_dns_native_shutdown); grpc_register_plugin(grpc_resolver_sockaddr_init, grpc_resolver_sockaddr_shutdown); + grpc_register_plugin(grpc_resolver_fake_init, + grpc_resolver_fake_shutdown); grpc_register_plugin(grpc_load_reporting_plugin_init, grpc_load_reporting_plugin_shutdown); grpc_register_plugin(grpc_lb_policy_grpclb_init, diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 5f8075467d..8a1a58bb86 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -296,6 +296,7 @@ CORE_SOURCE_FILES = [ 'third_party/nanopb/pb_common.c', 'third_party/nanopb/pb_decode.c', 'third_party/nanopb/pb_encode.c', + 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c', diff --git a/test/core/client_channel/resolvers/BUILD b/test/core/client_channel/resolvers/BUILD index 80ca7d3ebb..030f6091b5 100644 --- a/test/core/client_channel/resolvers/BUILD +++ b/test/core/client_channel/resolvers/BUILD @@ -74,7 +74,7 @@ grpc_cc_test( deps = [ "//:gpr", "//:grpc", - "//test/core/end2end:fake_resolver", + "//:grpc_resolver_fake", "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", ], diff --git a/test/core/client_channel/resolvers/fake_resolver_test.c b/test/core/client_channel/resolvers/fake_resolver_test.c index 861918fbd6..c211f26b76 100644 --- a/test/core/client_channel/resolvers/fake_resolver_test.c +++ b/test/core/client_channel/resolvers/fake_resolver_test.c @@ -39,18 +39,18 @@ #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/parse_address.h" +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/security/credentials/fake/fake_credentials.h" -#include "test/core/end2end/fake_resolver.h" #include "test/core/util/test_config.h" static grpc_resolver *build_fake_resolver( grpc_exec_ctx *exec_ctx, grpc_combiner *combiner, grpc_fake_resolver_response_generator *response_generator) { - grpc_resolver_factory *factory = grpc_resolver_factory_lookup("test"); + grpc_resolver_factory *factory = grpc_resolver_factory_lookup("fake"); grpc_arg generator_arg = grpc_fake_resolver_response_generator_arg(response_generator); grpc_resolver_args args; @@ -177,7 +177,6 @@ static void test_fake_resolver() { int main(int argc, char **argv) { grpc_test_init(argc, argv); - grpc_fake_resolver_init(); // Registers the "test" scheme. grpc_init(); test_fake_resolver(); diff --git a/test/core/end2end/BUILD b/test/core/end2end/BUILD index cf387a93e8..7705f62a79 100644 --- a/test/core/end2end/BUILD +++ b/test/core/end2end/BUILD @@ -59,18 +59,6 @@ grpc_cc_library( visibility = ["//test:__subpackages__"], ) -grpc_cc_library( - name = "fake_resolver", - srcs = ["fake_resolver.c"], - hdrs = ["fake_resolver.h"], - language = "C", - visibility = ["//test:__subpackages__"], - deps = [ - "//:gpr", - "//:grpc", - "//test/core/util:grpc_test_util", - ], -) grpc_cc_library( name = "http_proxy", diff --git a/test/core/end2end/generate_tests.bzl b/test/core/end2end/generate_tests.bzl index 6865aefa3d..0adf7eb989 100755 --- a/test/core/end2end/generate_tests.bzl +++ b/test/core/end2end/generate_tests.bzl @@ -173,7 +173,6 @@ def grpc_end2end_tests(): deps = [ ':cq_verifier', ':ssl_test_data', - ':fake_resolver', ':http_proxy', ':proxy', ] diff --git a/test/core/slice/slice_hash_table_test.c b/test/core/slice/slice_hash_table_test.c index 67041b2d5c..16bfb424c3 100644 --- a/test/core/slice/slice_hash_table_test.c +++ b/test/core/slice/slice_hash_table_test.c @@ -77,6 +77,19 @@ static void destroy_string(grpc_exec_ctx* exec_ctx, void* value) { gpr_free(value); } +static grpc_slice_hash_table* create_table_from_entries( + const test_entry* test_entries, size_t num_test_entries, + int (*value_cmp_fn)(void*, void*)) { + // Construct table. + grpc_slice_hash_table_entry* entries = + gpr_zalloc(sizeof(*entries) * num_test_entries); + populate_entries(test_entries, num_test_entries, entries); + grpc_slice_hash_table* table = grpc_slice_hash_table_create( + num_test_entries, entries, destroy_string, value_cmp_fn); + gpr_free(entries); + return table; +} + static void test_slice_hash_table() { const test_entry test_entries[] = { {"key_0", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"}, @@ -115,13 +128,8 @@ static void test_slice_hash_table() { {"key_99", "value_99"}, }; const size_t num_entries = GPR_ARRAY_SIZE(test_entries); - // Construct table. - grpc_slice_hash_table_entry* entries = - gpr_zalloc(sizeof(*entries) * num_entries); - populate_entries(test_entries, num_entries, entries); grpc_slice_hash_table* table = - grpc_slice_hash_table_create(num_entries, entries, destroy_string); - gpr_free(entries); + create_table_from_entries(test_entries, num_entries, NULL); // Check contents of table. check_values(test_entries, num_entries, table); check_non_existent_value("XX", table); @@ -131,8 +139,118 @@ static void test_slice_hash_table() { grpc_exec_ctx_finish(&exec_ctx); } +static int value_cmp_fn(void* a, void* b) { + const char* a_str = a; + const char* b_str = b; + return strcmp(a_str, b_str); +} + +static int pointer_cmp_fn(void* a, void* b) { return GPR_ICMP(a, b); } + +static void test_slice_hash_table_eq() { + const test_entry test_entries_a[] = { + {"key_0", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"}}; + const size_t num_entries_a = GPR_ARRAY_SIZE(test_entries_a); + grpc_slice_hash_table* table_a = + create_table_from_entries(test_entries_a, num_entries_a, value_cmp_fn); + GPR_ASSERT(grpc_slice_hash_table_cmp(table_a, table_a) == 0); + + const test_entry test_entries_b[] = { + {"key_0", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"}}; + const size_t num_entries_b = GPR_ARRAY_SIZE(test_entries_b); + grpc_slice_hash_table* table_b = + create_table_from_entries(test_entries_b, num_entries_b, value_cmp_fn); + + GPR_ASSERT(grpc_slice_hash_table_cmp(table_a, table_b) == 0); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_slice_hash_table_unref(&exec_ctx, table_a); + grpc_slice_hash_table_unref(&exec_ctx, table_b); + grpc_exec_ctx_finish(&exec_ctx); +} + +static void test_slice_hash_table_not_eq() { + const test_entry test_entries_a[] = { + {"key_0", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"}}; + const size_t num_entries_a = GPR_ARRAY_SIZE(test_entries_a); + grpc_slice_hash_table* table_a = + create_table_from_entries(test_entries_a, num_entries_a, value_cmp_fn); + + // Different sizes. + const test_entry test_entries_b_smaller[] = {{"key_0", "value_0"}, + {"key_1", "value_1"}}; + const size_t num_entries_b_smaller = GPR_ARRAY_SIZE(test_entries_b_smaller); + grpc_slice_hash_table* table_b_smaller = create_table_from_entries( + test_entries_b_smaller, num_entries_b_smaller, value_cmp_fn); + GPR_ASSERT(grpc_slice_hash_table_cmp(table_a, table_b_smaller) > 0); + + const test_entry test_entries_b_larger[] = {{"key_0", "value_0"}, + {"key_1", "value_1"}, + {"key_2", "value_2"}, + {"key_3", "value_3"}}; + const size_t num_entries_b_larger = GPR_ARRAY_SIZE(test_entries_b_larger); + grpc_slice_hash_table* table_b_larger = create_table_from_entries( + test_entries_b_larger, num_entries_b_larger, value_cmp_fn); + GPR_ASSERT(grpc_slice_hash_table_cmp(table_a, table_b_larger) < 0); + + // One key doesn't match and is lexicographically "smaller". + const test_entry test_entries_c[] = { + {"key_zz", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"}}; + const size_t num_entries_c = GPR_ARRAY_SIZE(test_entries_c); + grpc_slice_hash_table* table_c = + create_table_from_entries(test_entries_c, num_entries_c, value_cmp_fn); + GPR_ASSERT(grpc_slice_hash_table_cmp(table_a, table_c) > 0); + GPR_ASSERT(grpc_slice_hash_table_cmp(table_c, table_a) < 0); + + // One value doesn't match. + const test_entry test_entries_d[] = { + {"key_0", "value_z"}, {"key_1", "value_1"}, {"key_2", "value_2"}}; + const size_t num_entries_d = GPR_ARRAY_SIZE(test_entries_d); + grpc_slice_hash_table* table_d = + create_table_from_entries(test_entries_d, num_entries_d, value_cmp_fn); + GPR_ASSERT(grpc_slice_hash_table_cmp(table_a, table_d) < 0); + GPR_ASSERT(grpc_slice_hash_table_cmp(table_d, table_a) > 0); + + // Same values but different "equals" functions. + const test_entry test_entries_e[] = { + {"key_0", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"}}; + const size_t num_entries_e = GPR_ARRAY_SIZE(test_entries_e); + grpc_slice_hash_table* table_e = + create_table_from_entries(test_entries_e, num_entries_e, value_cmp_fn); + const test_entry test_entries_f[] = { + {"key_0", "value_0"}, {"key_1", "value_1"}, {"key_2", "value_2"}}; + const size_t num_entries_f = GPR_ARRAY_SIZE(test_entries_f); + grpc_slice_hash_table* table_f = + create_table_from_entries(test_entries_f, num_entries_f, pointer_cmp_fn); + GPR_ASSERT(grpc_slice_hash_table_cmp(table_e, table_f) != 0); + + // Same (empty) key, different values. + const test_entry test_entries_g[] = {{"", "value_0"}}; + const size_t num_entries_g = GPR_ARRAY_SIZE(test_entries_g); + grpc_slice_hash_table* table_g = + create_table_from_entries(test_entries_g, num_entries_g, value_cmp_fn); + const test_entry test_entries_h[] = {{"", "value_1"}}; + const size_t num_entries_h = GPR_ARRAY_SIZE(test_entries_h); + grpc_slice_hash_table* table_h = + create_table_from_entries(test_entries_h, num_entries_h, pointer_cmp_fn); + GPR_ASSERT(grpc_slice_hash_table_cmp(table_g, table_h) != 0); + + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_slice_hash_table_unref(&exec_ctx, table_a); + grpc_slice_hash_table_unref(&exec_ctx, table_b_larger); + grpc_slice_hash_table_unref(&exec_ctx, table_b_smaller); + grpc_slice_hash_table_unref(&exec_ctx, table_c); + grpc_slice_hash_table_unref(&exec_ctx, table_d); + grpc_slice_hash_table_unref(&exec_ctx, table_e); + grpc_slice_hash_table_unref(&exec_ctx, table_f); + grpc_slice_hash_table_unref(&exec_ctx, table_g); + grpc_slice_hash_table_unref(&exec_ctx, table_h); + grpc_exec_ctx_finish(&exec_ctx); +} + int main(int argc, char** argv) { grpc_test_init(argc, argv); test_slice_hash_table(); + test_slice_hash_table_eq(); + test_slice_hash_table_not_eq(); return 0; } diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 9b691a83e0..97558d70bb 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -212,8 +212,8 @@ grpc_cc_test( ) grpc_cc_test( - name = "round_robin_end2end_test", - srcs = ["round_robin_end2end_test.cc"], + name = "client_lb_end2end_test", + srcs = ["client_lb_end2end_test.cc"], deps = [ ":test_service_impl", "//:gpr", @@ -243,7 +243,7 @@ grpc_cc_test( "//src/proto/grpc/testing:echo_messages_proto", "//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", - "//test/core/end2end:fake_resolver", + "//:grpc_resolver_fake", "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", "//test/cpp/util:test_util", diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc new file mode 100644 index 0000000000..ff00225597 --- /dev/null +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -0,0 +1,485 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <algorithm> +#include <memory> +#include <mutex> +#include <thread> + +#include <gtest/gtest.h> + +#include <grpc++/channel.h> +#include <grpc++/client_context.h> +#include <grpc++/create_channel.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/thd.h> +#include <grpc/support/time.h> + +extern "C" { +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" +} + +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" +#include "test/cpp/end2end/test_service_impl.h" + +using grpc::testing::EchoRequest; +using grpc::testing::EchoResponse; +using std::chrono::system_clock; + +namespace grpc { +namespace testing { +namespace { + +// Subclass of TestServiceImpl that increments a request counter for +// every call to the Echo RPC. +class MyTestServiceImpl : public TestServiceImpl { + public: + MyTestServiceImpl() : request_count_(0) {} + + Status Echo(ServerContext* context, const EchoRequest* request, + EchoResponse* response) override { + { + std::unique_lock<std::mutex> lock(mu_); + ++request_count_; + } + return TestServiceImpl::Echo(context, request, response); + } + + int request_count() { + std::unique_lock<std::mutex> lock(mu_); + return request_count_; + } + + void ResetCounters() { + std::unique_lock<std::mutex> lock(mu_); + request_count_ = 0; + } + + private: + std::mutex mu_; + int request_count_; +}; + +class ClientLbEnd2endTest : public ::testing::Test { + protected: + ClientLbEnd2endTest() : server_host_("localhost") {} + + void SetUp() override { + response_generator_ = grpc_fake_resolver_response_generator_create(); + } + + void TearDown() override { + grpc_fake_resolver_response_generator_unref(response_generator_); + for (size_t i = 0; i < servers_.size(); ++i) { + servers_[i]->Shutdown(); + } + } + + void StartServers(int num_servers) { + for (int i = 0; i < num_servers; ++i) { + servers_.emplace_back(new ServerData(server_host_)); + } + } + + void SetNextResolution(const std::vector<int>& ports) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_lb_addresses* addresses = grpc_lb_addresses_create(ports.size(), NULL); + for (size_t i = 0; i < ports.size(); ++i) { + char* lb_uri_str; + gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", ports[i]); + grpc_uri* lb_uri = grpc_uri_parse(&exec_ctx, lb_uri_str, true); + GPR_ASSERT(lb_uri != NULL); + grpc_lb_addresses_set_address_from_uri(addresses, i, lb_uri, + false /* is balancer */, + "" /* balancer name */, NULL); + grpc_uri_destroy(lb_uri); + gpr_free(lb_uri_str); + } + const grpc_arg fake_addresses = + grpc_lb_addresses_create_channel_arg(addresses); + grpc_channel_args* fake_result = + grpc_channel_args_copy_and_add(NULL, &fake_addresses, 1); + grpc_fake_resolver_response_generator_set_response( + &exec_ctx, response_generator_, fake_result); + grpc_channel_args_destroy(&exec_ctx, fake_result); + grpc_lb_addresses_destroy(&exec_ctx, addresses); + grpc_exec_ctx_finish(&exec_ctx); + } + + void ResetStub(const grpc::string& lb_policy_name = "") { + ChannelArguments args; + if (lb_policy_name.size() > 0) { + args.SetLoadBalancingPolicyName(lb_policy_name); + } // else, default to pick first + args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, + response_generator_); + std::ostringstream uri; + uri << "fake:///"; + for (size_t i = 0; i < servers_.size() - 1; ++i) { + uri << "127.0.0.1:" << servers_[i]->port_ << ","; + } + uri << "127.0.0.1:" << servers_[servers_.size() - 1]->port_; + channel_ = + CreateCustomChannel(uri.str(), InsecureChannelCredentials(), args); + stub_ = grpc::testing::EchoTestService::NewStub(channel_); + } + + void SendRpc() { + EchoRequest request; + EchoResponse response; + request.set_message("Live long and prosper."); + ClientContext context; + Status status = stub_->Echo(&context, request, &response); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(response.message(), request.message()); + } + + struct ServerData { + int port_; + std::unique_ptr<Server> server_; + MyTestServiceImpl service_; + std::unique_ptr<std::thread> thread_; + + explicit ServerData(const grpc::string& server_host) { + port_ = grpc_pick_unused_port_or_die(); + gpr_log(GPR_INFO, "starting server on port %d", port_); + std::mutex mu; + std::condition_variable cond; + thread_.reset(new std::thread( + std::bind(&ServerData::Start, this, server_host, &mu, &cond))); + std::unique_lock<std::mutex> lock(mu); + cond.wait(lock); + gpr_log(GPR_INFO, "server startup complete"); + } + + void Start(const grpc::string& server_host, std::mutex* mu, + std::condition_variable* cond) { + std::ostringstream server_address; + server_address << server_host << ":" << port_; + ServerBuilder builder; + builder.AddListeningPort(server_address.str(), + InsecureServerCredentials()); + builder.RegisterService(&service_); + server_ = builder.BuildAndStart(); + std::lock_guard<std::mutex> lock(*mu); + cond->notify_one(); + } + + void Shutdown() { + server_->Shutdown(); + thread_->join(); + } + }; + + void ResetCounters() { + for (const auto& server : servers_) server->service_.ResetCounters(); + } + + void WaitForServer(size_t server_idx) { + do { + SendRpc(); + } while (servers_[server_idx]->service_.request_count() == 0); + ResetCounters(); + } + + const grpc::string server_host_; + std::shared_ptr<Channel> channel_; + std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; + std::vector<std::unique_ptr<ServerData>> servers_; + grpc_fake_resolver_response_generator* response_generator_; +}; + +TEST_F(ClientLbEnd2endTest, PickFirst) { + // Start servers and send one RPC per server. + const int kNumServers = 3; + StartServers(kNumServers); + ResetStub(); // implicit pick first + std::vector<int> ports; + for (size_t i = 0; i < servers_.size(); ++i) { + ports.emplace_back(servers_[i]->port_); + } + SetNextResolution(ports); + for (size_t i = 0; i < servers_.size(); ++i) { + SendRpc(); + } + // All requests should have gone to a single server. + bool found = false; + for (size_t i = 0; i < servers_.size(); ++i) { + const int request_count = servers_[i]->service_.request_count(); + if (request_count == kNumServers) { + found = true; + } else { + EXPECT_EQ(0, request_count); + } + } + EXPECT_TRUE(found); + // Check LB policy name for the channel. + EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName()); +} + +TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { + // Start servers and send one RPC per server. + const int kNumServers = 3; + StartServers(kNumServers); + ResetStub(); // implicit pick first + std::vector<int> ports; + + // Perform one RPC against the first server. + ports.emplace_back(servers_[0]->port_); + SetNextResolution(ports); + gpr_log(GPR_INFO, "****** SET [0] *******"); + SendRpc(); + EXPECT_EQ(servers_[0]->service_.request_count(), 1); + + // An empty update will result in the channel going into TRANSIENT_FAILURE. + ports.clear(); + SetNextResolution(ports); + gpr_log(GPR_INFO, "****** SET none *******"); + grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT; + do { + channel_state = channel_->GetState(true /* try to connect */); + } while (channel_state == GRPC_CHANNEL_READY); + GPR_ASSERT(channel_state != GRPC_CHANNEL_READY); + servers_[0]->service_.ResetCounters(); + + // Next update introduces servers_[1], making the channel recover. + ports.clear(); + ports.emplace_back(servers_[1]->port_); + SetNextResolution(ports); + gpr_log(GPR_INFO, "****** SET [1] *******"); + WaitForServer(1); + EXPECT_EQ(servers_[0]->service_.request_count(), 0); + + // And again for servers_[2] + ports.clear(); + ports.emplace_back(servers_[2]->port_); + SetNextResolution(ports); + gpr_log(GPR_INFO, "****** SET [2] *******"); + WaitForServer(2); + EXPECT_EQ(servers_[0]->service_.request_count(), 0); + EXPECT_EQ(servers_[1]->service_.request_count(), 0); + + // Check LB policy name for the channel. + EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName()); +} + +TEST_F(ClientLbEnd2endTest, PickFirstUpdateSuperset) { + // Start servers and send one RPC per server. + const int kNumServers = 3; + StartServers(kNumServers); + ResetStub(); // implicit pick first + std::vector<int> ports; + + // Perform one RPC against the first server. + ports.emplace_back(servers_[0]->port_); + SetNextResolution(ports); + gpr_log(GPR_INFO, "****** SET [0] *******"); + SendRpc(); + EXPECT_EQ(servers_[0]->service_.request_count(), 1); + servers_[0]->service_.ResetCounters(); + + // Send and superset update + ports.clear(); + ports.emplace_back(servers_[1]->port_); + ports.emplace_back(servers_[0]->port_); + SetNextResolution(ports); + gpr_log(GPR_INFO, "****** SET superset *******"); + SendRpc(); + // We stick to the previously connected server. + WaitForServer(0); + EXPECT_EQ(0, servers_[1]->service_.request_count()); + + // Check LB policy name for the channel. + EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName()); +} + +TEST_F(ClientLbEnd2endTest, PickFirstManyUpdates) { + // Start servers and send one RPC per server. + const int kNumServers = 3; + StartServers(kNumServers); + ResetStub(); // implicit pick first + std::vector<int> ports; + for (size_t i = 0; i < servers_.size(); ++i) { + ports.emplace_back(servers_[i]->port_); + } + for (size_t i = 0; i < 1000; ++i) { + std::random_shuffle(ports.begin(), ports.end()); + SetNextResolution(ports); + if (i % 10 == 0) SendRpc(); + } + // Check LB policy name for the channel. + EXPECT_EQ("pick_first", channel_->GetLoadBalancingPolicyName()); +} + +TEST_F(ClientLbEnd2endTest, RoundRobin) { + // Start servers and send one RPC per server. + const int kNumServers = 3; + StartServers(kNumServers); + ResetStub("round_robin"); + std::vector<int> ports; + for (const auto& server : servers_) { + ports.emplace_back(server->port_); + } + SetNextResolution(ports); + for (size_t i = 0; i < servers_.size(); ++i) { + SendRpc(); + } + // One request should have gone to each server. + for (size_t i = 0; i < servers_.size(); ++i) { + EXPECT_EQ(1, servers_[i]->service_.request_count()); + } + // Check LB policy name for the channel. + EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName()); +} + +TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { + // Start servers and send one RPC per server. + const int kNumServers = 3; + StartServers(kNumServers); + ResetStub("round_robin"); + std::vector<int> ports; + + // Start with a single server. + ports.emplace_back(servers_[0]->port_); + SetNextResolution(ports); + WaitForServer(0); + // Send RPCs. They should all go servers_[0] + for (size_t i = 0; i < 10; ++i) SendRpc(); + EXPECT_EQ(10, servers_[0]->service_.request_count()); + EXPECT_EQ(0, servers_[1]->service_.request_count()); + EXPECT_EQ(0, servers_[2]->service_.request_count()); + servers_[0]->service_.ResetCounters(); + + // And now for the second server. + ports.clear(); + ports.emplace_back(servers_[1]->port_); + SetNextResolution(ports); + + // Wait until update has been processed, as signaled by the second backend + // receiving a request. + EXPECT_EQ(0, servers_[1]->service_.request_count()); + WaitForServer(1); + + for (size_t i = 0; i < 10; ++i) SendRpc(); + EXPECT_EQ(0, servers_[0]->service_.request_count()); + EXPECT_EQ(10, servers_[1]->service_.request_count()); + EXPECT_EQ(0, servers_[2]->service_.request_count()); + servers_[1]->service_.ResetCounters(); + + // ... and for the last server. + ports.clear(); + ports.emplace_back(servers_[2]->port_); + SetNextResolution(ports); + WaitForServer(2); + + for (size_t i = 0; i < 10; ++i) SendRpc(); + EXPECT_EQ(0, servers_[0]->service_.request_count()); + EXPECT_EQ(0, servers_[1]->service_.request_count()); + EXPECT_EQ(10, servers_[2]->service_.request_count()); + servers_[2]->service_.ResetCounters(); + + // Back to all servers. + ports.clear(); + ports.emplace_back(servers_[0]->port_); + ports.emplace_back(servers_[1]->port_); + ports.emplace_back(servers_[2]->port_); + SetNextResolution(ports); + WaitForServer(0); + WaitForServer(1); + WaitForServer(2); + + // Send three RPCs, one per server. + for (size_t i = 0; i < 3; ++i) SendRpc(); + EXPECT_EQ(1, servers_[0]->service_.request_count()); + EXPECT_EQ(1, servers_[1]->service_.request_count()); + EXPECT_EQ(1, servers_[2]->service_.request_count()); + + // An empty update will result in the channel going into TRANSIENT_FAILURE. + ports.clear(); + SetNextResolution(ports); + grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT; + do { + channel_state = channel_->GetState(true /* try to connect */); + } while (channel_state == GRPC_CHANNEL_READY); + GPR_ASSERT(channel_state != GRPC_CHANNEL_READY); + servers_[0]->service_.ResetCounters(); + + // Next update introduces servers_[1], making the channel recover. + ports.clear(); + ports.emplace_back(servers_[1]->port_); + SetNextResolution(ports); + WaitForServer(1); + channel_state = channel_->GetState(false /* try to connect */); + GPR_ASSERT(channel_state == GRPC_CHANNEL_READY); + + // Check LB policy name for the channel. + EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName()); +} + +TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) { + // Start servers and send one RPC per server. + const int kNumServers = 3; + StartServers(kNumServers); + ResetStub("round_robin"); + std::vector<int> ports; + for (size_t i = 0; i < servers_.size(); ++i) { + ports.emplace_back(servers_[i]->port_); + } + for (size_t i = 0; i < 1000; ++i) { + std::random_shuffle(ports.begin(), ports.end()); + SetNextResolution(ports); + if (i % 10 == 0) SendRpc(); + } + // Check LB policy name for the channel. + EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName()); +} + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc_test_init(argc, argv); + grpc_init(); + const auto result = RUN_ALL_TESTS(); + grpc_shutdown(); + return result; +} diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index b0d4e2dadf..6ff3642adf 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -50,8 +50,8 @@ #include <gtest/gtest.h> extern "C" { +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/lib/iomgr/sockaddr.h" -#include "test/core/end2end/fake_resolver.h" } #include "test/core/util/port.h" @@ -117,6 +117,12 @@ class CountedService : public ServiceType { ++request_count_; } + void ResetCounters() { + std::unique_lock<std::mutex> lock(mu_); + request_count_ = 0; + response_count_ = 0; + } + protected: std::mutex mu_; @@ -181,6 +187,7 @@ class BalancerServiceImpl : public BalancerService { shutdown_(false) {} Status BalanceLoad(ServerContext* context, Stream* stream) override { + gpr_log(GPR_INFO, "LB: BalanceLoad"); LoadBalanceRequest request; stream->Read(&request); IncreaseRequestCount(); @@ -200,9 +207,16 @@ class BalancerServiceImpl : public BalancerService { responses_and_delays = responses_and_delays_; } for (const auto& response_and_delay : responses_and_delays) { - if (shutdown_) break; + { + std::unique_lock<std::mutex> lock(mu_); + if (shutdown_) break; + } SendResponse(stream, response_and_delay.first, response_and_delay.second); } + { + std::unique_lock<std::mutex> lock(mu_); + serverlist_cond_.wait(lock); + } if (client_load_reporting_interval_seconds_ > 0) { request.Clear(); @@ -226,9 +240,10 @@ class BalancerServiceImpl : public BalancerService { client_stats_.num_calls_finished_known_received += request.client_stats().num_calls_finished_known_received(); std::lock_guard<std::mutex> lock(mu_); - cond_.notify_one(); + load_report_cond_.notify_one(); } + gpr_log(GPR_INFO, "LB: done"); return Status::OK; } @@ -237,9 +252,14 @@ class BalancerServiceImpl : public BalancerService { responses_and_delays_.push_back(std::make_pair(response, send_after_ms)); } - void Shutdown() { + // Returns true on its first invocation, false otherwise. + bool Shutdown() { + NotifyDoneWithServerlists(); std::unique_lock<std::mutex> lock(mu_); + const bool prev = !shutdown_; shutdown_ = true; + gpr_log(GPR_INFO, "LB: shut down"); + return prev; } static LoadBalanceResponse BuildResponseForBackends( @@ -264,26 +284,35 @@ class BalancerServiceImpl : public BalancerService { const ClientStats& WaitForLoadReport() { std::unique_lock<std::mutex> lock(mu_); - cond_.wait(lock); + load_report_cond_.wait(lock); return client_stats_; } + void NotifyDoneWithServerlists() { + std::lock_guard<std::mutex> lock(mu_); + serverlist_cond_.notify_one(); + } + private: void SendResponse(Stream* stream, const LoadBalanceResponse& response, int delay_ms) { gpr_log(GPR_INFO, "LB: sleeping for %d ms...", delay_ms); - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(delay_ms, GPR_TIMESPAN))); + if (delay_ms > 0) { + gpr_sleep_until( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(delay_ms, GPR_TIMESPAN))); + } gpr_log(GPR_INFO, "LB: Woke up! Sending response '%s'", response.DebugString().c_str()); - stream->Write(response); IncreaseResponseCount(); + stream->Write(response); } const int client_load_reporting_interval_seconds_; std::vector<ResponseDelayPair> responses_and_delays_; std::mutex mu_; - std::condition_variable cond_; + std::condition_variable load_report_cond_; + std::condition_variable serverlist_cond_; ClientStats client_stats_; bool shutdown_; }; @@ -326,8 +355,7 @@ class GrpclbEnd2endTest : public ::testing::Test { backend_servers_[i].Shutdown(); } for (size_t i = 0; i < balancers_.size(); ++i) { - balancers_[i]->Shutdown(); - balancer_servers_[i].Shutdown(); + if (balancers_[i]->Shutdown()) balancer_servers_[i].Shutdown(); } grpc_fake_resolver_response_generator_unref(response_generator_); } @@ -336,8 +364,10 @@ class GrpclbEnd2endTest : public ::testing::Test { ChannelArguments args; args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, response_generator_); - channel_ = CreateCustomChannel("test:///not_used", - InsecureChannelCredentials(), args); + std::ostringstream uri; + uri << "fake:///servername_not_used"; + channel_ = + CreateCustomChannel(uri.str(), InsecureChannelCredentials(), args); stub_ = grpc::testing::EchoTestService::NewStub(channel_); } @@ -497,6 +527,7 @@ TEST_F(SingleBalancerTest, Vanilla) { EXPECT_EQ(kNumRpcsPerAddress, backend_servers_[i].service_->request_count()); } + balancers_[0]->NotifyDoneWithServerlists(); // The balancer got a single request. EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); // and sent a single response. @@ -541,7 +572,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { << " message=" << status.error_message(); EXPECT_EQ(response.message(), kMessage_); } - + balancers_[0]->NotifyDoneWithServerlists(); // The balancer got a single request. EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); // and sent two responses. @@ -608,13 +639,272 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) { << " message=" << status.error_message(); EXPECT_EQ(response.message(), kMessage_); } - + balancers_[0]->NotifyDoneWithServerlists(); // The balancer got a single request. EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); // Check LB policy name for the channel. EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } +class UpdatesTest : public GrpclbEnd2endTest { + public: + UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {} +}; + +TEST_F(UpdatesTest, UpdateBalancers) { + const std::vector<int> first_backend{GetBackendPorts()[0]}; + const std::vector<int> second_backend{GetBackendPorts()[1]}; + + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0); + ScheduleResponseForBalancer( + 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0), + 0); + + // Start servers and send 10 RPCs per server. + gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); + auto statuses_and_responses = SendRpc(kMessage_, 10); + gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); + + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + // All 10 requests should have gone to the first backend. + EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); + + balancers_[0]->NotifyDoneWithServerlists(); + balancers_[1]->NotifyDoneWithServerlists(); + balancers_[2]->NotifyDoneWithServerlists(); + // Balancer 0 got a single request. + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + // and sent a single response. + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[1].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[1].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); + + std::vector<AddressData> addresses; + addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""}); + gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 =========="); + SetNextResolution(addresses); + gpr_log(GPR_INFO, "========= UPDATE 1 DONE =========="); + + // Wait until update has been processed, as signaled by the second backend + // receiving a request. + EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); + do { + auto statuses_and_responses = SendRpc(kMessage_, 1); + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + } while (backend_servers_[1].service_->request_count() == 0); + + backend_servers_[1].service_->ResetCounters(); + gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); + statuses_and_responses = SendRpc(kMessage_, 10); + gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + // All 10 requests should have gone to the second backend. + EXPECT_EQ(10U, backend_servers_[1].service_->request_count()); + + balancers_[0]->NotifyDoneWithServerlists(); + balancers_[1]->NotifyDoneWithServerlists(); + balancers_[2]->NotifyDoneWithServerlists(); + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); + EXPECT_EQ(1U, balancer_servers_[1].service_->request_count()); + EXPECT_EQ(1U, balancer_servers_[1].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); + // Check LB policy name for the channel. + EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); +} + +// Send an update with the same set of LBs as the one in SetUp() in order to +// verify that the LB channel inside grpclb keeps the initial connection (which +// by definition is also present in the update). +TEST_F(UpdatesTest, UpdateBalancersRepeated) { + const std::vector<int> first_backend{GetBackendPorts()[0]}; + const std::vector<int> second_backend{GetBackendPorts()[0]}; + + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0); + ScheduleResponseForBalancer( + 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0), + 0); + + // Start servers and send 10 RPCs per server. + gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); + auto statuses_and_responses = SendRpc(kMessage_, 10); + gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); + + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + // All 10 requests should have gone to the first backend. + EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); + + balancers_[0]->NotifyDoneWithServerlists(); + // Balancer 0 got a single request. + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + // and sent a single response. + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[1].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[1].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); + + std::vector<AddressData> addresses; + addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""}); + addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""}); + addresses.emplace_back(AddressData{balancer_servers_[2].port_, true, ""}); + gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 =========="); + SetNextResolution(addresses); + gpr_log(GPR_INFO, "========= UPDATE 1 DONE =========="); + + EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); + gpr_timespec deadline = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN)); + // Send 10 seconds worth of RPCs + do { + statuses_and_responses = SendRpc(kMessage_, 1); + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0); + // grpclb continued using the original LB call to the first balancer, which + // doesn't assign the second backend. + EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); + balancers_[0]->NotifyDoneWithServerlists(); + + addresses.clear(); + addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""}); + addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""}); + gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 2 =========="); + SetNextResolution(addresses); + gpr_log(GPR_INFO, "========= UPDATE 2 DONE =========="); + + EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); + deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(10000, GPR_TIMESPAN)); + // Send 10 seconds worth of RPCs + do { + statuses_and_responses = SendRpc(kMessage_, 1); + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0); + // grpclb continued using the original LB call to the first balancer, which + // doesn't assign the second backend. + EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); + balancers_[0]->NotifyDoneWithServerlists(); + + // Check LB policy name for the channel. + EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); +} + +TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { + const std::vector<int> first_backend{GetBackendPorts()[0]}; + const std::vector<int> second_backend{GetBackendPorts()[1]}; + + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0); + ScheduleResponseForBalancer( + 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0), + 0); + + // Start servers and send 10 RPCs per server. + gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); + auto statuses_and_responses = SendRpc(kMessage_, 10); + gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH =========="); + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + // All 10 requests should have gone to the first backend. + EXPECT_EQ(10U, backend_servers_[0].service_->request_count()); + + // Kill balancer 0 + gpr_log(GPR_INFO, "********** ABOUT TO KILL BALANCER 0 *************"); + if (balancers_[0]->Shutdown()) balancer_servers_[0].Shutdown(); + gpr_log(GPR_INFO, "********** KILLED BALANCER 0 *************"); + + // This is serviced by the existing RR policy + gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); + statuses_and_responses = SendRpc(kMessage_, 10); + gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + // All 10 requests should again have gone to the first backend. + EXPECT_EQ(20U, backend_servers_[0].service_->request_count()); + EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); + + balancers_[0]->NotifyDoneWithServerlists(); + balancers_[1]->NotifyDoneWithServerlists(); + balancers_[2]->NotifyDoneWithServerlists(); + // Balancer 0 got a single request. + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + // and sent a single response. + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[1].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[1].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); + + std::vector<AddressData> addresses; + addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""}); + gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 =========="); + SetNextResolution(addresses); + gpr_log(GPR_INFO, "========= UPDATE 1 DONE =========="); + + // Wait until update has been processed, as signaled by the second backend + // receiving a request. In the meantime, the client continues to be serviced + // (by the first backend) without interruption. + EXPECT_EQ(0U, backend_servers_[1].service_->request_count()); + do { + auto statuses_and_responses = SendRpc(kMessage_, 1); + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + } while (backend_servers_[1].service_->request_count() == 0); + + // This is serviced by the existing RR policy + backend_servers_[1].service_->ResetCounters(); + gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH =========="); + statuses_and_responses = SendRpc(kMessage_, 10); + gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH =========="); + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + // All 10 requests should have gone to the second backend. + EXPECT_EQ(10U, backend_servers_[1].service_->request_count()); + + balancers_[0]->NotifyDoneWithServerlists(); + balancers_[1]->NotifyDoneWithServerlists(); + balancers_[2]->NotifyDoneWithServerlists(); + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); + EXPECT_EQ(1U, balancer_servers_[1].service_->request_count()); + EXPECT_EQ(1U, balancer_servers_[1].service_->response_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->request_count()); + EXPECT_EQ(0U, balancer_servers_[2].service_->response_count()); + // Check LB policy name for the channel. + EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); +} + TEST_F(SingleBalancerTest, Drop) { const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( @@ -677,6 +967,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { EXPECT_EQ(kNumRpcsPerAddress, backend_servers_[i].service_->request_count()); } + balancers_[0]->NotifyDoneWithServerlists(); // The balancer got a single request. EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); // and sent a single response. @@ -722,6 +1013,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { EXPECT_EQ(kNumRpcsPerAddress, backend_servers_[i].service_->request_count()); } + balancers_[0]->NotifyDoneWithServerlists(); // The balancer got a single request. EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); // and sent a single response. @@ -748,7 +1040,6 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { int main(int argc, char** argv) { grpc_init(); grpc_test_init(argc, argv); - grpc_fake_resolver_init(); ::testing::InitGoogleTest(&argc, argv); const auto result = RUN_ALL_TESTS(); grpc_shutdown(); diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index a002c7f77d..ee5dfa7133 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -52,6 +52,7 @@ #include <grpc++/impl/codegen/config.h> extern "C" { #include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/sockaddr.h" @@ -61,7 +62,6 @@ extern "C" { #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/server.h" #include "test/core/end2end/cq_verifier.h" -#include "test/core/end2end/fake_resolver.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" } @@ -591,7 +591,7 @@ static void setup_client(const server_fixture *lb_server, grpc_uri_destroy(lb_uri); gpr_free(lb_uri_str); - gpr_asprintf(&cf->server_uri, "test:///%s", lb_server->servers_hostport); + gpr_asprintf(&cf->server_uri, "fake:///%s", lb_server->servers_hostport); const grpc_arg fake_addresses = grpc_lb_addresses_create_channel_arg(addresses); grpc_channel_args *fake_result = @@ -804,7 +804,6 @@ TEST(GrpclbTest, InvalidAddressInServerlist) {} int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); - grpc_fake_resolver_init(); grpc_test_init(argc, argv); grpc_init(); const auto result = RUN_ALL_TESTS(); diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 15410dec01..98ed72a6ad 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -950,6 +950,8 @@ src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c \ src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h \ src/core/ext/filters/client_channel/resolver/dns/native/README.md \ src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c \ +src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c \ +src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h \ src/core/ext/filters/client_channel/resolver/sockaddr/README.md \ src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c \ src/core/ext/filters/client_channel/resolver_factory.c \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 6c4462e9f2..8835118e3a 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -2913,6 +2913,25 @@ { "deps": [ "gpr", + "gpr_test_util", + "grpc", + "grpc++", + "grpc++_test_util", + "grpc_test_util" + ], + "headers": [], + "is_filegroup": false, + "language": "c++", + "name": "client_lb_end2end_test", + "src": [ + "test/cpp/end2end/client_lb_end2end_test.cc" + ], + "third_party": false, + "type": "target" + }, + { + "deps": [ + "gpr", "grpc", "grpc++", "grpc++_codegen_base" @@ -3782,25 +3801,6 @@ "headers": [], "is_filegroup": false, "language": "c++", - "name": "round_robin_end2end_test", - "src": [ - "test/cpp/end2end/round_robin_end2end_test.cc" - ], - "third_party": false, - "type": "target" - }, - { - "deps": [ - "gpr", - "gpr_test_util", - "grpc", - "grpc++", - "grpc++_test_util", - "grpc_test_util" - ], - "headers": [], - "is_filegroup": false, - "language": "c++", "name": "secure_auth_context_test", "src": [ "test/cpp/common/secure_auth_context_test.cc" @@ -5832,6 +5832,7 @@ "grpc_message_size_filter", "grpc_resolver_dns_ares", "grpc_resolver_dns_native", + "grpc_resolver_fake", "grpc_resolver_sockaddr", "grpc_secure", "grpc_server_backward_compatibility", @@ -5939,6 +5940,7 @@ "grpc_message_size_filter", "grpc_resolver_dns_ares", "grpc_resolver_dns_native", + "grpc_resolver_fake", "grpc_resolver_sockaddr", "grpc_server_backward_compatibility", "grpc_transport_chttp2_client_insecure", @@ -8349,6 +8351,7 @@ "gpr", "grpc_base", "grpc_client_channel", + "grpc_resolver_fake", "nanopb" ], "headers": [ @@ -8384,6 +8387,7 @@ "gpr", "grpc_base", "grpc_client_channel", + "grpc_resolver_fake", "grpc_secure", "nanopb" ], @@ -8549,6 +8553,25 @@ "grpc_base", "grpc_client_channel" ], + "headers": [ + "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" + ], + "is_filegroup": true, + "language": "c", + "name": "grpc_resolver_fake", + "src": [ + "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c", + "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" + ], + "third_party": false, + "type": "filegroup" + }, + { + "deps": [ + "gpr", + "grpc_base", + "grpc_client_channel" + ], "headers": [], "is_filegroup": true, "language": "c", @@ -8664,8 +8687,8 @@ "grpc" ], "headers": [ + "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h", "test/core/end2end/cq_verifier.h", - "test/core/end2end/fake_resolver.h", "test/core/end2end/fixtures/http_proxy_fixture.h", "test/core/end2end/fixtures/proxy.h", "test/core/iomgr/endpoint_tests.h", @@ -8684,10 +8707,10 @@ "language": "c", "name": "grpc_test_util_base", "src": [ + "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c", + "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h", "test/core/end2end/cq_verifier.c", "test/core/end2end/cq_verifier.h", - "test/core/end2end/fake_resolver.c", - "test/core/end2end/fake_resolver.h", "test/core/end2end/fixtures/http_proxy_fixture.c", "test/core/end2end/fixtures/http_proxy_fixture.h", "test/core/end2end/fixtures/proxy.c", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 1242025fc2..337cce829e 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -3142,6 +3142,32 @@ "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], + "excluded_poll_engines": [ + "poll", + "poll-cv" + ], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "client_lb_end2end_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ] + }, + { + "args": [], + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], "flaky": false, "gtest": true, "language": "c++", @@ -3452,6 +3478,10 @@ "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], + "excluded_poll_engines": [ + "poll", + "poll-cv" + ], "flaky": false, "gtest": true, "language": "c++", @@ -3474,6 +3504,10 @@ "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], + "excluded_poll_engines": [ + "poll", + "poll-cv" + ], "flaky": false, "gtest": false, "language": "c++", @@ -3693,28 +3727,6 @@ "flaky": false, "gtest": true, "language": "c++", - "name": "round_robin_end2end_test", - "platforms": [ - "linux", - "mac", - "posix", - "windows" - ] - }, - { - "args": [], - "ci_platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "cpu_cost": 1.0, - "exclude_configs": [], - "exclude_iomgrs": [], - "flaky": false, - "gtest": true, - "language": "c++", "name": "secure_auth_context_test", "platforms": [ "linux", diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index 1303366574..d4bdaa233e 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -493,6 +493,7 @@ <ClInclude Include="$(SolutionDir)\..\third_party\nanopb\pb_common.h" /> <ClInclude Include="$(SolutionDir)\..\third_party\nanopb\pb_decode.h" /> <ClInclude Include="$(SolutionDir)\..\third_party\nanopb\pb_encode.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\dns\c_ares\grpc_ares_ev_driver.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\dns\c_ares\grpc_ares_wrapper.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting.h" /> @@ -961,6 +962,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\third_party\nanopb\pb_encode.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\pick_first\pick_first.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\round_robin\round_robin.c"> diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index 9f25a1c179..571837c3fd 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -661,6 +661,9 @@ <ClCompile Include="$(SolutionDir)\..\third_party\nanopb\pb_encode.c"> <Filter>third_party\nanopb</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.c"> + <Filter>src\core\ext\filters\client_channel\resolver\fake</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\pick_first\pick_first.c"> <Filter>src\core\ext\filters\client_channel\lb_policy\pick_first</Filter> </ClCompile> @@ -1424,6 +1427,9 @@ <ClInclude Include="$(SolutionDir)\..\third_party\nanopb\pb_encode.h"> <Filter>third_party\nanopb</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.h"> + <Filter>src\core\ext\filters\client_channel\resolver\fake</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\dns\c_ares\grpc_ares_ev_driver.h"> <Filter>src\core\ext\filters\client_channel\resolver\dns\c_ares</Filter> </ClInclude> @@ -1577,6 +1583,9 @@ <Filter Include="src\core\ext\filters\client_channel\resolver\dns\native"> <UniqueIdentifier>{9b2d7e1f-b78a-2e7a-3000-944e46a5fab9}</UniqueIdentifier> </Filter> + <Filter Include="src\core\ext\filters\client_channel\resolver\fake"> + <UniqueIdentifier>{e75d1482-9a43-5fdf-03a5-e2b2833715fb}</UniqueIdentifier> + </Filter> <Filter Include="src\core\ext\filters\client_channel\resolver\sockaddr"> <UniqueIdentifier>{bd317dd5-323e-5b27-4c05-d85786be36ab}</UniqueIdentifier> </Filter> diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj index 061fca6d28..a0890196bd 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj @@ -181,8 +181,8 @@ <ItemGroup> <ClInclude Include="$(SolutionDir)\..\test\core\end2end\data\ssl_test_data.h" /> <ClInclude Include="$(SolutionDir)\..\test\core\security\oauth2_utils.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.h" /> <ClInclude Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.h" /> - <ClInclude Include="$(SolutionDir)\..\test\core\end2end\fake_resolver.h" /> <ClInclude Include="$(SolutionDir)\..\test\core\end2end\fixtures\http_proxy_fixture.h" /> <ClInclude Include="$(SolutionDir)\..\test\core\end2end\fixtures\proxy.h" /> <ClInclude Include="$(SolutionDir)\..\test\core\iomgr\endpoint_tests.h" /> @@ -321,9 +321,9 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\test\core\security\oauth2_utils.c"> </ClCompile> - <ClCompile Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.c"> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.c"> </ClCompile> - <ClCompile Include="$(SolutionDir)\..\test\core\end2end\fake_resolver.c"> + <ClCompile Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\test\core\end2end\fixtures\http_proxy_fixture.c"> </ClCompile> diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters index 35d0ea0cfd..d92688a9a3 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters @@ -16,10 +16,10 @@ <ClCompile Include="$(SolutionDir)\..\test\core\security\oauth2_utils.c"> <Filter>test\core\security</Filter> </ClCompile> - <ClCompile Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.c"> - <Filter>test\core\end2end</Filter> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.c"> + <Filter>src\core\ext\filters\client_channel\resolver\fake</Filter> </ClCompile> - <ClCompile Include="$(SolutionDir)\..\test\core\end2end\fake_resolver.c"> + <ClCompile Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.c"> <Filter>test\core\end2end</Filter> </ClCompile> <ClCompile Include="$(SolutionDir)\..\test\core\end2end\fixtures\http_proxy_fixture.c"> @@ -542,10 +542,10 @@ <ClInclude Include="$(SolutionDir)\..\test\core\security\oauth2_utils.h"> <Filter>test\core\security</Filter> </ClInclude> - <ClInclude Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.h"> - <Filter>test\core\end2end</Filter> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.h"> + <Filter>src\core\ext\filters\client_channel\resolver\fake</Filter> </ClInclude> - <ClInclude Include="$(SolutionDir)\..\test\core\end2end\fake_resolver.h"> + <ClInclude Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.h"> <Filter>test\core\end2end</Filter> </ClInclude> <ClInclude Include="$(SolutionDir)\..\test\core\end2end\fixtures\http_proxy_fixture.h"> @@ -950,6 +950,21 @@ <Filter Include="src\core"> <UniqueIdentifier>{f7bfac91-5eb2-dea7-4601-6c63edbbf997}</UniqueIdentifier> </Filter> + <Filter Include="src\core\ext"> + <UniqueIdentifier>{5db70e06-741d-708c-bf0a-b59f8ca1f8bd}</UniqueIdentifier> + </Filter> + <Filter Include="src\core\ext\filters"> + <UniqueIdentifier>{f0f88514-c2d8-c4c9-c3bd-591682207751}</UniqueIdentifier> + </Filter> + <Filter Include="src\core\ext\filters\client_channel"> + <UniqueIdentifier>{5bb60a9e-156f-e1c8-3b9c-1b23e7992d7a}</UniqueIdentifier> + </Filter> + <Filter Include="src\core\ext\filters\client_channel\resolver"> + <UniqueIdentifier>{24a50975-435e-20a5-b0f2-71bc330d0378}</UniqueIdentifier> + </Filter> + <Filter Include="src\core\ext\filters\client_channel\resolver\fake"> + <UniqueIdentifier>{9e94ffec-fe00-d132-db50-c4a3c218f102}</UniqueIdentifier> + </Filter> <Filter Include="src\core\lib"> <UniqueIdentifier>{f4e8c61e-1ca6-0fdd-7b5e-b7f9a30c9a21}</UniqueIdentifier> </Filter> diff --git a/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj b/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj index af13acef45..d9977f7c67 100644 --- a/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj @@ -147,8 +147,8 @@ </ItemDefinitionGroup> <ItemGroup> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.h" /> <ClInclude Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.h" /> - <ClInclude Include="$(SolutionDir)\..\test\core\end2end\fake_resolver.h" /> <ClInclude Include="$(SolutionDir)\..\test\core\end2end\fixtures\http_proxy_fixture.h" /> <ClInclude Include="$(SolutionDir)\..\test\core\end2end\fixtures\proxy.h" /> <ClInclude Include="$(SolutionDir)\..\test\core\iomgr\endpoint_tests.h" /> @@ -164,9 +164,9 @@ <ClInclude Include="$(SolutionDir)\..\test\core\util\trickle_endpoint.h" /> </ItemGroup> <ItemGroup> - <ClCompile Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.c"> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.c"> </ClCompile> - <ClCompile Include="$(SolutionDir)\..\test\core\end2end\fake_resolver.c"> + <ClCompile Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\test\core\end2end\fixtures\http_proxy_fixture.c"> </ClCompile> diff --git a/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj.filters index 4da043ea90..0a0590e9be 100644 --- a/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj.filters @@ -1,10 +1,10 @@ <?xml version="1.0" encoding="utf-8"?> <Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> <ItemGroup> - <ClCompile Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.c"> - <Filter>test\core\end2end</Filter> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.c"> + <Filter>src\core\ext\filters\client_channel\resolver\fake</Filter> </ClCompile> - <ClCompile Include="$(SolutionDir)\..\test\core\end2end\fake_resolver.c"> + <ClCompile Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.c"> <Filter>test\core\end2end</Filter> </ClCompile> <ClCompile Include="$(SolutionDir)\..\test\core\end2end\fixtures\http_proxy_fixture.c"> @@ -48,10 +48,10 @@ </ClCompile> </ItemGroup> <ItemGroup> - <ClInclude Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.h"> - <Filter>test\core\end2end</Filter> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.h"> + <Filter>src\core\ext\filters\client_channel\resolver\fake</Filter> </ClInclude> - <ClInclude Include="$(SolutionDir)\..\test\core\end2end\fake_resolver.h"> + <ClInclude Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.h"> <Filter>test\core\end2end</Filter> </ClInclude> <ClInclude Include="$(SolutionDir)\..\test\core\end2end\fixtures\http_proxy_fixture.h"> @@ -96,6 +96,27 @@ </ItemGroup> <ItemGroup> + <Filter Include="src"> + <UniqueIdentifier>{65483377-42fd-137e-3847-00dfd4675db3}</UniqueIdentifier> + </Filter> + <Filter Include="src\core"> + <UniqueIdentifier>{51a516dc-93e3-4dd5-d114-2d06f5df4ad7}</UniqueIdentifier> + </Filter> + <Filter Include="src\core\ext"> + <UniqueIdentifier>{a927155d-bcf6-0dd8-6d63-be48bcaf617f}</UniqueIdentifier> + </Filter> + <Filter Include="src\core\ext\filters"> + <UniqueIdentifier>{df16e935-149b-79bf-ecb3-dc3a6b628082}</UniqueIdentifier> + </Filter> + <Filter Include="src\core\ext\filters\client_channel"> + <UniqueIdentifier>{0fb7c1f0-5e3a-d1df-4c9d-96a677a7f3ee}</UniqueIdentifier> + </Filter> + <Filter Include="src\core\ext\filters\client_channel\resolver"> + <UniqueIdentifier>{f47477d5-cb4e-e726-04dd-182151e81c71}</UniqueIdentifier> + </Filter> + <Filter Include="src\core\ext\filters\client_channel\resolver\fake"> + <UniqueIdentifier>{2d280bd0-f4ee-d1f2-4d70-174147ac0dbc}</UniqueIdentifier> + </Filter> <Filter Include="test"> <UniqueIdentifier>{037c7645-1698-cf2d-4163-525240323101}</UniqueIdentifier> </Filter> diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index ac403a7c48..76770b0420 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -450,6 +450,7 @@ <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\deadline\deadline_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\dns\c_ares\grpc_ares_ev_driver.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\dns\c_ares\grpc_ares_wrapper.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\client_load_reporting_filter.h" /> @@ -860,6 +861,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\sockaddr\sockaddr_resolver.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting_filter.c"> diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index 9fee2ec22b..92d9e37b3a 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -562,6 +562,9 @@ <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\sockaddr\sockaddr_resolver.c"> <Filter>src\core\ext\filters\client_channel\resolver\sockaddr</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.c"> + <Filter>src\core\ext\filters\client_channel\resolver\fake</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting.c"> <Filter>src\core\ext\filters\load_reporting</Filter> </ClCompile> @@ -1235,6 +1238,9 @@ <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\dns\c_ares\grpc_ares_wrapper.h"> <Filter>src\core\ext\filters\client_channel\resolver\dns\c_ares</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\fake\fake_resolver.h"> + <Filter>src\core\ext\filters\client_channel\resolver\fake</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting.h"> <Filter>src\core\ext\filters\load_reporting</Filter> </ClInclude> @@ -1412,6 +1418,9 @@ <Filter Include="src\core\ext\filters\client_channel\resolver\dns\native"> <UniqueIdentifier>{55f499bd-ae18-5210-81e1-385c85e60875}</UniqueIdentifier> </Filter> + <Filter Include="src\core\ext\filters\client_channel\resolver\fake"> + <UniqueIdentifier>{7f924133-4a98-87b0-f158-cb64ea91e71a}</UniqueIdentifier> + </Filter> <Filter Include="src\core\ext\filters\client_channel\resolver\sockaddr"> <UniqueIdentifier>{99210f5e-b2a0-ecd1-024f-fc152db68a11}</UniqueIdentifier> </Filter> diff --git a/vsprojects/vcxproj/test/round_robin_end2end_test/round_robin_end2end_test.vcxproj b/vsprojects/vcxproj/test/client_lb_end2end_test/client_lb_end2end_test.vcxproj index 55e16f188d..ff02417561 100644 --- a/vsprojects/vcxproj/test/round_robin_end2end_test/round_robin_end2end_test.vcxproj +++ b/vsprojects/vcxproj/test/client_lb_end2end_test/client_lb_end2end_test.vcxproj @@ -20,7 +20,7 @@ </ProjectConfiguration> </ItemGroup> <PropertyGroup Label="Globals"> - <ProjectGuid>{54B15DF6-42BA-5347-C9B8-2D7F1F2921C6}</ProjectGuid> + <ProjectGuid>{903C0B70-3C11-181F-F532-EA3F352BDABB}</ProjectGuid> <IgnoreWarnIntDirInTempDetected>true</IgnoreWarnIntDirInTempDetected> <IntDir>$(SolutionDir)IntDir\$(MSBuildProjectName)\</IntDir> </PropertyGroup> @@ -62,14 +62,14 @@ </ImportGroup> <PropertyGroup Label="UserMacros" /> <PropertyGroup Condition="'$(Configuration)'=='Debug'"> - <TargetName>round_robin_end2end_test</TargetName> + <TargetName>client_lb_end2end_test</TargetName> <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib> <Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib> <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl> <Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl> </PropertyGroup> <PropertyGroup Condition="'$(Configuration)'=='Release'"> - <TargetName>round_robin_end2end_test</TargetName> + <TargetName>client_lb_end2end_test</TargetName> <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib> <Configuration-grpc_dependencies_zlib>Release</Configuration-grpc_dependencies_zlib> <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl> @@ -160,7 +160,7 @@ </ItemDefinitionGroup> <ItemGroup> - <ClCompile Include="$(SolutionDir)\..\test\cpp\end2end\round_robin_end2end_test.cc"> + <ClCompile Include="$(SolutionDir)\..\test\cpp\end2end\client_lb_end2end_test.cc"> </ClCompile> </ItemGroup> <ItemGroup> diff --git a/vsprojects/vcxproj/test/round_robin_end2end_test/round_robin_end2end_test.vcxproj.filters b/vsprojects/vcxproj/test/client_lb_end2end_test/client_lb_end2end_test.vcxproj.filters index 95a149953f..19c421a754 100644 --- a/vsprojects/vcxproj/test/round_robin_end2end_test/round_robin_end2end_test.vcxproj.filters +++ b/vsprojects/vcxproj/test/client_lb_end2end_test/client_lb_end2end_test.vcxproj.filters @@ -1,20 +1,20 @@ <?xml version="1.0" encoding="utf-8"?> <Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> <ItemGroup> - <ClCompile Include="$(SolutionDir)\..\test\cpp\end2end\round_robin_end2end_test.cc"> + <ClCompile Include="$(SolutionDir)\..\test\cpp\end2end\client_lb_end2end_test.cc"> <Filter>test\cpp\end2end</Filter> </ClCompile> </ItemGroup> <ItemGroup> <Filter Include="test"> - <UniqueIdentifier>{e151f47d-6563-5ef9-fae6-70708f9f8ee6}</UniqueIdentifier> + <UniqueIdentifier>{5e3b0af8-1e9d-7d57-78f8-b695d18d56d2}</UniqueIdentifier> </Filter> <Filter Include="test\cpp"> - <UniqueIdentifier>{07958594-fd93-28f7-9388-c67c952701b8}</UniqueIdentifier> + <UniqueIdentifier>{b82591f8-69bc-ad7c-242a-63ef38a19c51}</UniqueIdentifier> </Filter> <Filter Include="test\cpp\end2end"> - <UniqueIdentifier>{7596a0dd-caa4-b365-a59f-f7ffef38b10d}</UniqueIdentifier> + <UniqueIdentifier>{33186a45-2f2e-0c6b-9b14-c1f096a81ac9}</UniqueIdentifier> </Filter> </ItemGroup> </Project> |