diff options
65 files changed, 1107 insertions, 588 deletions
@@ -113,6 +113,7 @@ GRPC_SECURE_PUBLIC_HDRS = [ GRPCXX_SRCS = [ "src/cpp/client/channel_cc.cc", "src/cpp/client/client_context.cc", + "src/cpp/client/client_interceptor.cc", "src/cpp/client/create_channel.cc", "src/cpp/client/create_channel_internal.cc", "src/cpp/client/create_channel_posix.cc", @@ -2111,7 +2112,6 @@ grpc_cc_library( name = "grpc++_codegen_base_src", srcs = [ "src/cpp/codegen/codegen_init.cc", - "src/cpp/codegen/client_interceptor.cc", ], language = "c++", deps = [ diff --git a/CMakeLists.txt b/CMakeLists.txt index 372a92d222..09d625fb02 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2862,6 +2862,7 @@ add_library(grpc++ src/cpp/server/secure_server_credentials.cc src/cpp/client/channel_cc.cc src/cpp/client/client_context.cc + src/cpp/client/client_interceptor.cc src/cpp/client/create_channel.cc src/cpp/client/create_channel_internal.cc src/cpp/client/create_channel_posix.cc @@ -2896,7 +2897,6 @@ add_library(grpc++ third_party/nanopb/pb_common.c third_party/nanopb/pb_decode.c third_party/nanopb/pb_encode.c - src/cpp/codegen/client_interceptor.cc src/cpp/codegen/codegen_init.cc ) @@ -3234,6 +3234,7 @@ add_library(grpc++_cronet src/cpp/server/insecure_server_credentials.cc src/cpp/client/channel_cc.cc src/cpp/client/client_context.cc + src/cpp/client/client_interceptor.cc src/cpp/client/create_channel.cc src/cpp/client/create_channel_internal.cc src/cpp/client/create_channel_posix.cc @@ -3268,7 +3269,6 @@ add_library(grpc++_cronet third_party/nanopb/pb_common.c third_party/nanopb/pb_decode.c third_party/nanopb/pb_encode.c - src/cpp/codegen/client_interceptor.cc src/cpp/codegen/codegen_init.cc src/core/ext/transport/chttp2/client/insecure/channel_create.cc src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc @@ -4022,7 +4022,6 @@ add_library(grpc++_test_util test/cpp/util/string_ref_helper.cc test/cpp/util/subprocess.cc test/cpp/util/test_credentials_provider.cc - src/cpp/codegen/client_interceptor.cc src/cpp/codegen/codegen_init.cc ) @@ -4213,7 +4212,6 @@ add_library(grpc++_test_util_unsecure test/cpp/util/byte_buffer_proto_helper.cc test/cpp/util/string_ref_helper.cc test/cpp/util/subprocess.cc - src/cpp/codegen/client_interceptor.cc src/cpp/codegen/codegen_init.cc ) @@ -4383,6 +4381,7 @@ add_library(grpc++_unsecure src/cpp/server/insecure_server_credentials.cc src/cpp/client/channel_cc.cc src/cpp/client/client_context.cc + src/cpp/client/client_interceptor.cc src/cpp/client/create_channel.cc src/cpp/client/create_channel_internal.cc src/cpp/client/create_channel_posix.cc @@ -4417,7 +4416,6 @@ add_library(grpc++_unsecure third_party/nanopb/pb_common.c third_party/nanopb/pb_decode.c third_party/nanopb/pb_encode.c - src/cpp/codegen/client_interceptor.cc src/cpp/codegen/codegen_init.cc ) @@ -12447,6 +12445,7 @@ if (gRPC_BUILD_TESTS) add_executable(client_interceptors_end2end_test test/cpp/end2end/client_interceptors_end2end_test.cc + test/cpp/end2end/interceptors_util.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc ) @@ -12645,7 +12644,6 @@ add_executable(codegen_test_minimal ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/stats.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/stats.grpc.pb.h test/cpp/codegen/codegen_test_minimal.cc - src/cpp/codegen/client_interceptor.cc src/cpp/codegen/codegen_init.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc @@ -12901,6 +12899,7 @@ if (gRPC_BUILD_TESTS) add_executable(end2end_test test/cpp/end2end/end2end_test.cc + test/cpp/end2end/interceptors_util.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc ) @@ -15358,6 +15357,7 @@ endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) add_executable(server_interceptors_end2end_test + test/cpp/end2end/interceptors_util.cc test/cpp/end2end/server_interceptors_end2end_test.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc @@ -5248,6 +5248,7 @@ LIBGRPC++_SRC = \ src/cpp/server/secure_server_credentials.cc \ src/cpp/client/channel_cc.cc \ src/cpp/client/client_context.cc \ + src/cpp/client/client_interceptor.cc \ src/cpp/client/create_channel.cc \ src/cpp/client/create_channel_internal.cc \ src/cpp/client/create_channel_posix.cc \ @@ -5282,7 +5283,6 @@ LIBGRPC++_SRC = \ third_party/nanopb/pb_common.c \ third_party/nanopb/pb_decode.c \ third_party/nanopb/pb_encode.c \ - src/cpp/codegen/client_interceptor.cc \ src/cpp/codegen/codegen_init.cc \ PUBLIC_HEADERS_CXX += \ @@ -5630,6 +5630,7 @@ LIBGRPC++_CRONET_SRC = \ src/cpp/server/insecure_server_credentials.cc \ src/cpp/client/channel_cc.cc \ src/cpp/client/client_context.cc \ + src/cpp/client/client_interceptor.cc \ src/cpp/client/create_channel.cc \ src/cpp/client/create_channel_internal.cc \ src/cpp/client/create_channel_posix.cc \ @@ -5664,7 +5665,6 @@ LIBGRPC++_CRONET_SRC = \ third_party/nanopb/pb_common.c \ third_party/nanopb/pb_decode.c \ third_party/nanopb/pb_encode.c \ - src/cpp/codegen/client_interceptor.cc \ src/cpp/codegen/codegen_init.cc \ src/core/ext/transport/chttp2/client/insecure/channel_create.cc \ src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc \ @@ -6420,7 +6420,6 @@ LIBGRPC++_TEST_UTIL_SRC = \ test/cpp/util/string_ref_helper.cc \ test/cpp/util/subprocess.cc \ test/cpp/util/test_credentials_provider.cc \ - src/cpp/codegen/client_interceptor.cc \ src/cpp/codegen/codegen_init.cc \ PUBLIC_HEADERS_CXX += \ @@ -6573,7 +6572,6 @@ $(OBJDIR)/$(CONFIG)/test/cpp/util/create_test_channel.o: $(GENDIR)/src/proto/grp $(OBJDIR)/$(CONFIG)/test/cpp/util/string_ref_helper.o: $(GENDIR)/src/proto/grpc/channelz/channelz.pb.cc $(GENDIR)/src/proto/grpc/channelz/channelz.grpc.pb.cc $(GENDIR)/src/proto/grpc/health/v1/health.pb.cc $(GENDIR)/src/proto/grpc/health/v1/health.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.cc $(OBJDIR)/$(CONFIG)/test/cpp/util/subprocess.o: $(GENDIR)/src/proto/grpc/channelz/channelz.pb.cc $(GENDIR)/src/proto/grpc/channelz/channelz.grpc.pb.cc $(GENDIR)/src/proto/grpc/health/v1/health.pb.cc $(GENDIR)/src/proto/grpc/health/v1/health.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.cc $(OBJDIR)/$(CONFIG)/test/cpp/util/test_credentials_provider.o: $(GENDIR)/src/proto/grpc/channelz/channelz.pb.cc $(GENDIR)/src/proto/grpc/channelz/channelz.grpc.pb.cc $(GENDIR)/src/proto/grpc/health/v1/health.pb.cc $(GENDIR)/src/proto/grpc/health/v1/health.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.cc -$(OBJDIR)/$(CONFIG)/src/cpp/codegen/client_interceptor.o: $(GENDIR)/src/proto/grpc/channelz/channelz.pb.cc $(GENDIR)/src/proto/grpc/channelz/channelz.grpc.pb.cc $(GENDIR)/src/proto/grpc/health/v1/health.pb.cc $(GENDIR)/src/proto/grpc/health/v1/health.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.cc $(OBJDIR)/$(CONFIG)/src/cpp/codegen/codegen_init.o: $(GENDIR)/src/proto/grpc/channelz/channelz.pb.cc $(GENDIR)/src/proto/grpc/channelz/channelz.grpc.pb.cc $(GENDIR)/src/proto/grpc/health/v1/health.pb.cc $(GENDIR)/src/proto/grpc/health/v1/health.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.cc @@ -6586,7 +6584,6 @@ LIBGRPC++_TEST_UTIL_UNSECURE_SRC = \ test/cpp/util/byte_buffer_proto_helper.cc \ test/cpp/util/string_ref_helper.cc \ test/cpp/util/subprocess.cc \ - src/cpp/codegen/client_interceptor.cc \ src/cpp/codegen/codegen_init.cc \ PUBLIC_HEADERS_CXX += \ @@ -6736,7 +6733,6 @@ $(OBJDIR)/$(CONFIG)/test/cpp/end2end/test_service_impl.o: $(GENDIR)/src/proto/gr $(OBJDIR)/$(CONFIG)/test/cpp/util/byte_buffer_proto_helper.o: $(GENDIR)/src/proto/grpc/health/v1/health.pb.cc $(GENDIR)/src/proto/grpc/health/v1/health.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.cc $(OBJDIR)/$(CONFIG)/test/cpp/util/string_ref_helper.o: $(GENDIR)/src/proto/grpc/health/v1/health.pb.cc $(GENDIR)/src/proto/grpc/health/v1/health.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.cc $(OBJDIR)/$(CONFIG)/test/cpp/util/subprocess.o: $(GENDIR)/src/proto/grpc/health/v1/health.pb.cc $(GENDIR)/src/proto/grpc/health/v1/health.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.cc -$(OBJDIR)/$(CONFIG)/src/cpp/codegen/client_interceptor.o: $(GENDIR)/src/proto/grpc/health/v1/health.pb.cc $(GENDIR)/src/proto/grpc/health/v1/health.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.cc $(OBJDIR)/$(CONFIG)/src/cpp/codegen/codegen_init.o: $(GENDIR)/src/proto/grpc/health/v1/health.pb.cc $(GENDIR)/src/proto/grpc/health/v1/health.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.cc @@ -6746,6 +6742,7 @@ LIBGRPC++_UNSECURE_SRC = \ src/cpp/server/insecure_server_credentials.cc \ src/cpp/client/channel_cc.cc \ src/cpp/client/client_context.cc \ + src/cpp/client/client_interceptor.cc \ src/cpp/client/create_channel.cc \ src/cpp/client/create_channel_internal.cc \ src/cpp/client/create_channel_posix.cc \ @@ -6780,7 +6777,6 @@ LIBGRPC++_UNSECURE_SRC = \ third_party/nanopb/pb_common.c \ third_party/nanopb/pb_decode.c \ third_party/nanopb/pb_encode.c \ - src/cpp/codegen/client_interceptor.cc \ src/cpp/codegen/codegen_init.cc \ PUBLIC_HEADERS_CXX += \ @@ -17331,6 +17327,7 @@ endif CLIENT_INTERCEPTORS_END2END_TEST_SRC = \ test/cpp/end2end/client_interceptors_end2end_test.cc \ + test/cpp/end2end/interceptors_util.cc \ CLIENT_INTERCEPTORS_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CLIENT_INTERCEPTORS_END2END_TEST_SRC)))) ifeq ($(NO_SECURE),true) @@ -17363,6 +17360,8 @@ endif $(OBJDIR)/$(CONFIG)/test/cpp/end2end/client_interceptors_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a +$(OBJDIR)/$(CONFIG)/test/cpp/end2end/interceptors_util.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + deps_client_interceptors_end2end_test: $(CLIENT_INTERCEPTORS_END2END_TEST_OBJS:.o=.dep) ifneq ($(NO_SECURE),true) @@ -17489,7 +17488,6 @@ CODEGEN_TEST_MINIMAL_SRC = \ $(GENDIR)/src/proto/grpc/testing/worker_service.pb.cc $(GENDIR)/src/proto/grpc/testing/worker_service.grpc.pb.cc \ $(GENDIR)/src/proto/grpc/testing/stats.pb.cc $(GENDIR)/src/proto/grpc/testing/stats.grpc.pb.cc \ test/cpp/codegen/codegen_test_minimal.cc \ - src/cpp/codegen/client_interceptor.cc \ src/cpp/codegen/codegen_init.cc \ CODEGEN_TEST_MINIMAL_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CODEGEN_TEST_MINIMAL_SRC)))) @@ -17537,8 +17535,6 @@ $(OBJDIR)/$(CONFIG)/src/proto/grpc/testing/stats.o: $(LIBDIR)/$(CONFIG)/libgrpc $(OBJDIR)/$(CONFIG)/test/cpp/codegen/codegen_test_minimal.o: $(LIBDIR)/$(CONFIG)/libgrpc++_core_stats.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a -$(OBJDIR)/$(CONFIG)/src/cpp/codegen/client_interceptor.o: $(LIBDIR)/$(CONFIG)/libgrpc++_core_stats.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a - $(OBJDIR)/$(CONFIG)/src/cpp/codegen/codegen_init.o: $(LIBDIR)/$(CONFIG)/libgrpc++_core_stats.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a deps_codegen_test_minimal: $(CODEGEN_TEST_MINIMAL_OBJS:.o=.dep) @@ -17549,7 +17545,6 @@ ifneq ($(NO_DEPS),true) endif endif $(OBJDIR)/$(CONFIG)/test/cpp/codegen/codegen_test_minimal.o: $(GENDIR)/src/proto/grpc/testing/control.pb.cc $(GENDIR)/src/proto/grpc/testing/control.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/payloads.pb.cc $(GENDIR)/src/proto/grpc/testing/payloads.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/benchmark_service.pb.cc $(GENDIR)/src/proto/grpc/testing/benchmark_service.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/report_qps_scenario_service.pb.cc $(GENDIR)/src/proto/grpc/testing/report_qps_scenario_service.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/worker_service.pb.cc $(GENDIR)/src/proto/grpc/testing/worker_service.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/stats.pb.cc $(GENDIR)/src/proto/grpc/testing/stats.grpc.pb.cc -$(OBJDIR)/$(CONFIG)/src/cpp/codegen/client_interceptor.o: $(GENDIR)/src/proto/grpc/testing/control.pb.cc $(GENDIR)/src/proto/grpc/testing/control.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/payloads.pb.cc $(GENDIR)/src/proto/grpc/testing/payloads.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/benchmark_service.pb.cc $(GENDIR)/src/proto/grpc/testing/benchmark_service.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/report_qps_scenario_service.pb.cc $(GENDIR)/src/proto/grpc/testing/report_qps_scenario_service.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/worker_service.pb.cc $(GENDIR)/src/proto/grpc/testing/worker_service.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/stats.pb.cc $(GENDIR)/src/proto/grpc/testing/stats.grpc.pb.cc $(OBJDIR)/$(CONFIG)/src/cpp/codegen/codegen_init.o: $(GENDIR)/src/proto/grpc/testing/control.pb.cc $(GENDIR)/src/proto/grpc/testing/control.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/payloads.pb.cc $(GENDIR)/src/proto/grpc/testing/payloads.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/benchmark_service.pb.cc $(GENDIR)/src/proto/grpc/testing/benchmark_service.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/report_qps_scenario_service.pb.cc $(GENDIR)/src/proto/grpc/testing/report_qps_scenario_service.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/worker_service.pb.cc $(GENDIR)/src/proto/grpc/testing/worker_service.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/stats.pb.cc $(GENDIR)/src/proto/grpc/testing/stats.grpc.pb.cc @@ -17770,6 +17765,7 @@ endif END2END_TEST_SRC = \ test/cpp/end2end/end2end_test.cc \ + test/cpp/end2end/interceptors_util.cc \ END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(END2END_TEST_SRC)))) ifeq ($(NO_SECURE),true) @@ -17802,6 +17798,8 @@ endif $(OBJDIR)/$(CONFIG)/test/cpp/end2end/end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a +$(OBJDIR)/$(CONFIG)/test/cpp/end2end/interceptors_util.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + deps_end2end_test: $(END2END_TEST_OBJS:.o=.dep) ifneq ($(NO_SECURE),true) @@ -20162,6 +20160,7 @@ endif SERVER_INTERCEPTORS_END2END_TEST_SRC = \ + test/cpp/end2end/interceptors_util.cc \ test/cpp/end2end/server_interceptors_end2end_test.cc \ SERVER_INTERCEPTORS_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(SERVER_INTERCEPTORS_END2END_TEST_SRC)))) @@ -20193,6 +20192,8 @@ endif endif +$(OBJDIR)/$(CONFIG)/test/cpp/end2end/interceptors_util.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(OBJDIR)/$(CONFIG)/test/cpp/end2end/server_interceptors_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a deps_server_interceptors_end2end_test: $(SERVER_INTERCEPTORS_END2END_TEST_OBJS:.o=.dep) diff --git a/build.yaml b/build.yaml index 24fe56ab60..3ee13a7dec 100644 --- a/build.yaml +++ b/build.yaml @@ -1262,7 +1262,6 @@ filegroups: - name: grpc++_codegen_base_src language: c++ src: - - src/cpp/codegen/client_interceptor.cc - src/cpp/codegen/codegen_init.cc uses: - grpc++_codegen_base @@ -1383,6 +1382,7 @@ filegroups: src: - src/cpp/client/channel_cc.cc - src/cpp/client/client_context.cc + - src/cpp/client/client_interceptor.cc - src/cpp/client/create_channel.cc - src/cpp/client/create_channel_internal.cc - src/cpp/client/create_channel_posix.cc @@ -4540,6 +4540,7 @@ targets: - test/cpp/end2end/interceptors_util.h src: - test/cpp/end2end/client_interceptors_end2end_test.cc + - test/cpp/end2end/interceptors_util.cc deps: - grpc++_test_util - grpc_test_util @@ -4666,8 +4667,11 @@ targets: cpu_cost: 0.5 build: test language: c++ + headers: + - test/cpp/end2end/interceptors_util.h src: - test/cpp/end2end/end2end_test.cc + - test/cpp/end2end/interceptors_util.cc deps: - grpc++_test_util - grpc_test_util @@ -5467,6 +5471,7 @@ targets: headers: - test/cpp/end2end/interceptors_util.h src: + - test/cpp/end2end/interceptors_util.cc - test/cpp/end2end/server_interceptors_end2end_test.cc deps: - grpc++_test_util diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 2993cdbb69..d4b0548532 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -195,6 +195,7 @@ Pod::Spec.new do |s| 'src/cpp/server/secure_server_credentials.cc', 'src/cpp/client/channel_cc.cc', 'src/cpp/client/client_context.cc', + 'src/cpp/client/client_interceptor.cc', 'src/cpp/client/create_channel.cc', 'src/cpp/client/create_channel_internal.cc', 'src/cpp/client/create_channel_posix.cc', @@ -225,7 +226,6 @@ Pod::Spec.new do |s| 'src/cpp/util/status.cc', 'src/cpp/util/string_ref.cc', 'src/cpp/util/time_cc.cc', - 'src/cpp/codegen/client_interceptor.cc', 'src/cpp/codegen/codegen_init.cc', 'src/core/lib/gpr/alloc.h', 'src/core/lib/gpr/arena.h', @@ -1386,6 +1386,7 @@ 'src/cpp/server/secure_server_credentials.cc', 'src/cpp/client/channel_cc.cc', 'src/cpp/client/client_context.cc', + 'src/cpp/client/client_interceptor.cc', 'src/cpp/client/create_channel.cc', 'src/cpp/client/create_channel_internal.cc', 'src/cpp/client/create_channel_posix.cc', @@ -1420,7 +1421,6 @@ 'third_party/nanopb/pb_common.c', 'third_party/nanopb/pb_decode.c', 'third_party/nanopb/pb_encode.c', - 'src/cpp/codegen/client_interceptor.cc', 'src/cpp/codegen/codegen_init.cc', ], }, @@ -1501,7 +1501,6 @@ 'test/cpp/util/string_ref_helper.cc', 'test/cpp/util/subprocess.cc', 'test/cpp/util/test_credentials_provider.cc', - 'src/cpp/codegen/client_interceptor.cc', 'src/cpp/codegen/codegen_init.cc', ], }, @@ -1522,7 +1521,6 @@ 'test/cpp/util/byte_buffer_proto_helper.cc', 'test/cpp/util/string_ref_helper.cc', 'test/cpp/util/subprocess.cc', - 'src/cpp/codegen/client_interceptor.cc', 'src/cpp/codegen/codegen_init.cc', ], }, @@ -1539,6 +1537,7 @@ 'src/cpp/server/insecure_server_credentials.cc', 'src/cpp/client/channel_cc.cc', 'src/cpp/client/client_context.cc', + 'src/cpp/client/client_interceptor.cc', 'src/cpp/client/create_channel.cc', 'src/cpp/client/create_channel_internal.cc', 'src/cpp/client/create_channel_posix.cc', @@ -1573,7 +1572,6 @@ 'third_party/nanopb/pb_common.c', 'third_party/nanopb/pb_decode.c', 'third_party/nanopb/pb_encode.c', - 'src/cpp/codegen/client_interceptor.cc', 'src/cpp/codegen/codegen_init.cc', ], }, diff --git a/include/grpcpp/impl/codegen/call_op_set.h b/include/grpcpp/impl/codegen/call_op_set.h index 5c52b027b2..b4c34a01c9 100644 --- a/include/grpcpp/impl/codegen/call_op_set.h +++ b/include/grpcpp/impl/codegen/call_op_set.h @@ -214,11 +214,10 @@ class CallNoOp { void AddOp(grpc_op* ops, size_t* nops) {} void FinishOp(bool* status) {} void SetInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) {} + InterceptorBatchMethodsImpl* interceptor_methods) {} void SetFinishInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) {} - void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) { - } + InterceptorBatchMethodsImpl* interceptor_methods) {} + void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {} }; class CallOpSendInitialMetadata { @@ -265,7 +264,7 @@ class CallOpSendInitialMetadata { } void SetInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { if (!send_) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA); @@ -273,9 +272,9 @@ class CallOpSendInitialMetadata { } void SetFinishInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) {} + InterceptorBatchMethodsImpl* interceptor_methods) {} - void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) { + void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; } @@ -318,7 +317,7 @@ class CallOpSendMessage { void FinishOp(bool* status) { send_buf_.Clear(); } void SetInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { if (!send_buf_.Valid()) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_MESSAGE); @@ -326,9 +325,9 @@ class CallOpSendMessage { } void SetFinishInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) {} + InterceptorBatchMethodsImpl* interceptor_methods) {} - void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) { + void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; } @@ -406,17 +405,17 @@ class CallOpRecvMessage { } void SetInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { interceptor_methods->SetRecvMessage(message_); } void SetFinishInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { if (!got_message) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_MESSAGE); } - void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) { + void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; if (message_ == nullptr) return; interceptor_methods->AddInterceptionHookPoint( @@ -501,17 +500,17 @@ class CallOpGenericRecvMessage { } void SetInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { interceptor_methods->SetRecvMessage(message_); } void SetFinishInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { if (!got_message) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_MESSAGE); } - void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) { + void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; if (!deserialize_) return; interceptor_methods->AddInterceptionHookPoint( @@ -543,16 +542,16 @@ class CallOpClientSendClose { void FinishOp(bool* status) { send_ = false; } void SetInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { if (!send_) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_CLOSE); } void SetFinishInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) {} + InterceptorBatchMethodsImpl* interceptor_methods) {} - void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) { + void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; } @@ -600,7 +599,7 @@ class CallOpServerSendStatus { } void SetInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { if (!send_status_available_) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_STATUS); @@ -610,9 +609,9 @@ class CallOpServerSendStatus { } void SetFinishInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) {} + InterceptorBatchMethodsImpl* interceptor_methods) {} - void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) { + void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; } @@ -652,19 +651,19 @@ class CallOpRecvInitialMetadata { } void SetInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { interceptor_methods->SetRecvInitialMetadata(metadata_map_); } void SetFinishInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { if (metadata_map_ == nullptr) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); metadata_map_ = nullptr; } - void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) { + void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; if (metadata_map_ == nullptr) return; interceptor_methods->AddInterceptionHookPoint( @@ -720,20 +719,20 @@ class CallOpClientRecvStatus { } void SetInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { interceptor_methods->SetRecvStatus(recv_status_); interceptor_methods->SetRecvTrailingMetadata(metadata_map_); } void SetFinishInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { if (recv_status_ == nullptr) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_STATUS); recv_status_ = nullptr; } - void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) { + void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; if (recv_status_ == nullptr) return; interceptor_methods->AddInterceptionHookPoint( diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h index 29deef658f..51367cf550 100644 --- a/include/grpcpp/impl/codegen/callback_common.h +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -110,6 +110,9 @@ class CallbackWithStatusTag } }; +/// CallbackWithSuccessTag can be reused multiple times, and will be used in +/// this fashion for streaming operations. As a result, it shouldn't clear +/// anything up until its destructor class CallbackWithSuccessTag : public grpc_experimental_completion_queue_functor { public: @@ -125,15 +128,39 @@ class CallbackWithSuccessTag // there are no tests catching the compiler warning. static void operator delete(void*, void*) { assert(0); } - CallbackWithSuccessTag() : call_(nullptr), ops_(nullptr) {} + CallbackWithSuccessTag() : call_(nullptr) {} CallbackWithSuccessTag(grpc_call* call, std::function<void(bool)> f, - CompletionQueueTag* ops) - : call_(call), func_(std::move(f)), ops_(ops) { + CompletionQueueTag* ops) { + Set(call, f, ops); + } + + CallbackWithSuccessTag(const CallbackWithSuccessTag&) = delete; + CallbackWithSuccessTag& operator=(const CallbackWithSuccessTag&) = delete; + + ~CallbackWithSuccessTag() { Clear(); } + + // Set can only be called on a default-constructed or Clear'ed tag. + // It should never be called on a tag that was constructed with arguments + // or on a tag that has been Set before unless the tag has been cleared. + void Set(grpc_call* call, std::function<void(bool)> f, + CompletionQueueTag* ops) { + call_ = call; + func_ = std::move(f); + ops_ = ops; g_core_codegen_interface->grpc_call_ref(call); functor_run = &CallbackWithSuccessTag::StaticRun; } + void Clear() { + if (call_ != nullptr) { + func_ = nullptr; + grpc_call* call = call_; + call_ = nullptr; + g_core_codegen_interface->grpc_call_unref(call); + } + } + CompletionQueueTag* ops() { return ops_; } // force_run can not be performed on a tag if operations using this tag @@ -141,7 +168,7 @@ class CallbackWithSuccessTag // that are detected before the operations are internally processed. void force_run(bool ok) { Run(ok); } - /// check if this tag has ever been set + /// check if this tag is currently set operator bool() const { return call_ != nullptr; } private: @@ -162,14 +189,8 @@ class CallbackWithSuccessTag GPR_CODEGEN_ASSERT(ignored == ops_); if (do_callback) { - // Last use of func_, so ok to move it out for rvalue call above - auto func = std::move(func_); - func_ = nullptr; // reset to clear this out for sure - CatchingCallback(std::move(func), ok); - } else { - func_ = nullptr; // reset to clear this out for sure + CatchingCallback(func_, ok); } - g_core_codegen_interface->grpc_call_unref(call_); } }; diff --git a/include/grpcpp/impl/codegen/client_context.h b/include/grpcpp/impl/codegen/client_context.h index f53b744dcf..75b955e760 100644 --- a/include/grpcpp/impl/codegen/client_context.h +++ b/include/grpcpp/impl/codegen/client_context.h @@ -426,6 +426,8 @@ class ClientContext { grpc::string authority() { return authority_; } + void SendCancelToInterceptors(); + bool initial_metadata_received_; bool wait_for_ready_; bool wait_for_ready_explicitly_set_; diff --git a/include/grpcpp/impl/codegen/core_codegen_interface.h b/include/grpcpp/impl/codegen/core_codegen_interface.h index 25e3abccca..20a5b3300c 100644 --- a/include/grpcpp/impl/codegen/core_codegen_interface.h +++ b/include/grpcpp/impl/codegen/core_codegen_interface.h @@ -145,6 +145,15 @@ extern CoreCodegenInterface* g_core_codegen_interface; } \ } while (0) +/// Codegen specific version of \a GPR_DEBUG_ASSERT. +#ifndef NDEBUG +#define GPR_CODEGEN_DEBUG_ASSERT(x) GPR_CODEGEN_ASSERT(x) +#else +#define GPR_CODEGEN_DEBUG_ASSERT(x) \ + do { \ + } while (0) +#endif + } // namespace grpc #endif // GRPCPP_IMPL_CODEGEN_CORE_CODEGEN_INTERFACE_H diff --git a/include/grpcpp/impl/codegen/interceptor.h b/include/grpcpp/impl/codegen/interceptor.h index 15cab711e5..19f6afcb72 100644 --- a/include/grpcpp/impl/codegen/interceptor.h +++ b/include/grpcpp/impl/codegen/interceptor.h @@ -56,6 +56,11 @@ enum class InterceptionHookPoints { POST_RECV_MESSAGE, POST_RECV_STATUS /* client only */, POST_RECV_CLOSE /* server only */, + /* This is a special hook point available to both clients and servers when + TryCancel() is performed. It is illegal for an interceptor to block/delay + this operation. ALL interceptors see this hook point irrespective of + whether the RPC was hijacked or not. */ + PRE_SEND_CANCEL, NUM_INTERCEPTION_HOOKS }; @@ -66,7 +71,9 @@ class InterceptorBatchMethods { // of type \a type virtual bool QueryInterceptionHookPoint(InterceptionHookPoints type) = 0; // Calling this will signal that the interceptor is done intercepting the - // current batch of the RPC + // current batch of the RPC. + // Proceed is a no-op if the batch contains PRE_SEND_CANCEL. Simply returning + // from the Intercept method does the job of continuing the RPC in this case. virtual void Proceed() = 0; // Calling this indicates that the interceptor has hijacked the RPC (only // valid if the batch contains send_initial_metadata on the client side) diff --git a/include/grpcpp/impl/codegen/interceptor_common.h b/include/grpcpp/impl/codegen/interceptor_common.h index cf564977f6..d0aa23cb0a 100644 --- a/include/grpcpp/impl/codegen/interceptor_common.h +++ b/include/grpcpp/impl/codegen/interceptor_common.h @@ -19,7 +19,13 @@ #ifndef GRPCPP_IMPL_CODEGEN_INTERCEPTOR_COMMON_H #define GRPCPP_IMPL_CODEGEN_INTERCEPTOR_COMMON_H +#include <array> +#include <functional> + +#include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/call_op_set_interface.h> #include <grpcpp/impl/codegen/client_interceptor.h> +#include <grpcpp/impl/codegen/intercepted_channel.h> #include <grpcpp/impl/codegen/server_interceptor.h> #include <grpc/impl/codegen/grpc_types.h> @@ -27,38 +33,9 @@ namespace grpc { namespace internal { -/// Internal methods for setting the state -class InternalInterceptorBatchMethods +class InterceptorBatchMethodsImpl : public experimental::InterceptorBatchMethods { public: - virtual ~InternalInterceptorBatchMethods() {} - - virtual void AddInterceptionHookPoint( - experimental::InterceptionHookPoints type) = 0; - - virtual void SetSendMessage(ByteBuffer* buf) = 0; - - virtual void SetSendInitialMetadata( - std::multimap<grpc::string, grpc::string>* metadata) = 0; - - virtual void SetSendStatus(grpc_status_code* code, - grpc::string* error_details, - grpc::string* error_message) = 0; - - virtual void SetSendTrailingMetadata( - std::multimap<grpc::string, grpc::string>* metadata) = 0; - - virtual void SetRecvMessage(void* message) = 0; - - virtual void SetRecvInitialMetadata(MetadataMap* map) = 0; - - virtual void SetRecvStatus(Status* status) = 0; - - virtual void SetRecvTrailingMetadata(MetadataMap* map) = 0; -}; - -class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { - public: InterceptorBatchMethodsImpl() { for (auto i = static_cast<experimental::InterceptionHookPoints>(0); i < experimental::InterceptionHookPoints::NUM_INTERCEPTION_HOOKS; @@ -75,7 +52,7 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { return hooks_[static_cast<size_t>(type)]; } - void Proceed() override { /* fill this */ + void Proceed() override { if (call_->client_rpc_info() != nullptr) { return ProceedClient(); } @@ -98,8 +75,7 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { rpc_info->RunInterceptor(this, current_interceptor_index_); } - void AddInterceptionHookPoint( - experimental::InterceptionHookPoints type) override { + void AddInterceptionHookPoint(experimental::InterceptionHookPoints type) { hooks_[static_cast<size_t>(type)] = true; } @@ -139,34 +115,34 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { return recv_trailing_metadata_->map(); } - void SetSendMessage(ByteBuffer* buf) override { send_message_ = buf; } + void SetSendMessage(ByteBuffer* buf) { send_message_ = buf; } void SetSendInitialMetadata( - std::multimap<grpc::string, grpc::string>* metadata) override { + std::multimap<grpc::string, grpc::string>* metadata) { send_initial_metadata_ = metadata; } void SetSendStatus(grpc_status_code* code, grpc::string* error_details, - grpc::string* error_message) override { + grpc::string* error_message) { code_ = code; error_details_ = error_details; error_message_ = error_message; } void SetSendTrailingMetadata( - std::multimap<grpc::string, grpc::string>* metadata) override { + std::multimap<grpc::string, grpc::string>* metadata) { send_trailing_metadata_ = metadata; } - void SetRecvMessage(void* message) override { recv_message_ = message; } + void SetRecvMessage(void* message) { recv_message_ = message; } - void SetRecvInitialMetadata(MetadataMap* map) override { + void SetRecvInitialMetadata(MetadataMap* map) { recv_initial_metadata_ = map; } - void SetRecvStatus(Status* status) override { recv_status_ = status; } + void SetRecvStatus(Status* status) { recv_status_ = status; } - void SetRecvTrailingMetadata(MetadataMap* map) override { + void SetRecvTrailingMetadata(MetadataMap* map) { recv_trailing_metadata_ = map; } @@ -377,6 +353,105 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { MetadataMap* recv_trailing_metadata_ = nullptr; }; +// A special implementation of InterceptorBatchMethods to send a Cancel +// notification down the interceptor stack +class CancelInterceptorBatchMethods + : public experimental::InterceptorBatchMethods { + public: + bool QueryInterceptionHookPoint( + experimental::InterceptionHookPoints type) override { + if (type == experimental::InterceptionHookPoints::PRE_SEND_CANCEL) { + return true; + } else { + return false; + } + } + + void Proceed() override { + // This is a no-op. For actual continuation of the RPC simply needs to + // return from the Intercept method + } + + void Hijack() override { + // Only the client can hijack when sending down initial metadata + GPR_CODEGEN_ASSERT(false && + "It is illegal to call Hijack on a method which has a " + "Cancel notification"); + } + + ByteBuffer* GetSendMessage() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetSendMessage on a method which " + "has a Cancel notification"); + return nullptr; + } + + std::multimap<grpc::string, grpc::string>* GetSendInitialMetadata() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetSendInitialMetadata on a " + "method which has a Cancel notification"); + return nullptr; + } + + Status GetSendStatus() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetSendStatus on a method which " + "has a Cancel notification"); + return Status(); + } + + void ModifySendStatus(const Status& status) override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call ModifySendStatus on a method " + "which has a Cancel notification"); + return; + } + + std::multimap<grpc::string, grpc::string>* GetSendTrailingMetadata() + override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetSendTrailingMetadata on a " + "method which has a Cancel notification"); + return nullptr; + } + + void* GetRecvMessage() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetRecvMessage on a method which " + "has a Cancel notification"); + return nullptr; + } + + std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvInitialMetadata() + override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetRecvInitialMetadata on a " + "method which has a Cancel notification"); + return nullptr; + } + + Status* GetRecvStatus() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetRecvStatus on a method which " + "has a Cancel notification"); + return nullptr; + } + + std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvTrailingMetadata() + override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetRecvTrailingMetadata on a " + "method which has a Cancel notification"); + return nullptr; + } + + std::unique_ptr<ChannelInterface> GetInterceptedChannel() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetInterceptedChannel on a " + "method which has a Cancel notification"); + return std::unique_ptr<ChannelInterface>(nullptr); + } +}; } // namespace internal } // namespace grpc diff --git a/include/grpcpp/impl/codegen/server_callback.h b/include/grpcpp/impl/codegen/server_callback.h index 5d56cbf1df..b866fc16dc 100644 --- a/include/grpcpp/impl/codegen/server_callback.h +++ b/include/grpcpp/impl/codegen/server_callback.h @@ -22,6 +22,7 @@ #include <functional> #include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/call_op_set.h> #include <grpcpp/impl/codegen/callback_common.h> #include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/core_codegen_interface.h> @@ -116,7 +117,7 @@ class CallbackUnaryHandler : public MethodHandler { : public experimental::ServerCallbackRpcController { public: void Finish(Status s) override { - finish_tag_ = CallbackWithSuccessTag( + finish_tag_.Set( call_.call(), [this](bool) { grpc_call* call = call_.call(); @@ -149,8 +150,7 @@ class CallbackUnaryHandler : public MethodHandler { void SendInitialMetadata(std::function<void(bool)> f) override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - meta_tag_ = - CallbackWithSuccessTag(call_.call(), std::move(f), &meta_buf_); + meta_tag_.Set(call_.call(), std::move(f), &meta_buf_); meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { @@ -57,11 +57,13 @@ os.chdir(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, os.path.abspath(PYTHON_STEM)) # Break import-style to ensure we can actually find our in-repo dependencies. +import _parallel_compile_patch import _spawn_patch import commands import grpc_core_dependencies import grpc_version +_parallel_compile_patch.monkeypatch_compile_maybe() _spawn_patch.monkeypatch_spawn() LICENSE = 'Apache License 2.0' @@ -103,6 +105,10 @@ BUILD_WITH_SYSTEM_ZLIB = os.environ.get('GRPC_PYTHON_BUILD_SYSTEM_ZLIB', BUILD_WITH_SYSTEM_CARES = os.environ.get('GRPC_PYTHON_BUILD_SYSTEM_CARES', False) +# If this environmental variable is set, GRPC will not try to be compatible with +# libc versions old than the one it was compiled against. +DISABLE_LIBC_COMPATIBILITY = os.environ.get('GRPC_PYTHON_DISABLE_LIBC_COMPATIBILITY', False) + # Environment variable to determine whether or not to enable coverage analysis # in Cython modules. ENABLE_CYTHON_TRACING = os.environ.get( @@ -198,9 +204,9 @@ if BUILD_WITH_SYSTEM_ZLIB: if BUILD_WITH_SYSTEM_CARES: EXTENSION_LIBRARIES += ('cares',) -DEFINE_MACROS = ( - ('OPENSSL_NO_ASM', 1), ('_WIN32_WINNT', 0x600), - ('GPR_BACKWARDS_COMPATIBILITY_MODE', 1)) +DEFINE_MACROS = (('OPENSSL_NO_ASM', 1), ('_WIN32_WINNT', 0x600)) +if not DISABLE_LIBC_COMPATIBILITY: + DEFINE_MACROS += (('GPR_BACKWARDS_COMPATIBILITY_MODE', 1),) if "win32" in sys.platform: # TODO(zyc): Re-enable c-ares on x64 and x86 windows after fixing the # ares_library_init compilation issue diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 7fb4cbdcd2..38f8072e8d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -319,10 +319,6 @@ class XdsLb : public LoadBalancingPolicy { // The deserialized response from the balancer. May be nullptr until one // such response has arrived. xds_grpclb_serverlist* serverlist_ = nullptr; - // Index into serverlist for next pick. - // If the server at this index is a drop, we return a drop. - // Otherwise, we delegate to the RR policy. - size_t serverlist_index_ = 0; // Timeout in milliseconds for before using fallback backend addresses. // 0 means not using fallback. @@ -837,7 +833,6 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked( // serverlist instance will be destroyed either upon the next // update or when the XdsLb instance is destroyed. xdslb_policy->serverlist_ = serverlist; - xdslb_policy->serverlist_index_ = 0; xdslb_policy->CreateOrUpdateRoundRobinPolicyLocked(); } } else { @@ -1575,32 +1570,6 @@ void XdsLb::AddPendingPick(PendingPick* pp) { // completion callback even if the pick is available immediately. bool XdsLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp, grpc_error** error) { - // Check for drops if we are not using fallback backend addresses. - if (serverlist_ != nullptr) { - // Look at the index into the serverlist to see if we should drop this call. - xds_grpclb_server* server = serverlist_->servers[serverlist_index_++]; - if (serverlist_index_ == serverlist_->num_servers) { - serverlist_index_ = 0; // Wrap-around. - } - if (server->drop) { - // Update client load reporting stats to indicate the number of - // dropped calls. Note that we have to do this here instead of in - // the client_load_reporting filter, because we do not create a - // subchannel call (and therefore no client_load_reporting filter) - // for dropped calls. - if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) { - lb_calld_->client_stats()->AddCallDroppedLocked( - server->load_balance_token); - } - if (force_async) { - GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE); - Delete(pp); - return false; - } - Delete(pp); - return true; - } - } // Set client_stats and user_data. if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) { pp->client_stats = lb_calld_->client_stats()->Ref(); diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc index 920d52770f..dbe9df6ae3 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc @@ -212,10 +212,6 @@ static uint32_t prepare_space_for_new_elem(grpc_chttp2_hpack_compressor* c, return new_index; } -/* dummy function */ -static void add_nothing(grpc_chttp2_hpack_compressor* c, grpc_mdelem elem, - size_t elem_size) {} - // Add a key to the dynamic table. Both key and value will be added to table at // the decoder. static void add_key_with_index(grpc_chttp2_hpack_compressor* c, @@ -524,17 +520,22 @@ static void hpack_enc(grpc_chttp2_hpack_compressor* c, grpc_mdelem elem, uint32_t indices_key; /* should this elem be in the table? */ - size_t decoder_space_usage = + const size_t decoder_space_usage = grpc_chttp2_get_size_in_hpack_table(elem, st->use_true_binary_metadata); - bool should_add_elem = elem_interned && - decoder_space_usage < MAX_DECODER_SPACE_USAGE && - c->filter_elems[HASH_FRAGMENT_1(elem_hash)] >= - c->filter_elems_sum / ONE_ON_ADD_PROBABILITY; - void (*maybe_add)(grpc_chttp2_hpack_compressor*, grpc_mdelem, size_t) = - should_add_elem ? add_elem : add_nothing; - void (*emit)(grpc_chttp2_hpack_compressor*, uint32_t, grpc_mdelem, - framer_state*) = - should_add_elem ? emit_lithdr_incidx : emit_lithdr_noidx; + const bool should_add_elem = elem_interned && + decoder_space_usage < MAX_DECODER_SPACE_USAGE && + c->filter_elems[HASH_FRAGMENT_1(elem_hash)] >= + c->filter_elems_sum / ONE_ON_ADD_PROBABILITY; + + auto emit_maybe_add = [&should_add_elem, &elem, &st, &c, &indices_key, + &decoder_space_usage] { + if (should_add_elem) { + emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st); + add_elem(c, elem, decoder_space_usage); + } else { + emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st); + } + }; /* no hits for the elem... maybe there's a key? */ indices_key = c->indices_keys[HASH_FRAGMENT_2(key_hash)]; @@ -542,8 +543,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor* c, grpc_mdelem elem, GRPC_MDKEY(elem)) && indices_key > c->tail_remote_index) { /* HIT: key (first cuckoo hash) */ - emit(c, dynidx(c, indices_key), elem, st); - maybe_add(c, elem, decoder_space_usage); + emit_maybe_add(); return; } @@ -552,20 +552,23 @@ static void hpack_enc(grpc_chttp2_hpack_compressor* c, grpc_mdelem elem, GRPC_MDKEY(elem)) && indices_key > c->tail_remote_index) { /* HIT: key (first cuckoo hash) */ - emit(c, dynidx(c, indices_key), elem, st); - maybe_add(c, elem, decoder_space_usage); + emit_maybe_add(); return; } /* no elem, key in the table... fall back to literal emission */ - bool should_add_key = + const bool should_add_key = !elem_interned && decoder_space_usage < MAX_DECODER_SPACE_USAGE; - emit = (should_add_elem || should_add_key) ? emit_lithdr_incidx_v - : emit_lithdr_noidx_v; - maybe_add = - should_add_elem ? add_elem : (should_add_key ? add_key : add_nothing); - emit(c, 0, elem, st); - maybe_add(c, elem, decoder_space_usage); + if (should_add_elem || should_add_key) { + emit_lithdr_incidx_v(c, 0, elem, st); + } else { + emit_lithdr_noidx_v(c, 0, elem, st); + } + if (should_add_elem) { + add_elem(c, elem, decoder_space_usage); + } else if (should_add_key) { + add_key(c, elem, decoder_space_usage); + } } #define STRLEN_LIT(x) (sizeof(x) - 1) diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 1c4e2e79fe..61968de4d5 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -608,10 +608,8 @@ void op_state_machine(void* arg, grpc_error* error) { if (other->recv_message_op) { message_transfer_locked(s, other); maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); - } else if (!s->t->is_client && - (s->trailing_md_sent || other->recv_trailing_md_op)) { - // A server send will never be matched if the client is waiting - // for trailing metadata already + } else if (!s->t->is_client && s->trailing_md_sent) { + // A server send will never be matched if the server already sent status s->send_message_op->payload->send_message.send_message.reset(); complete_if_batch_end_locked( s, GRPC_ERROR_NONE, s->send_message_op, @@ -622,11 +620,15 @@ void op_state_machine(void* arg, grpc_error* error) { // Pause a send trailing metadata if there is still an outstanding // send message unless we know that the send message will never get // matched to a receive. This happens on the client if the server has - // already sent status. + // already sent status or on the server if the client has requested + // status if (s->send_trailing_md_op && (!s->send_message_op || (s->t->is_client && - (s->trailing_md_recvd || s->to_read_trailing_md_filled)))) { + (s->trailing_md_recvd || s->to_read_trailing_md_filled)) || + (!s->t->is_client && other && + (other->trailing_md_recvd || other->to_read_trailing_md_filled || + other->recv_trailing_md_op)))) { grpc_metadata_batch* dest = (other == nullptr) ? &s->write_buffer_trailing_md : &other->to_read_trailing_md; @@ -724,16 +726,6 @@ void op_state_machine(void* arg, grpc_error* error) { maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); } } - if (s->recv_trailing_md_op && s->t->is_client && other && - other->send_message_op) { - INPROC_LOG(GPR_INFO, - "op_state_machine %p scheduling trailing-metadata-ready %p", s, - GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata - .recv_trailing_metadata_ready, - GRPC_ERROR_NONE); - maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); - } if (s->to_read_trailing_md_filled) { if (s->trailing_md_recvd) { new_err = @@ -749,6 +741,7 @@ void op_state_machine(void* arg, grpc_error* error) { if (s->recv_message_op != nullptr) { // This message needs to be wrapped up because it will never be // satisfied + *s->recv_message_op->payload->recv_message.recv_message = nullptr; INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s); GRPC_CLOSURE_SCHED( s->recv_message_op->payload->recv_message.recv_message_ready, @@ -811,6 +804,7 @@ void op_state_machine(void* arg, grpc_error* error) { // No further message will come on this stream, so finish off the // recv_message_op INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s); + *s->recv_message_op->payload->recv_message.recv_message = nullptr; GRPC_CLOSURE_SCHED( s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); @@ -1013,18 +1007,18 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs, } // We want to initiate the closure if: - // 1. We want to send a message and the other side wants to receive or end + // 1. We want to send a message and the other side wants to receive // 2. We want to send trailing metadata and there isn't an unmatched send + // or the other side wants trailing metadata // 3. We want initial metadata and the other side has sent it // 4. We want to receive a message and there is a message ready // 5. There is trailing metadata, even if nothing specifically wants // that because that can shut down the receive message as well - if ((op->send_message && other && - ((other->recv_message_op != nullptr) || - (other->recv_trailing_md_op != nullptr))) || - (op->send_trailing_metadata && !op->send_message) || + if ((op->send_message && other && other->recv_message_op != nullptr) || + (op->send_trailing_metadata && + (!s->send_message_op || (other && other->recv_trailing_md_op))) || (op->recv_initial_metadata && s->to_read_initial_md_filled) || - (op->recv_message && other && (other->send_message_op != nullptr)) || + (op->recv_message && other && other->send_message_op != nullptr) || (s->to_read_trailing_md_filled || s->trailing_md_recvd)) { if (!s->op_closure_scheduled) { GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_NONE); diff --git a/src/core/lib/channel/channel_stack.cc b/src/core/lib/channel/channel_stack.cc index 056fcd93de..df956c7176 100644 --- a/src/core/lib/channel/channel_stack.cc +++ b/src/core/lib/channel/channel_stack.cc @@ -157,7 +157,6 @@ grpc_error* grpc_call_stack_init(grpc_channel_stack* channel_stack, size_t count = channel_stack->count; grpc_call_element* call_elems; char* user_data; - size_t i; elem_args->call_stack->count = count; GRPC_STREAM_REF_INIT(&elem_args->call_stack->refcount, initial_refs, destroy, @@ -168,10 +167,14 @@ grpc_error* grpc_call_stack_init(grpc_channel_stack* channel_stack, /* init per-filter data */ grpc_error* first_error = GRPC_ERROR_NONE; - for (i = 0; i < count; i++) { + for (size_t i = 0; i < count; i++) { call_elems[i].filter = channel_elems[i].filter; call_elems[i].channel_data = channel_elems[i].channel_data; call_elems[i].call_data = user_data; + user_data += + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data); + } + for (size_t i = 0; i < count; i++) { grpc_error* error = call_elems[i].filter->init_call_elem(&call_elems[i], elem_args); if (error != GRPC_ERROR_NONE) { @@ -181,8 +184,6 @@ grpc_error* grpc_call_stack_init(grpc_channel_stack* channel_stack, GRPC_ERROR_UNREF(error); } } - user_data += - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data); } return first_error; } diff --git a/src/core/lib/iomgr/socket_utils_common_posix.cc b/src/core/lib/iomgr/socket_utils_common_posix.cc index c4b3a9336d..4c337a0521 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.cc +++ b/src/core/lib/iomgr/socket_utils_common_posix.cc @@ -296,10 +296,12 @@ grpc_error* grpc_set_socket_tcp_user_timeout( socklen_t len = sizeof(newval); if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout, sizeof(timeout))) { - return GRPC_OS_ERROR(errno, "setsockopt(TCP_USER_TIMEOUT)"); + gpr_log(GPR_ERROR, "setsockopt(TCP_USER_TIMEOUT) %s", strerror(errno)); + return GRPC_ERROR_NONE; } if (0 != getsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len)) { - return GRPC_OS_ERROR(errno, "getsockopt(TCP_USER_TIMEOUT)"); + gpr_log(GPR_ERROR, "getsockopt(TCP_USER_TIMEOUT) %s", strerror(errno)); + return GRPC_ERROR_NONE; } if (newval != timeout) { /* Do not fail on failing to set TCP_USER_TIMEOUT for now. */ diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index b81ae73b4d..661022ec5f 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -859,8 +859,8 @@ static void cq_end_op_for_callback( gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { - cq_finish_shutdown_callback(cq); gpr_mu_unlock(cq->mu); + cq_finish_shutdown_callback(cq); } else { gpr_mu_unlock(cq->mu); } diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 6ef7df4ea0..8e1cea0269 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -147,10 +147,14 @@ internal::Call Channel::CreateCallInternal(const internal::RpcMethod& method, } } grpc_census_call_set_context(c_call, context->census_context()); - context->set_call(c_call, shared_from_this()); + // ClientRpcInfo should be set before call because set_call also checks + // whether the call has been cancelled, and if the call was cancelled, we + // should notify the interceptors too/ auto* info = context->set_client_rpc_info( method.name(), this, interceptor_creators_, interceptor_pos); + context->set_call(c_call, shared_from_this()); + return internal::Call(c_call, this, cq, info); } diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index 07a04e4268..50da75f09c 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -24,6 +24,7 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include <grpcpp/impl/codegen/interceptor_common.h> #include <grpcpp/impl/grpc_library.h> #include <grpcpp/security/credentials.h> #include <grpcpp/server_context.h> @@ -86,10 +87,13 @@ void ClientContext::set_call(grpc_call* call, call_ = call; channel_ = channel; if (creds_ && !creds_->ApplyToCall(call_)) { + // TODO(yashykt): should interceptors also see this status? + SendCancelToInterceptors(); grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Failed to set credentials to rpc.", nullptr); } if (call_canceled_) { + SendCancelToInterceptors(); grpc_call_cancel(call_, nullptr); } } @@ -110,12 +114,20 @@ void ClientContext::set_compression_algorithm( void ClientContext::TryCancel() { std::unique_lock<std::mutex> lock(mu_); if (call_) { + SendCancelToInterceptors(); grpc_call_cancel(call_, nullptr); } else { call_canceled_ = true; } } +void ClientContext::SendCancelToInterceptors() { + internal::CancelInterceptorBatchMethods cancel_methods; + for (size_t i = 0; i < rpc_info_.interceptors_.size(); i++) { + rpc_info_.RunInterceptor(&cancel_methods, i); + } +} + grpc::string ClientContext::peer() const { grpc::string peer; if (call_) { diff --git a/src/cpp/codegen/client_interceptor.cc b/src/cpp/client/client_interceptor.cc index 3a5cac9830..3a5cac9830 100644 --- a/src/cpp/codegen/client_interceptor.cc +++ b/src/cpp/client/client_interceptor.cc diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index d1cd78e755..7faaa20e78 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -228,9 +228,10 @@ int MetadataCredentialsPluginWrapper::GetMetadata( } if (w->plugin_->IsBlocking()) { // Asynchronous return. - w->thread_pool_->Add( - std::bind(&MetadataCredentialsPluginWrapper::InvokePlugin, w, context, - cb, user_data, nullptr, nullptr, nullptr, nullptr)); + w->thread_pool_->Add([w, context, cb, user_data] { + w->MetadataCredentialsPluginWrapper::InvokePlugin( + context, cb, user_data, nullptr, nullptr, nullptr, nullptr); + }); return 0; } else { // Synchronous return. diff --git a/src/cpp/server/secure_server_credentials.cc b/src/cpp/server/secure_server_credentials.cc index 536bf022dd..ebb17def32 100644 --- a/src/cpp/server/secure_server_credentials.cc +++ b/src/cpp/server/secure_server_credentials.cc @@ -43,9 +43,10 @@ void AuthMetadataProcessorAyncWrapper::Process( return; } if (w->processor_->IsBlocking()) { - w->thread_pool_->Add( - std::bind(&AuthMetadataProcessorAyncWrapper::InvokeProcessor, w, - context, md, num_md, cb, user_data)); + w->thread_pool_->Add([w, context, md, num_md, cb, user_data] { + w->AuthMetadataProcessorAyncWrapper::InvokeProcessor(context, md, num_md, + cb, user_data); + }); } else { // invoke directly. w->InvokeProcessor(context, md, num_md, cb, user_data); diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 870ee84e3e..c031528a8f 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -268,8 +268,8 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { interceptor_methods_.SetRecvMessage(request_); } - auto f = std::bind(&CallData::ContinueRunAfterInterception, this); - if (interceptor_methods_.RunInterceptors(f)) { + if (interceptor_methods_.RunInterceptors( + [this]() { ContinueRunAfterInterception(); })) { ContinueRunAfterInterception(); } else { // There were interceptors to be run, so ContinueRunAfterInterception @@ -981,10 +981,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, interceptor_methods_.AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); interceptor_methods_.SetRecvInitialMetadata(&context_->client_metadata_); - auto f = std::bind(&ServerInterface::BaseAsyncRequest:: - ContinueFinalizeResultAfterInterception, - this); - if (interceptor_methods_.RunInterceptors(f)) { + if (interceptor_methods_.RunInterceptors( + [this]() { ContinueFinalizeResultAfterInterception(); })) { // There are no interceptors to run. Continue } else { // There were interceptors to be run, so diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 355debb3fb..396996e5bc 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -252,6 +252,7 @@ void ServerContext::Clear() { } if (completion_op_) { completion_op_->Unref(); + completion_tag_.Clear(); } if (rpc_info_) { rpc_info_->Unref(); @@ -270,8 +271,7 @@ void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) { new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp))) CompletionOp(call); if (callback) { - completion_tag_ = - internal::CallbackWithSuccessTag(call->call(), nullptr, completion_op_); + completion_tag_.Set(call->call(), nullptr, completion_op_); completion_op_->set_core_cq_tag(&completion_tag_); } else if (has_notify_when_done_tag_) { completion_op_->set_tag(async_notify_when_done_tag_); @@ -294,6 +294,12 @@ void ServerContext::AddTrailingMetadata(const grpc::string& key, } void ServerContext::TryCancel() const { + internal::CancelInterceptorBatchMethods cancel_methods; + if (rpc_info_) { + for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) { + rpc_info_->RunInterceptor(&cancel_methods, i); + } + } grpc_call_error err = grpc_call_cancel_with_status( call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr); if (err != GRPC_CALL_OK) { diff --git a/src/python/grpcio/_parallel_compile_patch.py b/src/python/grpcio/_parallel_compile_patch.py new file mode 100644 index 0000000000..4d03ef49ba --- /dev/null +++ b/src/python/grpcio/_parallel_compile_patch.py @@ -0,0 +1,63 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Patches the compile() to allow enable parallel compilation of C/C++. + +build_ext has lots of C/C++ files and normally them one by one. +Enabling parallel build helps a lot. +""" + +import distutils.ccompiler +import os + +try: + BUILD_EXT_COMPILER_JOBS = int( + os.environ.get('GRPC_PYTHON_BUILD_EXT_COMPILER_JOBS', '1')) +except ValueError: + BUILD_EXT_COMPILER_JOBS = 1 + + +# monkey-patch for parallel compilation +def _parallel_compile(self, + sources, + output_dir=None, + macros=None, + include_dirs=None, + debug=0, + extra_preargs=None, + extra_postargs=None, + depends=None): + # setup the same way as distutils.ccompiler.CCompiler + # https://github.com/python/cpython/blob/31368a4f0e531c19affe2a1becd25fc316bc7501/Lib/distutils/ccompiler.py#L564 + macros, objects, extra_postargs, pp_opts, build = self._setup_compile( + output_dir, macros, include_dirs, sources, depends, extra_postargs) + cc_args = self._get_cc_args(pp_opts, debug, extra_preargs) + + def _compile_single_file(obj): + try: + src, ext = build[obj] + except KeyError: + return + self._compile(obj, src, ext, cc_args, extra_postargs, pp_opts) + + # run compilation of individual files in parallel + import multiprocessing.pool + multiprocessing.pool.ThreadPool(BUILD_EXT_COMPILER_JOBS).map( + _compile_single_file, objects) + return objects + + +def monkeypatch_compile_maybe(): + """Monkeypatching is dumb, but the build speed gain is worth it.""" + if BUILD_EXT_COMPILER_JOBS > 1: + distutils.ccompiler.CCompiler.compile = _parallel_compile diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py index e3d776b79a..b805f4277b 100644 --- a/src/python/grpcio/commands.py +++ b/src/python/grpcio/commands.py @@ -212,6 +212,12 @@ class BuildExt(build_ext.build_ext): LINK_OPTIONS = {} def build_extensions(self): + # This special conditioning is here due to difference of compiler + # behavior in gcc and clang. The clang doesn't take --stdc++11 + # flags but gcc does. Since the setuptools of Python only support + # all C or all C++ compilation, the mix of C and C++ will crash. + # *By default*, the macOS use clang and Linux use gcc, that's why + # the special condition here is checking platform. if "darwin" in sys.platform: config = os.environ.get('CONFIG', 'opt') target_path = os.path.abspath( diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py index d35f4566bd..42f3a4e614 100644 --- a/src/python/grpcio/grpc/_common.py +++ b/src/python/grpcio/grpc/_common.py @@ -66,18 +66,13 @@ def encode(s): if isinstance(s, bytes): return s else: - return s.encode('ascii') + return s.encode('utf8') def decode(b): - if isinstance(b, str): - return b - else: - try: - return b.decode('utf8') - except UnicodeDecodeError: - _LOGGER.exception('Invalid encoding on %s', b) - return b.decode('latin1') + if isinstance(b, bytes): + return b.decode('utf-8', 'replace') + return b def _transform(message, transformer, exception_message): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/security.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/security.pyx.pxi index 7decae95bb..e17ca6d335 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/security.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/security.pyx.pxi @@ -14,14 +14,16 @@ from libc.string cimport memcpy -import pkg_resources +import pkgutil cdef grpc_ssl_roots_override_result ssl_roots_override_callback( char **pem_root_certs) nogil: with gil: - temporary_pem_root_certs = pkg_resources.resource_string( - __name__.rstrip('.cygrpc'), '_credentials/roots.pem') + pkg = __name__ + if pkg.endswith('.cygrpc'): + pkg = pkg[:-len('.cygrpc')] + temporary_pem_root_certs = pkgutil.get_data(pkg, '_credentials/roots.pem') pem_root_certs[0] = <char *>gpr_malloc(len(temporary_pem_root_certs) + 1) memcpy( pem_root_certs[0], <char *>temporary_pem_root_certs, diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index 026f7ba2e3..ae5c07bfc8 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -15,7 +15,6 @@ cimport cpython -import pkg_resources import os.path import sys diff --git a/src/python/grpcio_tests/tests/_sanity/_sanity_test.py b/src/python/grpcio_tests/tests/_sanity/_sanity_test.py index b4079850ff..7da6e7b34c 100644 --- a/src/python/grpcio_tests/tests/_sanity/_sanity_test.py +++ b/src/python/grpcio_tests/tests/_sanity/_sanity_test.py @@ -13,9 +13,9 @@ # limitations under the License. import json +import pkgutil import unittest -import pkg_resources import six import tests @@ -35,7 +35,7 @@ class SanityTest(unittest.TestCase): loader.suite) }) - tests_json_string = pkg_resources.resource_string('tests', 'tests.json') + tests_json_string = pkgutil.get_data('tests', 'tests.json') tests_json = json.loads(tests_json_string.decode() if six.PY3 else tests_json_string) diff --git a/src/python/grpcio_tests/tests/interop/methods.py b/src/python/grpcio_tests/tests/interop/methods.py index cda15a68a3..721dedf0b7 100644 --- a/src/python/grpcio_tests/tests/interop/methods.py +++ b/src/python/grpcio_tests/tests/interop/methods.py @@ -457,6 +457,22 @@ def _per_rpc_creds(stub, args): response.username)) +def _special_status_message(stub, args): + details = b'\t\ntest with whitespace\r\nand Unicode BMP \xe2\x98\xba and non-BMP \xf0\x9f\x98\x88\t\n'.decode( + 'utf-8') + code = 2 + status = grpc.StatusCode.UNKNOWN # code = 2 + + # Test with a UnaryCall + request = messages_pb2.SimpleRequest( + response_type=messages_pb2.COMPRESSABLE, + response_size=1, + payload=messages_pb2.Payload(body=b'\x00'), + response_status=messages_pb2.EchoStatus(code=code, message=details)) + response_future = stub.UnaryCall.future(request) + _validate_status_code_and_details(response_future, status, details) + + @enum.unique class TestCase(enum.Enum): EMPTY_UNARY = 'empty_unary' @@ -476,6 +492,7 @@ class TestCase(enum.Enum): JWT_TOKEN_CREDS = 'jwt_token_creds' PER_RPC_CREDS = 'per_rpc_creds' TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server' + SPECIAL_STATUS_MESSAGE = 'special_status_message' def test_interoperability(self, stub, args): if self is TestCase.EMPTY_UNARY: @@ -512,6 +529,8 @@ class TestCase(enum.Enum): _jwt_token_creds(stub, args) elif self is TestCase.PER_RPC_CREDS: _per_rpc_creds(stub, args) + elif self is TestCase.SPECIAL_STATUS_MESSAGE: + _special_status_message(stub, args) else: raise NotImplementedError( 'Test case "%s" not implemented!' % self.name) diff --git a/src/python/grpcio_tests/tests/interop/resources.py b/src/python/grpcio_tests/tests/interop/resources.py index 2f76cf5db6..a55919a60a 100644 --- a/src/python/grpcio_tests/tests/interop/resources.py +++ b/src/python/grpcio_tests/tests/interop/resources.py @@ -14,27 +14,24 @@ """Constants and functions for data used in interoperability testing.""" import argparse +import pkgutil import os -import pkg_resources - _ROOT_CERTIFICATES_RESOURCE_PATH = 'credentials/ca.pem' _PRIVATE_KEY_RESOURCE_PATH = 'credentials/server1.key' _CERTIFICATE_CHAIN_RESOURCE_PATH = 'credentials/server1.pem' def test_root_certificates(): - return pkg_resources.resource_string(__name__, - _ROOT_CERTIFICATES_RESOURCE_PATH) + return pkgutil.get_data(__name__, _ROOT_CERTIFICATES_RESOURCE_PATH) def private_key(): - return pkg_resources.resource_string(__name__, _PRIVATE_KEY_RESOURCE_PATH) + return pkgutil.get_data(__name__, _PRIVATE_KEY_RESOURCE_PATH) def certificate_chain(): - return pkg_resources.resource_string(__name__, - _CERTIFICATE_CHAIN_RESOURCE_PATH) + return pkgutil.get_data(__name__, _CERTIFICATE_CHAIN_RESOURCE_PATH) def parse_bool(value): diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index 5505369867..c5ea8c5fbb 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -42,6 +42,7 @@ "unit._cython.cygrpc_test.SecureServerSecureClient", "unit._cython.cygrpc_test.TypeSmokeTest", "unit._empty_message_test.EmptyMessageTest", + "unit._error_message_encoding_test.ErrorMessageEncodingTest", "unit._exit_test.ExitTest", "unit._interceptor_test.InterceptorTest", "unit._invalid_metadata_test.InvalidMetadataTest", diff --git a/src/python/grpcio_tests/tests/unit/_error_message_encoding_test.py b/src/python/grpcio_tests/tests/unit/_error_message_encoding_test.py new file mode 100644 index 0000000000..6c551df3ec --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_error_message_encoding_test.py @@ -0,0 +1,86 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests 'utf-8' encoded error message.""" + +import unittest +import weakref + +import grpc + +from tests.unit import test_common +from tests.unit.framework.common import test_constants + +_UNICODE_ERROR_MESSAGES = [ + b'\xe2\x80\x9d'.decode('utf-8'), + b'abc\x80\xd0\xaf'.decode('latin-1'), + b'\xc3\xa9'.decode('utf-8'), +] + +_REQUEST = b'\x00\x00\x00' +_RESPONSE = b'\x00\x00\x00' + +_UNARY_UNARY = '/test/UnaryUnary' + + +class _MethodHandler(grpc.RpcMethodHandler): + + def __init__(self, request_streaming=None, response_streaming=None): + self.request_streaming = request_streaming + self.response_streaming = response_streaming + self.request_deserializer = None + self.response_serializer = None + self.unary_stream = None + self.stream_unary = None + self.stream_stream = None + + def unary_unary(self, request, servicer_context): + servicer_context.set_code(grpc.StatusCode.UNKNOWN) + servicer_context.set_details(request.decode('utf-8')) + return _RESPONSE + + +class _GenericHandler(grpc.GenericRpcHandler): + + def __init__(self, test): + self._test = test + + def service(self, handler_call_details): + return _MethodHandler() + + +class ErrorMessageEncodingTest(unittest.TestCase): + + def setUp(self): + self._server = test_common.test_server() + self._server.add_generic_rpc_handlers((_GenericHandler( + weakref.proxy(self)),)) + port = self._server.add_insecure_port('[::]:0') + self._server.start() + self._channel = grpc.insecure_channel('localhost:%d' % port) + + def tearDown(self): + self._server.stop(0) + + def testMessageEncoding(self): + for message in _UNICODE_ERROR_MESSAGES: + multi_callable = self._channel.unary_unary(_UNARY_UNARY) + with self.assertRaises(grpc.RpcError) as cm: + multi_callable(message.encode('utf-8')) + + self.assertEqual(cm.exception.code(), grpc.StatusCode.UNKNOWN) + self.assertEqual(cm.exception.details(), message) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/resources.py b/src/python/grpcio_tests/tests/unit/resources.py index 51a8979f58..6efd870fc8 100644 --- a/src/python/grpcio_tests/tests/unit/resources.py +++ b/src/python/grpcio_tests/tests/unit/resources.py @@ -14,8 +14,7 @@ """Constants and functions for data used in testing.""" import os - -import pkg_resources +import pkgutil _ROOT_CERTIFICATES_RESOURCE_PATH = 'credentials/ca.pem' _PRIVATE_KEY_RESOURCE_PATH = 'credentials/server1.key' @@ -23,94 +22,92 @@ _CERTIFICATE_CHAIN_RESOURCE_PATH = 'credentials/server1.pem' def test_root_certificates(): - return pkg_resources.resource_string(__name__, - _ROOT_CERTIFICATES_RESOURCE_PATH) + return pkgutil.get_data(__name__, _ROOT_CERTIFICATES_RESOURCE_PATH) def private_key(): - return pkg_resources.resource_string(__name__, _PRIVATE_KEY_RESOURCE_PATH) + return pkgutil.get_data(__name__, _PRIVATE_KEY_RESOURCE_PATH) def certificate_chain(): - return pkg_resources.resource_string(__name__, - _CERTIFICATE_CHAIN_RESOURCE_PATH) + return pkgutil.get_data(__name__, _CERTIFICATE_CHAIN_RESOURCE_PATH) def cert_hier_1_root_ca_cert(): - return pkg_resources.resource_string( + return pkgutil.get_data( __name__, 'credentials/certificate_hierarchy_1/certs/ca.cert.pem') def cert_hier_1_intermediate_ca_cert(): - return pkg_resources.resource_string( + return pkgutil.get_data( __name__, 'credentials/certificate_hierarchy_1/intermediate/certs/intermediate.cert.pem' ) def cert_hier_1_client_1_key(): - return pkg_resources.resource_string( + return pkgutil.get_data( __name__, 'credentials/certificate_hierarchy_1/intermediate/private/client.key.pem' ) def cert_hier_1_client_1_cert(): - return pkg_resources.resource_string( + return pkgutil.get_data( __name__, 'credentials/certificate_hierarchy_1/intermediate/certs/client.cert.pem' ) def cert_hier_1_server_1_key(): - return pkg_resources.resource_string( + return pkgutil.get_data( __name__, 'credentials/certificate_hierarchy_1/intermediate/private/localhost-1.key.pem' ) def cert_hier_1_server_1_cert(): - return pkg_resources.resource_string( + return pkgutil.get_data( __name__, 'credentials/certificate_hierarchy_1/intermediate/certs/localhost-1.cert.pem' ) def cert_hier_2_root_ca_cert(): - return pkg_resources.resource_string( + return pkgutil.get_data( __name__, 'credentials/certificate_hierarchy_2/certs/ca.cert.pem') def cert_hier_2_intermediate_ca_cert(): - return pkg_resources.resource_string( + return pkgutil.get_data( __name__, 'credentials/certificate_hierarchy_2/intermediate/certs/intermediate.cert.pem' ) def cert_hier_2_client_1_key(): - return pkg_resources.resource_string( + return pkgutil.get_data( __name__, 'credentials/certificate_hierarchy_2/intermediate/private/client.key.pem' ) def cert_hier_2_client_1_cert(): - return pkg_resources.resource_string( + return pkgutil.get_data( __name__, 'credentials/certificate_hierarchy_2/intermediate/certs/client.cert.pem' ) def cert_hier_2_server_1_key(): - return pkg_resources.resource_string( + return pkgutil.get_data( __name__, 'credentials/certificate_hierarchy_2/intermediate/private/localhost-1.key.pem' ) def cert_hier_2_server_1_cert(): - return pkg_resources.resource_string( + return pkgutil.get_data( __name__, 'credentials/certificate_hierarchy_2/intermediate/certs/localhost-1.cert.pem' ) diff --git a/test/core/debug/BUILD b/test/core/debug/BUILD new file mode 100644 index 0000000000..1592472532 --- /dev/null +++ b/test/core/debug/BUILD @@ -0,0 +1,34 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_cc_binary", "grpc_package") + +grpc_package(name = "test/core/debug") + +licenses(["notice"]) # Apache v2 + +grpc_cc_test( + name = "stats_test", + srcs = ["stats_test.cc"], + external_deps = [ + "gtest", + ], + language = "C++", + deps = [ + "//:gpr", + "//:grpc", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + ], +) diff --git a/test/core/end2end/tests/streaming_error_response.cc b/test/core/end2end/tests/streaming_error_response.cc index 4c357e077e..713975a33f 100644 --- a/test/core/end2end/tests/streaming_error_response.cc +++ b/test/core/end2end/tests/streaming_error_response.cc @@ -89,7 +89,8 @@ static void end_test(grpc_end2end_test_fixture* f) { } /* Client sends a request with payload, server reads then returns status. */ -static void test(grpc_end2end_test_config config, bool request_status_early) { +static void test(grpc_end2end_test_config config, bool request_status_early, + bool recv_message_separately) { grpc_call* c; grpc_call* s; grpc_slice response_payload1_slice = grpc_slice_from_copied_string("hello"); @@ -116,6 +117,7 @@ static void test(grpc_end2end_test_config config, bool request_status_early) { int was_cancelled = 2; gpr_timespec deadline = five_seconds_from_now(); + GPR_ASSERT(!recv_message_separately || request_status_early); c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq, grpc_slice_from_static_string("/foo"), nullptr, deadline, nullptr); @@ -136,9 +138,11 @@ static void test(grpc_end2end_test_config config, bool request_status_early) { op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; op++; - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message.recv_message = &response_payload1_recv; - op++; + if (!recv_message_separately) { + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &response_payload1_recv; + op++; + } if (request_status_early) { op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; @@ -168,10 +172,24 @@ static void test(grpc_end2end_test_config config, bool request_status_early) { nullptr); GPR_ASSERT(GRPC_CALL_OK == error); + if (recv_message_separately) { + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &response_payload1_recv; + op++; + error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(4), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + } + CQ_EXPECT_COMPLETION(cqv, tag(102), 1); if (!request_status_early) { CQ_EXPECT_COMPLETION(cqv, tag(1), 1); } + if (recv_message_separately) { + CQ_EXPECT_COMPLETION(cqv, tag(4), 1); + } cq_verify(cqv); memset(ops, 0, sizeof(ops)); @@ -265,8 +283,9 @@ static void test(grpc_end2end_test_config config, bool request_status_early) { } void streaming_error_response(grpc_end2end_test_config config) { - test(config, false); - test(config, true); + test(config, false, false); + test(config, true, false); + test(config, true, true); } void streaming_error_response_pre_init(void) {} diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 235249e8bf..4e3d841db0 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -38,6 +38,7 @@ grpc_cc_library( grpc_cc_library( name = "interceptors_util", testonly = True, + srcs = ["interceptors_util.cc"], hdrs = ["interceptors_util.h"], external_deps = [ "gtest", @@ -158,6 +159,7 @@ grpc_cc_library( "gtest", ], deps = [ + ":interceptors_util", ":test_service_impl", "//:gpr", "//:grpc", diff --git a/test/cpp/end2end/client_interceptors_end2end_test.cc b/test/cpp/end2end/client_interceptors_end2end_test.cc index 205f64c0f2..0b34ec93ae 100644 --- a/test/cpp/end2end/client_interceptors_end2end_test.cc +++ b/test/cpp/end2end/client_interceptors_end2end_test.cc @@ -43,89 +43,6 @@ namespace grpc { namespace testing { namespace { -class ClientInterceptorsStreamingEnd2endTest : public ::testing::Test { - protected: - ClientInterceptorsStreamingEnd2endTest() { - int port = grpc_pick_unused_port_or_die(); - - ServerBuilder builder; - server_address_ = "localhost:" + std::to_string(port); - builder.AddListeningPort(server_address_, InsecureServerCredentials()); - builder.RegisterService(&service_); - server_ = builder.BuildAndStart(); - } - - ~ClientInterceptorsStreamingEnd2endTest() { server_->Shutdown(); } - - std::string server_address_; - EchoTestServiceStreamingImpl service_; - std::unique_ptr<Server> server_; -}; - -class ClientInterceptorsEnd2endTest : public ::testing::Test { - protected: - ClientInterceptorsEnd2endTest() { - int port = grpc_pick_unused_port_or_die(); - - ServerBuilder builder; - server_address_ = "localhost:" + std::to_string(port); - builder.AddListeningPort(server_address_, InsecureServerCredentials()); - builder.RegisterService(&service_); - server_ = builder.BuildAndStart(); - } - - ~ClientInterceptorsEnd2endTest() { server_->Shutdown(); } - - std::string server_address_; - TestServiceImpl service_; - std::unique_ptr<Server> server_; -}; - -/* This interceptor does nothing. Just keeps a global count on the number of - * times it was invoked. */ -class DummyInterceptor : public experimental::Interceptor { - public: - DummyInterceptor(experimental::ClientRpcInfo* info) {} - - virtual void Intercept(experimental::InterceptorBatchMethods* methods) { - if (methods->QueryInterceptionHookPoint( - experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) { - num_times_run_++; - } else if (methods->QueryInterceptionHookPoint( - experimental::InterceptionHookPoints:: - POST_RECV_INITIAL_METADATA)) { - num_times_run_reverse_++; - } - methods->Proceed(); - } - - static void Reset() { - num_times_run_.store(0); - num_times_run_reverse_.store(0); - } - - static int GetNumTimesRun() { - EXPECT_EQ(num_times_run_.load(), num_times_run_reverse_.load()); - return num_times_run_.load(); - } - - private: - static std::atomic<int> num_times_run_; - static std::atomic<int> num_times_run_reverse_; -}; - -std::atomic<int> DummyInterceptor::num_times_run_; -std::atomic<int> DummyInterceptor::num_times_run_reverse_; - -class DummyInterceptorFactory - : public experimental::ClientInterceptorFactoryInterface { - public: - virtual experimental::Interceptor* CreateClientInterceptor( - experimental::ClientRpcInfo* info) override { - return new DummyInterceptor(info); - } -}; - /* Hijacks Echo RPC and fills in the expected values */ class HijackingInterceptor : public experimental::Interceptor { public: @@ -422,6 +339,25 @@ class LoggingInterceptorFactory } }; +class ClientInterceptorsEnd2endTest : public ::testing::Test { + protected: + ClientInterceptorsEnd2endTest() { + int port = grpc_pick_unused_port_or_die(); + + ServerBuilder builder; + server_address_ = "localhost:" + std::to_string(port); + builder.AddListeningPort(server_address_, InsecureServerCredentials()); + builder.RegisterService(&service_); + server_ = builder.BuildAndStart(); + } + + ~ClientInterceptorsEnd2endTest() { server_->Shutdown(); } + + std::string server_address_; + TestServiceImpl service_; + std::unique_ptr<Server> server_; +}; + TEST_F(ClientInterceptorsEnd2endTest, ClientInterceptorLoggingTest) { ChannelArguments args; DummyInterceptor::Reset(); @@ -538,6 +474,25 @@ TEST_F(ClientInterceptorsEnd2endTest, EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20); } +class ClientInterceptorsStreamingEnd2endTest : public ::testing::Test { + protected: + ClientInterceptorsStreamingEnd2endTest() { + int port = grpc_pick_unused_port_or_die(); + + ServerBuilder builder; + server_address_ = "localhost:" + std::to_string(port); + builder.AddListeningPort(server_address_, InsecureServerCredentials()); + builder.RegisterService(&service_); + server_ = builder.BuildAndStart(); + } + + ~ClientInterceptorsStreamingEnd2endTest() { server_->Shutdown(); } + + std::string server_address_; + EchoTestServiceStreamingImpl service_; + std::unique_ptr<Server> server_; +}; + TEST_F(ClientInterceptorsStreamingEnd2endTest, ClientStreamingTest) { ChannelArguments args; DummyInterceptor::Reset(); diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index fc07681535..4558437102 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -40,6 +40,7 @@ #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" +#include "test/cpp/end2end/interceptors_util.h" #include "test/cpp/end2end/test_service_impl.h" #include "test/cpp/util/string_ref_helper.h" #include "test/cpp/util/test_credentials_provider.h" @@ -179,7 +180,7 @@ class Proxy : public ::grpc::testing::EchoTestService::Service { } private: - std::unique_ptr< ::grpc::testing::EchoTestService::Stub> stub_; + std::unique_ptr<::grpc::testing::EchoTestService::Stub> stub_; }; class TestServiceImplDupPkg @@ -194,9 +195,14 @@ class TestServiceImplDupPkg class TestScenario { public: - TestScenario(bool proxy, bool inproc_stub, const grpc::string& creds_type) - : use_proxy(proxy), inproc(inproc_stub), credentials_type(creds_type) {} + TestScenario(bool interceptors, bool proxy, bool inproc_stub, + const grpc::string& creds_type) + : use_interceptors(interceptors), + use_proxy(proxy), + inproc(inproc_stub), + credentials_type(creds_type) {} void Log() const; + bool use_interceptors; bool use_proxy; bool inproc; const grpc::string credentials_type; @@ -204,8 +210,9 @@ class TestScenario { static std::ostream& operator<<(std::ostream& out, const TestScenario& scenario) { - return out << "TestScenario{use_proxy=" - << (scenario.use_proxy ? "true" : "false") + return out << "TestScenario{use_interceptors=" + << (scenario.use_interceptors ? "true" : "false") + << ", use_proxy=" << (scenario.use_proxy ? "true" : "false") << ", inproc=" << (scenario.inproc ? "true" : "false") << ", credentials='" << scenario.credentials_type << "'}"; } @@ -260,6 +267,17 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { if (GetParam().credentials_type != kInsecureCredentialsType) { server_creds->SetAuthMetadataProcessor(processor); } + if (GetParam().use_interceptors) { + std::vector< + std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> + creators; + // Add 20 dummy server interceptors + for (auto i = 0; i < 20; i++) { + creators.push_back(std::unique_ptr<DummyInterceptorFactory>( + new DummyInterceptorFactory())); + } + builder.experimental().SetInterceptorCreators(std::move(creators)); + } builder.AddListeningPort(server_address_.str(), server_creds); builder.RegisterService(&service_); builder.RegisterService("foo.test.youtube.com", &special_service_); @@ -292,10 +310,21 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test"); if (!GetParam().inproc) { - channel_ = - CreateCustomChannel(server_address_.str(), channel_creds, args); + if (!GetParam().use_interceptors) { + channel_ = + CreateCustomChannel(server_address_.str(), channel_creds, args); + } else { + channel_ = CreateCustomChannelWithInterceptors( + server_address_.str(), channel_creds, args, + CreateDummyClientInterceptors()); + } } else { - channel_ = server_->InProcessChannel(args); + if (!GetParam().use_interceptors) { + channel_ = server_->InProcessChannel(args); + } else { + channel_ = server_->experimental().InProcessChannelWithInterceptors( + args, CreateDummyClientInterceptors()); + } } } @@ -320,6 +349,7 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { } stub_ = grpc::testing::EchoTestService::NewStub(channel_); + DummyInterceptor::Reset(); } bool is_server_started_; @@ -376,6 +406,7 @@ class End2endServerTryCancelTest : public End2endTest { // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. void TestRequestStreamServerCancel( ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) { + RestartServer(std::shared_ptr<AuthMetadataProcessor>()); ResetStub(); EchoRequest request; EchoResponse response; @@ -432,6 +463,10 @@ class End2endServerTryCancelTest : public End2endTest { EXPECT_FALSE(s.ok()); EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + // Make sure that the server interceptors were notified + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } } // Helper for testing server-streaming RPCs which are cancelled on the server. @@ -449,6 +484,7 @@ class End2endServerTryCancelTest : public End2endTest { // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. void TestResponseStreamServerCancel( ServerTryCancelRequestPhase server_try_cancel) { + RestartServer(std::shared_ptr<AuthMetadataProcessor>()); ResetStub(); EchoRequest request; EchoResponse response; @@ -508,7 +544,10 @@ class End2endServerTryCancelTest : public End2endTest { } EXPECT_FALSE(s.ok()); - EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + // Make sure that the server interceptors were notified + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } } // Helper for testing bidirectional-streaming RPCs which are cancelled on the @@ -526,6 +565,7 @@ class End2endServerTryCancelTest : public End2endTest { // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel, int num_messages) { + RestartServer(std::shared_ptr<AuthMetadataProcessor>()); ResetStub(); EchoRequest request; EchoResponse response; @@ -592,6 +632,10 @@ class End2endServerTryCancelTest : public End2endTest { EXPECT_FALSE(s.ok()); EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + // Make sure that the server interceptors were notified + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } } }; @@ -989,6 +1033,9 @@ TEST_P(End2endTest, CancelRpcBeforeStart) { Status s = stub_->Echo(&context, request, &response); EXPECT_EQ("", response.message()); EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } } // Client cancels request stream after sending two messages @@ -1009,6 +1056,9 @@ TEST_P(End2endTest, ClientCancelsRequestStream) { EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); EXPECT_EQ(response.message(), ""); + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } } // Client cancels server stream after sending some messages @@ -1041,6 +1091,9 @@ TEST_P(End2endTest, ClientCancelsResponseStream) { // The final status could be either of CANCELLED or OK depending on // who won the race. EXPECT_GE(grpc::StatusCode::CANCELLED, s.error_code()); + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } } // Client cancels bidi stream after sending some messages @@ -1074,6 +1127,9 @@ TEST_P(End2endTest, ClientCancelsBidi) { Status s = stream->Finish(); EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } } TEST_P(End2endTest, RpcMaxMessageSize) { @@ -1802,13 +1858,16 @@ std::vector<TestScenario> CreateTestScenarios(bool use_proxy, } GPR_ASSERT(!credentials_types.empty()); for (const auto& cred : credentials_types) { - scenarios.emplace_back(false, false, cred); + scenarios.emplace_back(false, false, false, cred); + scenarios.emplace_back(true, false, false, cred); if (use_proxy) { - scenarios.emplace_back(true, false, cred); + scenarios.emplace_back(false, true, false, cred); + scenarios.emplace_back(true, true, false, cred); } } if (test_inproc && insec_ok()) { - scenarios.emplace_back(false, true, kInsecureCredentialsType); + scenarios.emplace_back(false, false, true, kInsecureCredentialsType); + scenarios.emplace_back(true, false, true, kInsecureCredentialsType); } return scenarios; } diff --git a/test/cpp/end2end/interceptors_util.cc b/test/cpp/end2end/interceptors_util.cc new file mode 100644 index 0000000000..602d1695a3 --- /dev/null +++ b/test/cpp/end2end/interceptors_util.cc @@ -0,0 +1,151 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "test/cpp/end2end/interceptors_util.h" + +namespace grpc { +namespace testing { + +std::atomic<int> DummyInterceptor::num_times_run_; +std::atomic<int> DummyInterceptor::num_times_run_reverse_; +std::atomic<int> DummyInterceptor::num_times_cancel_; + +void MakeCall(const std::shared_ptr<Channel>& channel) { + auto stub = grpc::testing::EchoTestService::NewStub(channel); + ClientContext ctx; + EchoRequest req; + req.mutable_param()->set_echo_metadata(true); + ctx.AddMetadata("testkey", "testvalue"); + req.set_message("Hello"); + EchoResponse resp; + Status s = stub->Echo(&ctx, req, &resp); + EXPECT_EQ(s.ok(), true); + EXPECT_EQ(resp.message(), "Hello"); +} + +void MakeClientStreamingCall(const std::shared_ptr<Channel>& channel) { + auto stub = grpc::testing::EchoTestService::NewStub(channel); + ClientContext ctx; + EchoRequest req; + req.mutable_param()->set_echo_metadata(true); + ctx.AddMetadata("testkey", "testvalue"); + req.set_message("Hello"); + EchoResponse resp; + string expected_resp = ""; + auto writer = stub->RequestStream(&ctx, &resp); + for (int i = 0; i < 10; i++) { + writer->Write(req); + expected_resp += "Hello"; + } + writer->WritesDone(); + Status s = writer->Finish(); + EXPECT_EQ(s.ok(), true); + EXPECT_EQ(resp.message(), expected_resp); +} + +void MakeServerStreamingCall(const std::shared_ptr<Channel>& channel) { + auto stub = grpc::testing::EchoTestService::NewStub(channel); + ClientContext ctx; + EchoRequest req; + req.mutable_param()->set_echo_metadata(true); + ctx.AddMetadata("testkey", "testvalue"); + req.set_message("Hello"); + EchoResponse resp; + string expected_resp = ""; + auto reader = stub->ResponseStream(&ctx, req); + int count = 0; + while (reader->Read(&resp)) { + EXPECT_EQ(resp.message(), "Hello"); + count++; + } + ASSERT_EQ(count, 10); + Status s = reader->Finish(); + EXPECT_EQ(s.ok(), true); +} + +void MakeBidiStreamingCall(const std::shared_ptr<Channel>& channel) { + auto stub = grpc::testing::EchoTestService::NewStub(channel); + ClientContext ctx; + EchoRequest req; + EchoResponse resp; + ctx.AddMetadata("testkey", "testvalue"); + auto stream = stub->BidiStream(&ctx); + for (auto i = 0; i < 10; i++) { + req.set_message("Hello" + std::to_string(i)); + stream->Write(req); + stream->Read(&resp); + EXPECT_EQ(req.message(), resp.message()); + } + ASSERT_TRUE(stream->WritesDone()); + Status s = stream->Finish(); + EXPECT_EQ(s.ok(), true); +} + +void MakeCallbackCall(const std::shared_ptr<Channel>& channel) { + auto stub = grpc::testing::EchoTestService::NewStub(channel); + ClientContext ctx; + EchoRequest req; + std::mutex mu; + std::condition_variable cv; + bool done = false; + req.mutable_param()->set_echo_metadata(true); + ctx.AddMetadata("testkey", "testvalue"); + req.set_message("Hello"); + EchoResponse resp; + stub->experimental_async()->Echo(&ctx, &req, &resp, + [&resp, &mu, &done, &cv](Status s) { + // gpr_log(GPR_ERROR, "got the callback"); + EXPECT_EQ(s.ok(), true); + EXPECT_EQ(resp.message(), "Hello"); + std::lock_guard<std::mutex> l(mu); + done = true; + cv.notify_one(); + }); + std::unique_lock<std::mutex> l(mu); + while (!done) { + cv.wait(l); + } +} + +bool CheckMetadata(const std::multimap<grpc::string_ref, grpc::string_ref>& map, + const string& key, const string& value) { + for (const auto& pair : map) { + if (pair.first.starts_with(key) && pair.second.starts_with(value)) { + return true; + } + } + return false; +} + +std::unique_ptr<std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>> +CreateDummyClientInterceptors() { + auto creators = std::unique_ptr<std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>( + new std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>()); + // Add 20 dummy interceptors before hijacking interceptor + for (auto i = 0; i < 20; i++) { + creators->push_back(std::unique_ptr<DummyInterceptorFactory>( + new DummyInterceptorFactory())); + } + return creators; +} + +} // namespace testing +} // namespace grpc diff --git a/test/cpp/end2end/interceptors_util.h b/test/cpp/end2end/interceptors_util.h index 5f0aa37dc0..b4c4791fca 100644 --- a/test/cpp/end2end/interceptors_util.h +++ b/test/cpp/end2end/interceptors_util.h @@ -16,6 +16,10 @@ * */ +#include <condition_variable> + +#include <grpcpp/channel.h> + #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/cpp/util/string_ref_helper.h" @@ -23,6 +27,61 @@ namespace grpc { namespace testing { +/* This interceptor does nothing. Just keeps a global count on the number of + * times it was invoked. */ +class DummyInterceptor : public experimental::Interceptor { + public: + DummyInterceptor() {} + + virtual void Intercept(experimental::InterceptorBatchMethods* methods) { + if (methods->QueryInterceptionHookPoint( + experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) { + num_times_run_++; + } else if (methods->QueryInterceptionHookPoint( + experimental::InterceptionHookPoints:: + POST_RECV_INITIAL_METADATA)) { + num_times_run_reverse_++; + } else if (methods->QueryInterceptionHookPoint( + experimental::InterceptionHookPoints::PRE_SEND_CANCEL)) { + num_times_cancel_++; + } + methods->Proceed(); + } + + static void Reset() { + num_times_run_.store(0); + num_times_run_reverse_.store(0); + num_times_cancel_.store(0); + } + + static int GetNumTimesRun() { + EXPECT_EQ(num_times_run_.load(), num_times_run_reverse_.load()); + return num_times_run_.load(); + } + + static int GetNumTimesCancel() { return num_times_cancel_.load(); } + + private: + static std::atomic<int> num_times_run_; + static std::atomic<int> num_times_run_reverse_; + static std::atomic<int> num_times_cancel_; +}; + +class DummyInterceptorFactory + : public experimental::ClientInterceptorFactoryInterface, + public experimental::ServerInterceptorFactoryInterface { + public: + virtual experimental::Interceptor* CreateClientInterceptor( + experimental::ClientRpcInfo* info) override { + return new DummyInterceptor(); + } + + virtual experimental::Interceptor* CreateServerInterceptor( + experimental::ServerRpcInfo* info) override { + return new DummyInterceptor(); + } +}; + class EchoTestServiceStreamingImpl : public EchoTestService::Service { public: ~EchoTestServiceStreamingImpl() override {} @@ -77,115 +136,27 @@ class EchoTestServiceStreamingImpl : public EchoTestService::Service { } }; -void MakeCall(const std::shared_ptr<Channel>& channel) { - auto stub = grpc::testing::EchoTestService::NewStub(channel); - ClientContext ctx; - EchoRequest req; - req.mutable_param()->set_echo_metadata(true); - ctx.AddMetadata("testkey", "testvalue"); - req.set_message("Hello"); - EchoResponse resp; - Status s = stub->Echo(&ctx, req, &resp); - EXPECT_EQ(s.ok(), true); - EXPECT_EQ(resp.message(), "Hello"); -} +void MakeCall(const std::shared_ptr<Channel>& channel); -void MakeClientStreamingCall(const std::shared_ptr<Channel>& channel) { - auto stub = grpc::testing::EchoTestService::NewStub(channel); - ClientContext ctx; - EchoRequest req; - req.mutable_param()->set_echo_metadata(true); - ctx.AddMetadata("testkey", "testvalue"); - req.set_message("Hello"); - EchoResponse resp; - string expected_resp = ""; - auto writer = stub->RequestStream(&ctx, &resp); - for (int i = 0; i < 10; i++) { - writer->Write(req); - expected_resp += "Hello"; - } - writer->WritesDone(); - Status s = writer->Finish(); - EXPECT_EQ(s.ok(), true); - EXPECT_EQ(resp.message(), expected_resp); -} +void MakeClientStreamingCall(const std::shared_ptr<Channel>& channel); -void MakeServerStreamingCall(const std::shared_ptr<Channel>& channel) { - auto stub = grpc::testing::EchoTestService::NewStub(channel); - ClientContext ctx; - EchoRequest req; - req.mutable_param()->set_echo_metadata(true); - ctx.AddMetadata("testkey", "testvalue"); - req.set_message("Hello"); - EchoResponse resp; - string expected_resp = ""; - auto reader = stub->ResponseStream(&ctx, req); - int count = 0; - while (reader->Read(&resp)) { - EXPECT_EQ(resp.message(), "Hello"); - count++; - } - ASSERT_EQ(count, 10); - Status s = reader->Finish(); - EXPECT_EQ(s.ok(), true); -} +void MakeServerStreamingCall(const std::shared_ptr<Channel>& channel); -void MakeBidiStreamingCall(const std::shared_ptr<Channel>& channel) { - auto stub = grpc::testing::EchoTestService::NewStub(channel); - ClientContext ctx; - EchoRequest req; - EchoResponse resp; - ctx.AddMetadata("testkey", "testvalue"); - auto stream = stub->BidiStream(&ctx); - for (auto i = 0; i < 10; i++) { - req.set_message("Hello" + std::to_string(i)); - stream->Write(req); - stream->Read(&resp); - EXPECT_EQ(req.message(), resp.message()); - } - ASSERT_TRUE(stream->WritesDone()); - Status s = stream->Finish(); - EXPECT_EQ(s.ok(), true); -} +void MakeBidiStreamingCall(const std::shared_ptr<Channel>& channel); -void MakeCallbackCall(const std::shared_ptr<Channel>& channel) { - auto stub = grpc::testing::EchoTestService::NewStub(channel); - ClientContext ctx; - EchoRequest req; - std::mutex mu; - std::condition_variable cv; - bool done = false; - req.mutable_param()->set_echo_metadata(true); - ctx.AddMetadata("testkey", "testvalue"); - req.set_message("Hello"); - EchoResponse resp; - stub->experimental_async()->Echo(&ctx, &req, &resp, - [&resp, &mu, &done, &cv](Status s) { - // gpr_log(GPR_ERROR, "got the callback"); - EXPECT_EQ(s.ok(), true); - EXPECT_EQ(resp.message(), "Hello"); - std::lock_guard<std::mutex> l(mu); - done = true; - cv.notify_one(); - }); - std::unique_lock<std::mutex> l(mu); - while (!done) { - cv.wait(l); - } -} +void MakeCallbackCall(const std::shared_ptr<Channel>& channel); bool CheckMetadata(const std::multimap<grpc::string_ref, grpc::string_ref>& map, - const string& key, const string& value) { - for (const auto& pair : map) { - if (pair.first.starts_with(key) && pair.second.starts_with(value)) { - return true; - } - } - return false; -} + const string& key, const string& value); -void* tag(int i) { return (void*)static_cast<intptr_t>(i); } -int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); } +std::unique_ptr<std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>> +CreateDummyClientInterceptors(); + +inline void* tag(int i) { return (void*)static_cast<intptr_t>(i); } +inline int detag(void* p) { + return static_cast<int>(reinterpret_cast<intptr_t>(p)); +} class Verifier { public: diff --git a/test/cpp/end2end/server_interceptors_end2end_test.cc b/test/cpp/end2end/server_interceptors_end2end_test.cc index e08a4493d3..4ae086ea76 100644 --- a/test/cpp/end2end/server_interceptors_end2end_test.cc +++ b/test/cpp/end2end/server_interceptors_end2end_test.cc @@ -42,51 +42,6 @@ namespace grpc { namespace testing { namespace { -/* This interceptor does nothing. Just keeps a global count on the number of - * times it was invoked. */ -class DummyInterceptor : public experimental::Interceptor { - public: - DummyInterceptor(experimental::ServerRpcInfo* info) {} - - virtual void Intercept(experimental::InterceptorBatchMethods* methods) { - if (methods->QueryInterceptionHookPoint( - experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) { - num_times_run_++; - } else if (methods->QueryInterceptionHookPoint( - experimental::InterceptionHookPoints:: - POST_RECV_INITIAL_METADATA)) { - num_times_run_reverse_++; - } - methods->Proceed(); - } - - static void Reset() { - num_times_run_.store(0); - num_times_run_reverse_.store(0); - } - - static int GetNumTimesRun() { - EXPECT_EQ(num_times_run_.load(), num_times_run_reverse_.load()); - return num_times_run_.load(); - } - - private: - static std::atomic<int> num_times_run_; - static std::atomic<int> num_times_run_reverse_; -}; - -std::atomic<int> DummyInterceptor::num_times_run_; -std::atomic<int> DummyInterceptor::num_times_run_reverse_; - -class DummyInterceptorFactory - : public experimental::ServerInterceptorFactoryInterface { - public: - virtual experimental::Interceptor* CreateServerInterceptor( - experimental::ServerRpcInfo* info) override { - return new DummyInterceptor(info); - } -}; - class LoggingInterceptor : public experimental::Interceptor { public: LoggingInterceptor(experimental::ServerRpcInfo* info) { info_ = info; } diff --git a/tools/distrib/python/grpcio_tools/_parallel_compile_patch.py b/tools/distrib/python/grpcio_tools/_parallel_compile_patch.py new file mode 100644 index 0000000000..4d03ef49ba --- /dev/null +++ b/tools/distrib/python/grpcio_tools/_parallel_compile_patch.py @@ -0,0 +1,63 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Patches the compile() to allow enable parallel compilation of C/C++. + +build_ext has lots of C/C++ files and normally them one by one. +Enabling parallel build helps a lot. +""" + +import distutils.ccompiler +import os + +try: + BUILD_EXT_COMPILER_JOBS = int( + os.environ.get('GRPC_PYTHON_BUILD_EXT_COMPILER_JOBS', '1')) +except ValueError: + BUILD_EXT_COMPILER_JOBS = 1 + + +# monkey-patch for parallel compilation +def _parallel_compile(self, + sources, + output_dir=None, + macros=None, + include_dirs=None, + debug=0, + extra_preargs=None, + extra_postargs=None, + depends=None): + # setup the same way as distutils.ccompiler.CCompiler + # https://github.com/python/cpython/blob/31368a4f0e531c19affe2a1becd25fc316bc7501/Lib/distutils/ccompiler.py#L564 + macros, objects, extra_postargs, pp_opts, build = self._setup_compile( + output_dir, macros, include_dirs, sources, depends, extra_postargs) + cc_args = self._get_cc_args(pp_opts, debug, extra_preargs) + + def _compile_single_file(obj): + try: + src, ext = build[obj] + except KeyError: + return + self._compile(obj, src, ext, cc_args, extra_postargs, pp_opts) + + # run compilation of individual files in parallel + import multiprocessing.pool + multiprocessing.pool.ThreadPool(BUILD_EXT_COMPILER_JOBS).map( + _compile_single_file, objects) + return objects + + +def monkeypatch_compile_maybe(): + """Monkeypatching is dumb, but the build speed gain is worth it.""" + if BUILD_EXT_COMPILER_JOBS > 1: + distutils.ccompiler.CCompiler.compile = _parallel_compile diff --git a/tools/distrib/python/grpcio_tools/setup.py b/tools/distrib/python/grpcio_tools/setup.py index c13dfe9ade..64c468cbf7 100644 --- a/tools/distrib/python/grpcio_tools/setup.py +++ b/tools/distrib/python/grpcio_tools/setup.py @@ -34,9 +34,12 @@ from setuptools.command import build_ext os.chdir(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, os.path.abspath('.')) +import _parallel_compile_patch import protoc_lib_deps import grpc_version +_parallel_compile_patch.monkeypatch_compile_maybe() + CLASSIFIERS = [ 'Development Status :: 5 - Production/Stable', 'Programming Language :: Python', diff --git a/tools/dockerfile/interoptest/grpc_interop_cxx/build_interop.sh b/tools/dockerfile/interoptest/grpc_interop_cxx/build_interop.sh index fd549fb9e5..8b8c6be8f1 100755 --- a/tools/dockerfile/interoptest/grpc_interop_cxx/build_interop.sh +++ b/tools/dockerfile/interoptest/grpc_interop_cxx/build_interop.sh @@ -28,10 +28,10 @@ cp -r /var/local/jenkins/service_account $HOME || true cd /var/local/git/grpc -make install-certs +make install-certs -j4 # build C++ interop client & server -make interop_client interop_server -j2 +make interop_client interop_server -j4 # build C++ http2 client -make http2_client +make http2_client -j4 diff --git a/tools/dockerfile/interoptest/grpc_interop_php/build_interop.sh b/tools/dockerfile/interoptest/grpc_interop_php/build_interop.sh index 999976d15d..fc5c22083a 100755 --- a/tools/dockerfile/interoptest/grpc_interop_php/build_interop.sh +++ b/tools/dockerfile/interoptest/grpc_interop_php/build_interop.sh @@ -28,12 +28,13 @@ cp -r /var/local/jenkins/service_account $HOME || true cd /var/local/git/grpc -# gRPC core and protobuf need to be installed -make install +# Install gRPC C core and build codegen plugins +make -j4 install_c plugins -(cd src/php/ext/grpc && phpize && ./configure && make) +(cd src/php/ext/grpc && phpize && ./configure && make -j4) -(cd third_party/protobuf && make install) +# Install protobuf (need access to protoc) +(cd third_party/protobuf && make -j4 install) (cd src/php && php -d extension=ext/grpc/modules/grpc.so /usr/local/bin/composer install) diff --git a/tools/dockerfile/interoptest/grpc_interop_php7/build_interop.sh b/tools/dockerfile/interoptest/grpc_interop_php7/build_interop.sh index efa97530c8..248a8f680b 100755 --- a/tools/dockerfile/interoptest/grpc_interop_php7/build_interop.sh +++ b/tools/dockerfile/interoptest/grpc_interop_php7/build_interop.sh @@ -28,12 +28,13 @@ cp -r /var/local/jenkins/service_account $HOME || true cd /var/local/git/grpc -# gRPC core and protobuf need to be installed -make install +# Install gRPC C core and build codegen plugins +make -j4 install_c plugins -(cd src/php/ext/grpc && phpize && ./configure && make) +(cd src/php/ext/grpc && phpize && ./configure && make -j4) -(cd third_party/protobuf && make install) +# Install protobuf (need access to protoc) +(cd third_party/protobuf && make -j4 install) (cd src/php && php -d extension=ext/grpc/modules/grpc.so /usr/local/bin/composer install) diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index dc27a6ae76..a96683883c 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1187,6 +1187,7 @@ src/core/lib/uri/uri_parser.h \ src/cpp/README.md \ src/cpp/client/channel_cc.cc \ src/cpp/client/client_context.cc \ +src/cpp/client/client_interceptor.cc \ src/cpp/client/create_channel.cc \ src/cpp/client/create_channel_internal.cc \ src/cpp/client/create_channel_internal.h \ @@ -1196,7 +1197,6 @@ src/cpp/client/generic_stub.cc \ src/cpp/client/insecure_credentials.cc \ src/cpp/client/secure_credentials.cc \ src/cpp/client/secure_credentials.h \ -src/cpp/codegen/client_interceptor.cc \ src/cpp/codegen/codegen_init.cc \ src/cpp/common/alarm.cc \ src/cpp/common/auth_property_iterator.cc \ diff --git a/tools/gcp/utils/big_query_utils.py b/tools/gcp/utils/big_query_utils.py index 3e811ca2bf..6e9cda376b 100755 --- a/tools/gcp/utils/big_query_utils.py +++ b/tools/gcp/utils/big_query_utils.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import print_function + import argparse import json import uuid @@ -50,11 +52,11 @@ def create_dataset(biq_query, project_id, dataset_id): dataset_req.execute(num_retries=NUM_RETRIES) except HttpError as http_error: if http_error.resp.status == 409: - print 'Warning: The dataset %s already exists' % dataset_id + print('Warning: The dataset %s already exists' % dataset_id) else: # Note: For more debugging info, print "http_error.content" - print 'Error in creating dataset: %s. Err: %s' % (dataset_id, - http_error) + print('Error in creating dataset: %s. Err: %s' % (dataset_id, + http_error)) is_success = False return is_success @@ -122,13 +124,13 @@ def create_table2(big_query, table_req = big_query.tables().insert( projectId=project_id, datasetId=dataset_id, body=body) res = table_req.execute(num_retries=NUM_RETRIES) - print 'Successfully created %s "%s"' % (res['kind'], res['id']) + print('Successfully created %s "%s"' % (res['kind'], res['id'])) except HttpError as http_error: if http_error.resp.status == 409: - print 'Warning: Table %s already exists' % table_id + print('Warning: Table %s already exists' % table_id) else: - print 'Error in creating table: %s. Err: %s' % (table_id, - http_error) + print('Error in creating table: %s. Err: %s' % (table_id, + http_error)) is_success = False return is_success @@ -154,9 +156,9 @@ def patch_table(big_query, project_id, dataset_id, table_id, fields_schema): tableId=table_id, body=body) res = table_req.execute(num_retries=NUM_RETRIES) - print 'Successfully patched %s "%s"' % (res['kind'], res['id']) + print('Successfully patched %s "%s"' % (res['kind'], res['id'])) except HttpError as http_error: - print 'Error in creating table: %s. Err: %s' % (table_id, http_error) + print('Error in creating table: %s. Err: %s' % (table_id, http_error)) is_success = False return is_success @@ -172,10 +174,10 @@ def insert_rows(big_query, project_id, dataset_id, table_id, rows_list): body=body) res = insert_req.execute(num_retries=NUM_RETRIES) if res.get('insertErrors', None): - print 'Error inserting rows! Response: %s' % res + print('Error inserting rows! Response: %s' % res) is_success = False except HttpError as http_error: - print 'Error inserting rows to the table %s' % table_id + print('Error inserting rows to the table %s' % table_id) is_success = False return is_success @@ -189,8 +191,8 @@ def sync_query_job(big_query, project_id, query, timeout=5000): projectId=project_id, body=query_data).execute(num_retries=NUM_RETRIES) except HttpError as http_error: - print 'Query execute job failed with error: %s' % http_error - print http_error.content + print('Query execute job failed with error: %s' % http_error) + print(http_error.content) return query_job diff --git a/tools/internal_ci/helper_scripts/prepare_build_macos_rc b/tools/internal_ci/helper_scripts/prepare_build_macos_rc index 4b7477db14..2362b370d5 100644 --- a/tools/internal_ci/helper_scripts/prepare_build_macos_rc +++ b/tools/internal_ci/helper_scripts/prepare_build_macos_rc @@ -57,25 +57,25 @@ source $HOME/.rvm/scripts/rvm set -e # rvm commands are very verbose time rvm install 2.5.0 rvm use 2.5.0 --default -gem install bundler --no-ri --no-doc -gem install cocoapods --version 1.3.1 --no-ri --no-doc -gem install rake-compiler --no-ri --no-doc +time gem install bundler --no-ri --no-doc +time gem install cocoapods --version 1.3.1 --no-ri --no-doc +time gem install rake-compiler --no-ri --no-doc rvm osx-ssl-certs status all rvm osx-ssl-certs update all set -ex # cocoapods export LANG=en_US.UTF-8 -pod repo update # needed by python +time pod repo update # needed by python # python -pip install virtualenv --user python -pip install -U Mako six tox setuptools twisted pyyaml --user python +time pip install virtualenv --user python +time pip install -U Mako six tox setuptools twisted pyyaml --user python export PYTHONPATH=/Library/Python/3.4/site-packages # Install Python 3.7 -curl -O https://www.python.org/ftp/python/3.7.0/python-3.7.0-macosx10.9.pkg -sudo installer -pkg ./python-3.7.0-macosx10.9.pkg -target / +time curl -O https://www.python.org/ftp/python/3.7.0/python-3.7.0-macosx10.9.pkg +time sudo installer -pkg ./python-3.7.0-macosx10.9.pkg -target / # set xcode version for Obj-C tests sudo xcode-select -switch /Applications/Xcode_9.2.app/Contents/Developer/ @@ -88,7 +88,7 @@ export DOTNET_CLI_TELEMETRY_OPTOUT=true # TODO(jtattermusch): better debugging of clock skew, remove once not needed date -git submodule update --init +time git submodule update --init # Store intermediate build files of ObjC tests into /tmpfs mkdir /tmpfs/Build-ios-binary-size diff --git a/tools/run_tests/artifacts/build_artifact_python.bat b/tools/run_tests/artifacts/build_artifact_python.bat index d277668c94..795e80dc40 100644 --- a/tools/run_tests/artifacts/build_artifact_python.bat +++ b/tools/run_tests/artifacts/build_artifact_python.bat @@ -22,6 +22,10 @@ pip install -rrequirements.txt set GRPC_PYTHON_BUILD_WITH_CYTHON=1 +@rem Allow build_ext to build C/C++ files in parallel +@rem by enabling a monkeypatch. It speeds up the build a lot. +set GRPC_PYTHON_BUILD_EXT_COMPILER_JOBS=2 + mkdir -p %ARTIFACTS_OUT% set ARTIFACT_DIR=%cd%\%ARTIFACTS_OUT% diff --git a/tools/run_tests/artifacts/build_artifact_python.sh b/tools/run_tests/artifacts/build_artifact_python.sh index 2878005bb2..9a2e0f739f 100755 --- a/tools/run_tests/artifacts/build_artifact_python.sh +++ b/tools/run_tests/artifacts/build_artifact_python.sh @@ -22,6 +22,10 @@ export PYTHON=${PYTHON:-python} export PIP=${PIP:-pip} export AUDITWHEEL=${AUDITWHEEL:-auditwheel} +# Allow build_ext to build C/C++ files in parallel +# by enabling a monkeypatch. It speeds up the build a lot. +export GRPC_PYTHON_BUILD_EXT_COMPILER_JOBS=2 + mkdir -p "${ARTIFACTS_OUT}" ARTIFACT_DIR="$PWD/${ARTIFACTS_OUT}" diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index b94fc77d76..1bbacc41f0 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -3402,6 +3402,7 @@ "name": "client_interceptors_end2end_test", "src": [ "test/cpp/end2end/client_interceptors_end2end_test.cc", + "test/cpp/end2end/interceptors_util.cc", "test/cpp/end2end/interceptors_util.h" ], "third_party": false, @@ -3600,12 +3601,16 @@ "grpc++_test_util", "grpc_test_util" ], - "headers": [], + "headers": [ + "test/cpp/end2end/interceptors_util.h" + ], "is_filegroup": false, "language": "c++", "name": "end2end_test", "src": [ - "test/cpp/end2end/end2end_test.cc" + "test/cpp/end2end/end2end_test.cc", + "test/cpp/end2end/interceptors_util.cc", + "test/cpp/end2end/interceptors_util.h" ], "third_party": false, "type": "target" @@ -4736,6 +4741,7 @@ "language": "c++", "name": "server_interceptors_end2end_test", "src": [ + "test/cpp/end2end/interceptors_util.cc", "test/cpp/end2end/interceptors_util.h", "test/cpp/end2end/server_interceptors_end2end_test.cc" ], @@ -11322,7 +11328,6 @@ "language": "c++", "name": "grpc++_codegen_base_src", "src": [ - "src/cpp/codegen/client_interceptor.cc", "src/cpp/codegen/codegen_init.cc" ], "third_party": false, @@ -11563,6 +11568,7 @@ "include/grpcpp/support/time.h", "src/cpp/client/channel_cc.cc", "src/cpp/client/client_context.cc", + "src/cpp/client/client_interceptor.cc", "src/cpp/client/create_channel.cc", "src/cpp/client/create_channel_internal.cc", "src/cpp/client/create_channel_internal.h", diff --git a/tools/run_tests/helper_scripts/build_python.sh b/tools/run_tests/helper_scripts/build_python.sh index 6990244e51..eb3ea9e1f5 100755 --- a/tools/run_tests/helper_scripts/build_python.sh +++ b/tools/run_tests/helper_scripts/build_python.sh @@ -80,6 +80,8 @@ function toolchain() { fi } +# TODO(jtattermusch): this adds dependency on grealpath on mac +# (brew install coreutils) for little reason. # Command to invoke the linux command `realpath` or equivalent. function script_realpath() { # Find `realpath` @@ -112,6 +114,10 @@ export CFLAGS="-I$ROOT/include -std=gnu99 -fno-wrapv $CFLAGS" export GRPC_PYTHON_BUILD_WITH_CYTHON=1 export LANG=en_US.UTF-8 +# Allow build_ext to build C/C++ files in parallel +# by enabling a monkeypatch. It speeds up the build a lot. +export GRPC_PYTHON_BUILD_EXT_COMPILER_JOBS=4 + # If ccache is available on Linux, use it. if [ "$(is_linux)" ]; then # We're not on Darwin (Mac OS X) diff --git a/tools/run_tests/python_utils/jobset.py b/tools/run_tests/python_utils/jobset.py index b732e1e03e..578712c393 100755 --- a/tools/run_tests/python_utils/jobset.py +++ b/tools/run_tests/python_utils/jobset.py @@ -13,8 +13,6 @@ # limitations under the License. """Run a group of subprocesses and then finish.""" -from __future__ import print_function - import logging import multiprocessing import os @@ -118,7 +116,7 @@ def eintr_be_gone(fn): while True: try: return fn() - except IOError, e: + except IOError as e: if e.errno != errno.EINTR: raise @@ -144,7 +142,7 @@ def message(tag, msg, explanatory_text=None, do_newline=False): if do_newline or explanatory_text is not None else '')) sys.stdout.flush() return - except IOError, e: + except IOError as e: if e.errno != errno.EINTR: raise diff --git a/tools/run_tests/python_utils/port_server.py b/tools/run_tests/python_utils/port_server.py index 83e09c09d0..243eef6827 100755 --- a/tools/run_tests/python_utils/port_server.py +++ b/tools/run_tests/python_utils/port_server.py @@ -14,15 +14,17 @@ # limitations under the License. """Manage TCP ports for unit tests; started by run_tests.py""" +from __future__ import print_function + import argparse -from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler +from six.moves.BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler +from six.moves.socketserver import ThreadingMixIn import hashlib import os import socket import sys import time import random -from SocketServer import ThreadingMixIn import threading import platform @@ -32,7 +34,7 @@ import platform _MY_VERSION = 20 if len(sys.argv) == 2 and sys.argv[1] == 'dump_version': - print _MY_VERSION + print(_MY_VERSION) sys.exit(0) argp = argparse.ArgumentParser(description='Server for httpcli_test') @@ -47,7 +49,7 @@ if args.logfile is not None: sys.stderr = open(args.logfile, 'w') sys.stdout = sys.stderr -print 'port server running on port %d' % args.port +print('port server running on port %d' % args.port) pool = [] in_use = {} @@ -74,7 +76,7 @@ def can_connect(port): try: s.connect(('localhost', port)) return True - except socket.error, e: + except socket.error as e: return False finally: s.close() @@ -86,7 +88,7 @@ def can_bind(port, proto): try: s.bind(('localhost', port)) return True - except socket.error, e: + except socket.error as e: return False finally: s.close() @@ -95,7 +97,7 @@ def can_bind(port, proto): def refill_pool(max_timeout, req): """Scan for ports not marked for being in use""" chk = [ - port for port in list(range(1025, 32766)) + port for port in range(1025, 32766) if port not in cronet_restricted_ports ] random.shuffle(chk) diff --git a/tools/run_tests/python_utils/report_utils.py b/tools/run_tests/python_utils/report_utils.py index b2a256ce29..8d8dedb929 100644 --- a/tools/run_tests/python_utils/report_utils.py +++ b/tools/run_tests/python_utils/report_utils.py @@ -13,8 +13,6 @@ # limitations under the License. """Generate XML and HTML test reports.""" -from __future__ import print_function - try: from mako.runtime import Context from mako.template import Template diff --git a/tools/run_tests/python_utils/start_port_server.py b/tools/run_tests/python_utils/start_port_server.py index 37995acbdf..0a32bf4418 100644 --- a/tools/run_tests/python_utils/start_port_server.py +++ b/tools/run_tests/python_utils/start_port_server.py @@ -12,8 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -import urllib -import jobset +from __future__ import print_function + +from . import jobset + +import six.moves.urllib.request as request import logging import os import socket @@ -33,8 +36,8 @@ def start_port_server(): # otherwise, leave it up try: version = int( - urllib.urlopen('http://localhost:%d/version_number' % - _PORT_SERVER_PORT).read()) + request.urlopen('http://localhost:%d/version_number' % + _PORT_SERVER_PORT).read()) logging.info('detected port server running version %d', version) running = True except Exception as e: @@ -51,7 +54,7 @@ def start_port_server(): running = (version >= current_version) if not running: logging.info('port_server version mismatch: killing the old one') - urllib.urlopen( + request.urlopen( 'http://localhost:%d/quitquitquit' % _PORT_SERVER_PORT).read() time.sleep(1) if not running: @@ -92,7 +95,7 @@ def start_port_server(): # try one final time: maybe another build managed to start one time.sleep(1) try: - urllib.urlopen( + request.urlopen( 'http://localhost:%d/get' % _PORT_SERVER_PORT).read() logging.info( 'last ditch attempt to contact port server succeeded') @@ -101,11 +104,11 @@ def start_port_server(): logging.exception( 'final attempt to contact port server failed') port_log = open(logfile, 'r').read() - print port_log + print(port_log) sys.exit(1) try: port_server_url = 'http://localhost:%d/get' % _PORT_SERVER_PORT - urllib.urlopen(port_server_url).read() + request.urlopen(port_server_url).read() logging.info('port server is up and ready') break except socket.timeout: diff --git a/tools/run_tests/python_utils/watch_dirs.py b/tools/run_tests/python_utils/watch_dirs.py index d2ad303a07..f2f1c006df 100755 --- a/tools/run_tests/python_utils/watch_dirs.py +++ b/tools/run_tests/python_utils/watch_dirs.py @@ -15,13 +15,14 @@ import os import time +from six import string_types class DirWatcher(object): """Helper to watch a (set) of directories for modifications.""" def __init__(self, paths): - if isinstance(paths, basestring): + if isinstance(paths, string_types): paths = [paths] self._done = False self.paths = list(paths) diff --git a/tools/run_tests/run_interop_tests.py b/tools/run_tests/run_interop_tests.py index 5722a88182..ff6682c3cf 100755 --- a/tools/run_tests/run_interop_tests.py +++ b/tools/run_tests/run_interop_tests.py @@ -63,17 +63,17 @@ _SKIP_ADVANCED = [ 'unimplemented_service' ] +_SKIP_SPECIAL_STATUS_MESSAGE = ['special_status_message'] + _TEST_TIMEOUT = 3 * 60 # disable this test on core-based languages, # see https://github.com/grpc/grpc/issues/9779 _SKIP_DATA_FRAME_PADDING = ['data_frame_padding'] -# report suffix is important for reports to get picked up by internal CI -_INTERNAL_CL_XML_REPORT = 'sponge_log.xml' - -# report suffix is important for reports to get picked up by internal CI -_XML_REPORT = 'report.xml' +# report suffix "sponge_log.xml" is important for reports to get picked up by internal CI +_DOCKER_BUILD_XML_REPORT = 'interop_docker_build/sponge_log.xml' +_TESTS_XML_REPORT = 'interop_test/sponge_log.xml' class CXXLanguage: @@ -100,7 +100,7 @@ class CXXLanguage: return {} def unimplemented_test_cases(self): - return _SKIP_DATA_FRAME_PADDING + return _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE def unimplemented_test_cases_server(self): return [] @@ -129,7 +129,7 @@ class CSharpLanguage: return {} def unimplemented_test_cases(self): - return _SKIP_SERVER_COMPRESSION + _SKIP_DATA_FRAME_PADDING + return _SKIP_SERVER_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE def unimplemented_test_cases_server(self): return _SKIP_COMPRESSION @@ -158,7 +158,7 @@ class CSharpCoreCLRLanguage: return {} def unimplemented_test_cases(self): - return _SKIP_SERVER_COMPRESSION + _SKIP_DATA_FRAME_PADDING + return _SKIP_SERVER_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE def unimplemented_test_cases_server(self): return _SKIP_COMPRESSION @@ -188,10 +188,10 @@ class DartLanguage: return {} def unimplemented_test_cases(self): - return _SKIP_COMPRESSION + return _SKIP_COMPRESSION + _SKIP_SPECIAL_STATUS_MESSAGE def unimplemented_test_cases_server(self): - return _SKIP_COMPRESSION + return _SKIP_COMPRESSION + _SKIP_SPECIAL_STATUS_MESSAGE def __str__(self): return 'dart' @@ -248,7 +248,7 @@ class JavaOkHttpClient: return {} def unimplemented_test_cases(self): - return _SKIP_DATA_FRAME_PADDING + return _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE def __str__(self): return 'javaokhttp' @@ -309,7 +309,7 @@ class Http2Server: return {} def unimplemented_test_cases(self): - return _TEST_CASES + _SKIP_DATA_FRAME_PADDING + return _TEST_CASES + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE def unimplemented_test_cases_server(self): return _TEST_CASES @@ -339,7 +339,7 @@ class Http2Client: return {} def unimplemented_test_cases(self): - return _TEST_CASES + return _TEST_CASES + _SKIP_SPECIAL_STATUS_MESSAGE def unimplemented_test_cases_server(self): return _TEST_CASES @@ -431,7 +431,7 @@ class PHPLanguage: return {} def unimplemented_test_cases(self): - return _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING + return _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE def unimplemented_test_cases_server(self): return [] @@ -456,7 +456,7 @@ class PHP7Language: return {} def unimplemented_test_cases(self): - return _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING + return _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE def unimplemented_test_cases_server(self): return [] @@ -491,7 +491,7 @@ class ObjcLanguage: # cmdline argument. Here we return all but one test cases as unimplemented, # and depend upon ObjC test's behavior that it runs all cases even when # we tell it to run just one. - return _TEST_CASES[1:] + _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING + return _TEST_CASES[1:] + _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE def unimplemented_test_cases_server(self): return _SKIP_COMPRESSION @@ -526,7 +526,7 @@ class RubyLanguage: return {} def unimplemented_test_cases(self): - return _SKIP_SERVER_COMPRESSION + _SKIP_DATA_FRAME_PADDING + return _SKIP_SERVER_COMPRESSION + _SKIP_DATA_FRAME_PADDING + _SKIP_SPECIAL_STATUS_MESSAGE def unimplemented_test_cases_server(self): return _SKIP_COMPRESSION @@ -610,7 +610,7 @@ _TEST_CASES = [ 'custom_metadata', 'status_code_and_message', 'unimplemented_method', 'client_compressed_unary', 'server_compressed_unary', 'client_compressed_streaming', 'server_compressed_streaming', - 'unimplemented_service' + 'unimplemented_service', 'special_status_message' ] _AUTH_TEST_CASES = [ @@ -1161,8 +1161,9 @@ argp.add_argument( default=False, action='store_const', const=True, - help=('Put reports into subdirectories to improve ' - 'presentation of results by Internal CI.')) + help=( + '(Deprecated, has no effect) Put reports into subdirectories to improve ' + 'presentation of results by Internal CI.')) argp.add_argument( '--bq_result_table', default='', @@ -1251,8 +1252,12 @@ if args.use_docker: if args.verbose: print('Jobs to run: \n%s\n' % '\n'.join(str(j) for j in build_jobs)) - num_failures, _ = jobset.run( + num_failures, build_resultset = jobset.run( build_jobs, newline_on_success=True, maxjobs=args.jobs) + + report_utils.render_junit_xml_report(build_resultset, + _DOCKER_BUILD_XML_REPORT) + if num_failures == 0: jobset.message( 'SUCCESS', @@ -1315,7 +1320,7 @@ try: for language in languages: for test_case in _TEST_CASES: if not test_case in language.unimplemented_test_cases(): - if not test_case in _SKIP_ADVANCED + _SKIP_COMPRESSION: + if not test_case in _SKIP_ADVANCED + _SKIP_COMPRESSION + _SKIP_SPECIAL_STATUS_MESSAGE: tls_test_job = cloud_to_prod_jobspec( language, test_case, @@ -1517,10 +1522,7 @@ try: write_cmdlog_maybe(server_manual_cmd_log, 'interop_server_cmds.sh') write_cmdlog_maybe(client_manual_cmd_log, 'interop_client_cmds.sh') - xml_report_name = _XML_REPORT - if args.internal_ci: - xml_report_name = _INTERNAL_CL_XML_REPORT - report_utils.render_junit_xml_report(resultset, xml_report_name) + report_utils.render_junit_xml_report(resultset, _TESTS_XML_REPORT) for name, job in resultset.items(): if "http2" in name: diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 44151f49fb..62bb6da7a7 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -1512,7 +1512,7 @@ if args.travis: _FORCE_ENVIRON_FOR_WRAPPERS = {'GRPC_TRACE': 'api'} if 'all' in args.language: - lang_list = _LANGUAGES.keys() + lang_list = list(_LANGUAGES.keys()) else: lang_list = args.language # We don't support code coverage on some languages @@ -1719,9 +1719,9 @@ def _has_epollexclusive(): try: subprocess.check_call(binary) return True - except subprocess.CalledProcessError, e: + except subprocess.CalledProcessError as e: return False - except OSError, e: + except OSError as e: # For languages other than C and Windows the binary won't exist return False |