diff options
author | Vijay Pai <vpai@google.com> | 2018-09-12 09:03:49 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-12 09:03:49 -0700 |
commit | 3bc10c0f44c64bb35c0a1ae94e7ea2d14c72e5c1 (patch) | |
tree | b71ed5b17147d2770f1d30f985bfb1bf38776245 | |
parent | 584dd0564605091a6299a624e85f80bfc2a57acf (diff) | |
parent | 0382d062486b5ba384d1288008147b6d36868485 (diff) |
Merge pull request #16492 from vjpai/client_callback
EXPERIMENTAL: C++ generic client-side unary callback API
28 files changed, 858 insertions, 10 deletions
@@ -119,6 +119,7 @@ GRPCXX_SRCS = [ "src/cpp/client/credentials_cc.cc", "src/cpp/client/generic_stub.cc", "src/cpp/common/alarm.cc", + "src/cpp/common/callback_common.cc", "src/cpp/common/channel_arguments.cc", "src/cpp/common/channel_filter.cc", "src/cpp/common/completion_queue_cc.cc", @@ -243,6 +244,7 @@ GRPCXX_PUBLIC_HDRS = [ "include/grpcpp/support/async_unary_call.h", "include/grpcpp/support/byte_buffer.h", "include/grpcpp/support/channel_arguments.h", + "include/grpcpp/support/client_callback.h", "include/grpcpp/support/config.h", "include/grpcpp/support/proto_buffer_reader.h", "include/grpcpp/support/proto_buffer_writer.h", @@ -1979,7 +1981,9 @@ grpc_cc_library( "include/grpcpp/impl/codegen/byte_buffer.h", "include/grpcpp/impl/codegen/call.h", "include/grpcpp/impl/codegen/call_hook.h", + "include/grpcpp/impl/codegen/callback_common.h", "include/grpcpp/impl/codegen/channel_interface.h", + "include/grpcpp/impl/codegen/client_callback.h", "include/grpcpp/impl/codegen/client_context.h", "include/grpcpp/impl/codegen/client_unary_call.h", "include/grpcpp/impl/codegen/completion_queue.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 4cbc7597ab..c358e9bd43 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -565,6 +565,7 @@ add_dependencies(buildtests_cxx check_gcp_environment_linux_test) add_dependencies(buildtests_cxx check_gcp_environment_windows_test) add_dependencies(buildtests_cxx chttp2_settings_timeout_test) add_dependencies(buildtests_cxx cli_call_test) +add_dependencies(buildtests_cxx client_callback_end2end_test) add_dependencies(buildtests_cxx client_channel_stress_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx client_crash_test) @@ -2772,6 +2773,7 @@ add_library(grpc++ src/cpp/client/credentials_cc.cc src/cpp/client/generic_stub.cc src/cpp/common/alarm.cc + src/cpp/common/callback_common.cc src/cpp/common/channel_arguments.cc src/cpp/common/channel_filter.cc src/cpp/common/completion_queue_cc.cc @@ -2918,6 +2920,7 @@ foreach(_hdr include/grpcpp/support/async_unary_call.h include/grpcpp/support/byte_buffer.h include/grpcpp/support/channel_arguments.h + include/grpcpp/support/client_callback.h include/grpcpp/support/config.h include/grpcpp/support/proto_buffer_reader.h include/grpcpp/support/proto_buffer_writer.h @@ -3015,7 +3018,9 @@ foreach(_hdr include/grpcpp/impl/codegen/byte_buffer.h include/grpcpp/impl/codegen/call.h include/grpcpp/impl/codegen/call_hook.h + include/grpcpp/impl/codegen/callback_common.h include/grpcpp/impl/codegen/channel_interface.h + include/grpcpp/impl/codegen/client_callback.h include/grpcpp/impl/codegen/client_context.h include/grpcpp/impl/codegen/client_unary_call.h include/grpcpp/impl/codegen/completion_queue.h @@ -3130,6 +3135,7 @@ add_library(grpc++_cronet src/cpp/client/credentials_cc.cc src/cpp/client/generic_stub.cc src/cpp/common/alarm.cc + src/cpp/common/callback_common.cc src/cpp/common/channel_arguments.cc src/cpp/common/channel_filter.cc src/cpp/common/completion_queue_cc.cc @@ -3487,6 +3493,7 @@ foreach(_hdr include/grpcpp/support/async_unary_call.h include/grpcpp/support/byte_buffer.h include/grpcpp/support/channel_arguments.h + include/grpcpp/support/client_callback.h include/grpcpp/support/config.h include/grpcpp/support/proto_buffer_reader.h include/grpcpp/support/proto_buffer_writer.h @@ -3584,7 +3591,9 @@ foreach(_hdr include/grpcpp/impl/codegen/byte_buffer.h include/grpcpp/impl/codegen/call.h include/grpcpp/impl/codegen/call_hook.h + include/grpcpp/impl/codegen/callback_common.h include/grpcpp/impl/codegen/channel_interface.h + include/grpcpp/impl/codegen/client_callback.h include/grpcpp/impl/codegen/client_context.h include/grpcpp/impl/codegen/client_unary_call.h include/grpcpp/impl/codegen/completion_queue.h @@ -3994,7 +4003,9 @@ foreach(_hdr include/grpcpp/impl/codegen/byte_buffer.h include/grpcpp/impl/codegen/call.h include/grpcpp/impl/codegen/call_hook.h + include/grpcpp/impl/codegen/callback_common.h include/grpcpp/impl/codegen/channel_interface.h + include/grpcpp/impl/codegen/client_callback.h include/grpcpp/impl/codegen/client_context.h include/grpcpp/impl/codegen/client_unary_call.h include/grpcpp/impl/codegen/completion_queue.h @@ -4172,7 +4183,9 @@ foreach(_hdr include/grpcpp/impl/codegen/byte_buffer.h include/grpcpp/impl/codegen/call.h include/grpcpp/impl/codegen/call_hook.h + include/grpcpp/impl/codegen/callback_common.h include/grpcpp/impl/codegen/channel_interface.h + include/grpcpp/impl/codegen/client_callback.h include/grpcpp/impl/codegen/client_context.h include/grpcpp/impl/codegen/client_unary_call.h include/grpcpp/impl/codegen/completion_queue.h @@ -4248,6 +4261,7 @@ add_library(grpc++_unsecure src/cpp/client/credentials_cc.cc src/cpp/client/generic_stub.cc src/cpp/common/alarm.cc + src/cpp/common/callback_common.cc src/cpp/common/channel_arguments.cc src/cpp/common/channel_filter.cc src/cpp/common/completion_queue_cc.cc @@ -4393,6 +4407,7 @@ foreach(_hdr include/grpcpp/support/async_unary_call.h include/grpcpp/support/byte_buffer.h include/grpcpp/support/channel_arguments.h + include/grpcpp/support/client_callback.h include/grpcpp/support/config.h include/grpcpp/support/proto_buffer_reader.h include/grpcpp/support/proto_buffer_writer.h @@ -4490,7 +4505,9 @@ foreach(_hdr include/grpcpp/impl/codegen/byte_buffer.h include/grpcpp/impl/codegen/call.h include/grpcpp/impl/codegen/call_hook.h + include/grpcpp/impl/codegen/callback_common.h include/grpcpp/impl/codegen/channel_interface.h + include/grpcpp/impl/codegen/client_callback.h include/grpcpp/impl/codegen/client_context.h include/grpcpp/impl/codegen/client_unary_call.h include/grpcpp/impl/codegen/completion_queue.h @@ -11327,6 +11344,46 @@ target_link_libraries(cli_call_test endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) +add_executable(client_callback_end2end_test + test/cpp/end2end/client_callback_end2end_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + + +target_include_directories(client_callback_end2end_test + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${_gRPC_SSL_INCLUDE_DIR} + PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR} + PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR} + PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR} + PRIVATE ${_gRPC_CARES_INCLUDE_DIR} + PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR} + PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR} + PRIVATE third_party/googletest/googletest/include + PRIVATE third_party/googletest/googletest + PRIVATE third_party/googletest/googlemock/include + PRIVATE third_party/googletest/googlemock + PRIVATE ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(client_callback_end2end_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc++_test_util + grpc_test_util + grpc++ + grpc + gpr_test_util + gpr + ${_gRPC_GFLAGS_LIBRARIES} +) + +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) + add_executable(client_channel_stress_test ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.grpc.pb.cc @@ -1161,6 +1161,7 @@ check_gcp_environment_linux_test: $(BINDIR)/$(CONFIG)/check_gcp_environment_linu check_gcp_environment_windows_test: $(BINDIR)/$(CONFIG)/check_gcp_environment_windows_test chttp2_settings_timeout_test: $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test cli_call_test: $(BINDIR)/$(CONFIG)/cli_call_test +client_callback_end2end_test: $(BINDIR)/$(CONFIG)/client_callback_end2end_test client_channel_stress_test: $(BINDIR)/$(CONFIG)/client_channel_stress_test client_crash_test: $(BINDIR)/$(CONFIG)/client_crash_test client_crash_test_server: $(BINDIR)/$(CONFIG)/client_crash_test_server @@ -1667,6 +1668,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/check_gcp_environment_windows_test \ $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test \ $(BINDIR)/$(CONFIG)/cli_call_test \ + $(BINDIR)/$(CONFIG)/client_callback_end2end_test \ $(BINDIR)/$(CONFIG)/client_channel_stress_test \ $(BINDIR)/$(CONFIG)/client_crash_test \ $(BINDIR)/$(CONFIG)/client_crash_test_server \ @@ -1847,6 +1849,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/check_gcp_environment_windows_test \ $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test \ $(BINDIR)/$(CONFIG)/cli_call_test \ + $(BINDIR)/$(CONFIG)/client_callback_end2end_test \ $(BINDIR)/$(CONFIG)/client_channel_stress_test \ $(BINDIR)/$(CONFIG)/client_crash_test \ $(BINDIR)/$(CONFIG)/client_crash_test_server \ @@ -2306,6 +2309,8 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test || ( echo test chttp2_settings_timeout_test failed ; exit 1 ) $(E) "[RUN] Testing cli_call_test" $(Q) $(BINDIR)/$(CONFIG)/cli_call_test || ( echo test cli_call_test failed ; exit 1 ) + $(E) "[RUN] Testing client_callback_end2end_test" + $(Q) $(BINDIR)/$(CONFIG)/client_callback_end2end_test || ( echo test client_callback_end2end_test failed ; exit 1 ) $(E) "[RUN] Testing client_channel_stress_test" $(Q) $(BINDIR)/$(CONFIG)/client_channel_stress_test || ( echo test client_channel_stress_test failed ; exit 1 ) $(E) "[RUN] Testing client_crash_test" @@ -5223,6 +5228,7 @@ LIBGRPC++_SRC = \ src/cpp/client/credentials_cc.cc \ src/cpp/client/generic_stub.cc \ src/cpp/common/alarm.cc \ + src/cpp/common/callback_common.cc \ src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_filter.cc \ src/cpp/common/completion_queue_cc.cc \ @@ -5333,6 +5339,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/support/async_unary_call.h \ include/grpcpp/support/byte_buffer.h \ include/grpcpp/support/channel_arguments.h \ + include/grpcpp/support/client_callback.h \ include/grpcpp/support/config.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ @@ -5430,7 +5437,9 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/byte_buffer.h \ include/grpcpp/impl/codegen/call.h \ include/grpcpp/impl/codegen/call_hook.h \ + include/grpcpp/impl/codegen/callback_common.h \ include/grpcpp/impl/codegen/channel_interface.h \ + include/grpcpp/impl/codegen/client_callback.h \ include/grpcpp/impl/codegen/client_context.h \ include/grpcpp/impl/codegen/client_unary_call.h \ include/grpcpp/impl/codegen/completion_queue.h \ @@ -5589,6 +5598,7 @@ LIBGRPC++_CRONET_SRC = \ src/cpp/client/credentials_cc.cc \ src/cpp/client/generic_stub.cc \ src/cpp/common/alarm.cc \ + src/cpp/common/callback_common.cc \ src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_filter.cc \ src/cpp/common/completion_queue_cc.cc \ @@ -5909,6 +5919,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/support/async_unary_call.h \ include/grpcpp/support/byte_buffer.h \ include/grpcpp/support/channel_arguments.h \ + include/grpcpp/support/client_callback.h \ include/grpcpp/support/config.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ @@ -6006,7 +6017,9 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/byte_buffer.h \ include/grpcpp/impl/codegen/call.h \ include/grpcpp/impl/codegen/call_hook.h \ + include/grpcpp/impl/codegen/callback_common.h \ include/grpcpp/impl/codegen/channel_interface.h \ + include/grpcpp/impl/codegen/client_callback.h \ include/grpcpp/impl/codegen/client_context.h \ include/grpcpp/impl/codegen/client_unary_call.h \ include/grpcpp/impl/codegen/completion_queue.h \ @@ -6396,7 +6409,9 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/byte_buffer.h \ include/grpcpp/impl/codegen/call.h \ include/grpcpp/impl/codegen/call_hook.h \ + include/grpcpp/impl/codegen/callback_common.h \ include/grpcpp/impl/codegen/channel_interface.h \ + include/grpcpp/impl/codegen/client_callback.h \ include/grpcpp/impl/codegen/client_context.h \ include/grpcpp/impl/codegen/client_unary_call.h \ include/grpcpp/impl/codegen/completion_queue.h \ @@ -6550,7 +6565,9 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/byte_buffer.h \ include/grpcpp/impl/codegen/call.h \ include/grpcpp/impl/codegen/call_hook.h \ + include/grpcpp/impl/codegen/callback_common.h \ include/grpcpp/impl/codegen/channel_interface.h \ + include/grpcpp/impl/codegen/client_callback.h \ include/grpcpp/impl/codegen/client_context.h \ include/grpcpp/impl/codegen/client_unary_call.h \ include/grpcpp/impl/codegen/completion_queue.h \ @@ -6665,6 +6682,7 @@ LIBGRPC++_UNSECURE_SRC = \ src/cpp/client/credentials_cc.cc \ src/cpp/client/generic_stub.cc \ src/cpp/common/alarm.cc \ + src/cpp/common/callback_common.cc \ src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_filter.cc \ src/cpp/common/completion_queue_cc.cc \ @@ -6775,6 +6793,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/support/async_unary_call.h \ include/grpcpp/support/byte_buffer.h \ include/grpcpp/support/channel_arguments.h \ + include/grpcpp/support/client_callback.h \ include/grpcpp/support/config.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ @@ -6872,7 +6891,9 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/byte_buffer.h \ include/grpcpp/impl/codegen/call.h \ include/grpcpp/impl/codegen/call_hook.h \ + include/grpcpp/impl/codegen/callback_common.h \ include/grpcpp/impl/codegen/channel_interface.h \ + include/grpcpp/impl/codegen/client_callback.h \ include/grpcpp/impl/codegen/client_context.h \ include/grpcpp/impl/codegen/client_unary_call.h \ include/grpcpp/impl/codegen/completion_queue.h \ @@ -17112,6 +17133,49 @@ endif endif +CLIENT_CALLBACK_END2END_TEST_SRC = \ + test/cpp/end2end/client_callback_end2end_test.cc \ + +CLIENT_CALLBACK_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CLIENT_CALLBACK_END2END_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/client_callback_end2end_test: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+. + +$(BINDIR)/$(CONFIG)/client_callback_end2end_test: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/client_callback_end2end_test: $(PROTOBUF_DEP) $(CLIENT_CALLBACK_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(CLIENT_CALLBACK_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/client_callback_end2end_test + +endif + +endif + +$(OBJDIR)/$(CONFIG)/test/cpp/end2end/client_callback_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_client_callback_end2end_test: $(CLIENT_CALLBACK_END2END_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(CLIENT_CALLBACK_END2END_TEST_OBJS:.o=.dep) +endif +endif + + CLIENT_CHANNEL_STRESS_TEST_SRC = \ $(GENDIR)/src/proto/grpc/lb/v1/load_balancer.pb.cc $(GENDIR)/src/proto/grpc/lb/v1/load_balancer.grpc.pb.cc \ test/cpp/client/client_channel_stress_test.cc \ diff --git a/build.yaml b/build.yaml index b354826128..d6e67aa7ee 100644 --- a/build.yaml +++ b/build.yaml @@ -1169,7 +1169,9 @@ filegroups: - include/grpcpp/impl/codegen/byte_buffer.h - include/grpcpp/impl/codegen/call.h - include/grpcpp/impl/codegen/call_hook.h + - include/grpcpp/impl/codegen/callback_common.h - include/grpcpp/impl/codegen/channel_interface.h + - include/grpcpp/impl/codegen/client_callback.h - include/grpcpp/impl/codegen/client_context.h - include/grpcpp/impl/codegen/client_unary_call.h - include/grpcpp/impl/codegen/completion_queue.h @@ -1297,6 +1299,7 @@ filegroups: - include/grpcpp/support/async_unary_call.h - include/grpcpp/support/byte_buffer.h - include/grpcpp/support/channel_arguments.h + - include/grpcpp/support/client_callback.h - include/grpcpp/support/config.h - include/grpcpp/support/proto_buffer_reader.h - include/grpcpp/support/proto_buffer_writer.h @@ -1324,6 +1327,7 @@ filegroups: - src/cpp/client/credentials_cc.cc - src/cpp/client/generic_stub.cc - src/cpp/common/alarm.cc + - src/cpp/common/callback_common.cc - src/cpp/common/channel_arguments.cc - src/cpp/common/channel_filter.cc - src/cpp/common/completion_queue_cc.cc @@ -4479,6 +4483,20 @@ targets: - grpc - gpr_test_util - gpr +- name: client_callback_end2end_test + gtest: true + cpu_cost: 0.5 + build: test + language: c++ + src: + - test/cpp/end2end/client_callback_end2end_test.cc + deps: + - grpc++_test_util + - grpc_test_util + - grpc++ + - grpc + - gpr_test_util + - gpr - name: client_channel_stress_test gtest: false build: test diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 581b9246bc..03ec223279 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -111,6 +111,7 @@ Pod::Spec.new do |s| 'include/grpcpp/support/async_unary_call.h', 'include/grpcpp/support/byte_buffer.h', 'include/grpcpp/support/channel_arguments.h', + 'include/grpcpp/support/client_callback.h', 'include/grpcpp/support/config.h', 'include/grpcpp/support/proto_buffer_reader.h', 'include/grpcpp/support/proto_buffer_writer.h', @@ -127,7 +128,9 @@ Pod::Spec.new do |s| 'include/grpcpp/impl/codegen/byte_buffer.h', 'include/grpcpp/impl/codegen/call.h', 'include/grpcpp/impl/codegen/call_hook.h', + 'include/grpcpp/impl/codegen/callback_common.h', 'include/grpcpp/impl/codegen/channel_interface.h', + 'include/grpcpp/impl/codegen/client_callback.h', 'include/grpcpp/impl/codegen/client_context.h', 'include/grpcpp/impl/codegen/client_unary_call.h', 'include/grpcpp/impl/codegen/completion_queue.h', @@ -187,6 +190,7 @@ Pod::Spec.new do |s| 'src/cpp/client/credentials_cc.cc', 'src/cpp/client/generic_stub.cc', 'src/cpp/common/alarm.cc', + 'src/cpp/common/callback_common.cc', 'src/cpp/common/channel_arguments.cc', 'src/cpp/common/channel_filter.cc', 'src/cpp/common/completion_queue_cc.cc', @@ -1381,6 +1381,7 @@ 'src/cpp/client/credentials_cc.cc', 'src/cpp/client/generic_stub.cc', 'src/cpp/common/alarm.cc', + 'src/cpp/common/callback_common.cc', 'src/cpp/common/channel_arguments.cc', 'src/cpp/common/channel_filter.cc', 'src/cpp/common/completion_queue_cc.cc', @@ -1528,6 +1529,7 @@ 'src/cpp/client/credentials_cc.cc', 'src/cpp/client/generic_stub.cc', 'src/cpp/common/alarm.cc', + 'src/cpp/common/callback_common.cc', 'src/cpp/common/channel_arguments.cc', 'src/cpp/common/channel_filter.cc', 'src/cpp/common/completion_queue_cc.cc', diff --git a/include/grpcpp/channel.h b/include/grpcpp/channel.h index fed02bf7bc..f1dba5b8ad 100644 --- a/include/grpcpp/channel.h +++ b/include/grpcpp/channel.h @@ -78,8 +78,19 @@ class Channel final : public ChannelInterface, bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline) override; + CompletionQueue* CallbackCQ() override; + const grpc::string host_; grpc_channel* const c_channel_; // owned + + // mu_ protects callback_cq_ (the per-channel callbackable completion queue) + std::mutex mu_; + + // callback_cq_ references the callbackable completion queue associated + // with this channel (if any). It is set on the first call to CallbackCQ(). + // It is _not owned_ by the channel; ownership belongs with its internal + // shutdown callback tag (invoked when the CQ is fully shutdown). + CompletionQueue* callback_cq_ = nullptr; }; } // namespace grpc diff --git a/include/grpcpp/generic/generic_stub.h b/include/grpcpp/generic/generic_stub.h index 92405a43fa..d509d9a520 100644 --- a/include/grpcpp/generic/generic_stub.h +++ b/include/grpcpp/generic/generic_stub.h @@ -19,9 +19,12 @@ #ifndef GRPCPP_GENERIC_GENERIC_STUB_H #define GRPCPP_GENERIC_GENERIC_STUB_H +#include <functional> + #include <grpcpp/support/async_stream.h> #include <grpcpp/support/async_unary_call.h> #include <grpcpp/support/byte_buffer.h> +#include <grpcpp/support/status.h> namespace grpc { @@ -62,6 +65,26 @@ class GenericStub final { ClientContext* context, const grpc::string& method, CompletionQueue* cq, void* tag); + /// NOTE: class experimental_type is not part of the public API of this class + /// TODO(vjpai): Move these contents to the public API of GenericStub when + /// they are no longer experimental + class experimental_type { + public: + explicit experimental_type(GenericStub* stub) : stub_(stub) {} + + void UnaryCall(ClientContext* context, const grpc::string& method, + const ByteBuffer* request, ByteBuffer* response, + std::function<void(Status)> on_completion); + + private: + GenericStub* stub_; + }; + + /// NOTE: The function experimental() is not stable public API. It is a view + /// to the experimental components of this class. It may be changed or removed + /// at any time. + experimental_type experimental() { return experimental_type(this); } + private: std::shared_ptr<ChannelInterface> channel_; }; diff --git a/include/grpcpp/impl/codegen/call.h b/include/grpcpp/impl/codegen/call.h index e94adada94..7cadea0055 100644 --- a/include/grpcpp/impl/codegen/call.h +++ b/include/grpcpp/impl/codegen/call.h @@ -599,6 +599,11 @@ class CallOpSetInterface : public CompletionQueueTag { /// Fills in grpc_op, starting from ops[*nops] and moving /// upwards. virtual void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) = 0; + + /// Get the tag to be used at the core completion queue. Generally, the + /// value of cq_tag will be "this". However, it can be overridden if we + /// want core to process the tag differently (e.g., as a core callback) + virtual void* cq_tag() = 0; }; /// Primary implementation of CallOpSetInterface. @@ -618,7 +623,7 @@ class CallOpSet : public CallOpSetInterface, public Op5, public Op6 { public: - CallOpSet() : return_tag_(this), call_(nullptr) {} + CallOpSet() : cq_tag_(this), return_tag_(this), call_(nullptr) {} void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override { this->Op1::AddOp(ops, nops); this->Op2::AddOp(ops, nops); @@ -645,7 +650,16 @@ class CallOpSet : public CallOpSetInterface, void set_output_tag(void* return_tag) { return_tag_ = return_tag; } + void* cq_tag() override { return cq_tag_; } + + /// set_cq_tag is used to provide a different core CQ tag than "this". + /// This is used for callback-based tags, where the core tag is the core + /// callback function. It does not change the use or behavior of any other + /// function (such as FinalizeResult) + void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; } + private: + void* cq_tag_; void* return_tag_; grpc_call* call_; }; diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h new file mode 100644 index 0000000000..68c318d2b4 --- /dev/null +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -0,0 +1,103 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_IMPL_CODEGEN_CALLBACK_COMMON_H +#define GRPCPP_IMPL_CODEGEN_CALLBACK_COMMON_H + +#include <functional> + +#include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/channel_interface.h> +#include <grpcpp/impl/codegen/config.h> +#include <grpcpp/impl/codegen/core_codegen_interface.h> +#include <grpcpp/impl/codegen/status.h> + +// Forward declarations +namespace grpc_core { +class CQCallbackInterface; +}; + +namespace grpc { +namespace internal { + +class CallbackWithStatusTag { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(CallbackWithStatusTag)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + + CallbackWithStatusTag(grpc_call* call, std::function<void(Status)> f, + CompletionQueueTag* ops); + ~CallbackWithStatusTag() {} + void* tag() { return static_cast<void*>(impl_); } + Status* status_ptr() { return status_; } + CompletionQueueTag* ops() { return ops_; } + + // force_run can not be performed on a tag if operations using this tag + // have been sent to PerformOpsOnCall. It is intended for error conditions + // that are detected before the operations are internally processed. + void force_run(Status s); + + private: + grpc_core::CQCallbackInterface* impl_; + Status* status_; + CompletionQueueTag* ops_; +}; + +class CallbackWithSuccessTag { + public: + // always allocated against a call arena, no memory free required + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(CallbackWithSuccessTag)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + + CallbackWithSuccessTag(grpc_call* call, std::function<void(bool)> f, + CompletionQueueTag* ops); + + void* tag() { return static_cast<void*>(impl_); } + CompletionQueueTag* ops() { return ops_; } + + // force_run can not be performed on a tag if operations using this tag + // have been sent to PerformOpsOnCall. It is intended for error conditions + // that are detected before the operations are internally processed. + void force_run(bool ok); + + private: + grpc_core::CQCallbackInterface* impl_; + CompletionQueueTag* ops_; +}; + +} // namespace internal +} // namespace grpc + +#endif // GRPCPP_IMPL_CODEGEN_CALLBACK_COMMON_H diff --git a/include/grpcpp/impl/codegen/channel_interface.h b/include/grpcpp/impl/codegen/channel_interface.h index ec1c6c25d8..b257acc1ab 100644 --- a/include/grpcpp/impl/codegen/channel_interface.h +++ b/include/grpcpp/impl/codegen/channel_interface.h @@ -41,6 +41,8 @@ class CallOpSetInterface; class RpcMethod; template <class InputMessage, class OutputMessage> class BlockingUnaryCallImpl; +template <class InputMessage, class OutputMessage> +class CallbackUnaryCallImpl; template <class R> class ClientAsyncReaderFactory; template <class W> @@ -103,6 +105,8 @@ class ChannelInterface { friend class ::grpc::internal::ClientAsyncResponseReaderFactory; template <class InputMessage, class OutputMessage> friend class ::grpc::internal::BlockingUnaryCallImpl; + template <class InputMessage, class OutputMessage> + friend class ::grpc::internal::CallbackUnaryCallImpl; friend class ::grpc::internal::RpcMethod; virtual internal::Call CreateCall(const internal::RpcMethod& method, ClientContext* context, @@ -115,6 +119,16 @@ class ChannelInterface { CompletionQueue* cq, void* tag) = 0; virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline) = 0; + + // EXPERIMENTAL + // A method to get the callbackable completion queue associated with this + // channel. If the return value is nullptr, this channel doesn't support + // callback operations. + // TODO(vjpai): Consider a better default like using a global CQ + // Returns nullptr (rather than being pure) since this is a new method + // and adding a new pure method to an interface would be a breaking change + // (even though this is private and non-API) + virtual CompletionQueue* CallbackCQ() { return nullptr; } }; } // namespace grpc diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h new file mode 100644 index 0000000000..fc81c8aa0a --- /dev/null +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -0,0 +1,95 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H +#define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H + +#include <functional> + +#include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/callback_common.h> +#include <grpcpp/impl/codegen/channel_interface.h> +#include <grpcpp/impl/codegen/config.h> +#include <grpcpp/impl/codegen/core_codegen_interface.h> +#include <grpcpp/impl/codegen/status.h> + +namespace grpc { + +class Channel; +class ClientContext; +class CompletionQueue; + +namespace internal { +class RpcMethod; + +/// Perform a callback-based unary call +/// TODO(vjpai): Combine as much as possible with the blocking unary call code +template <class InputMessage, class OutputMessage> +void CallbackUnaryCall(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const InputMessage* request, + OutputMessage* result, + std::function<void(Status)> on_completion) { + CallbackUnaryCallImpl<InputMessage, OutputMessage> x( + channel, method, context, request, result, on_completion); +} + +template <class InputMessage, class OutputMessage> +class CallbackUnaryCallImpl { + public: + CallbackUnaryCallImpl(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const InputMessage* request, + OutputMessage* result, + std::function<void(Status)> on_completion) { + CompletionQueue* cq = channel->CallbackCQ(); + GPR_CODEGEN_ASSERT(cq != nullptr); + Call call(channel->CreateCall(method, context, cq)); + + using FullCallOpSet = + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>, + CallOpClientSendClose, CallOpClientRecvStatus>; + + auto* ops = new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(FullCallOpSet))) FullCallOpSet; + + auto* tag = new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(CallbackWithStatusTag))) + CallbackWithStatusTag(call.call(), on_completion, ops); + + // TODO(vjpai): Unify code with sync API as much as possible + Status s = ops->SendMessage(*request); + if (!s.ok()) { + tag->force_run(s); + return; + } + ops->SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + ops->RecvInitialMetadata(context); + ops->RecvMessage(result); + ops->AllowNoMessage(); + ops->ClientSendClose(); + ops->ClientRecvStatus(context, tag->status_ptr()); + ops->set_cq_tag(tag->tag()); + call.PerformOps(ops); + } +}; + +} // namespace internal +} // namespace grpc + +#endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H diff --git a/include/grpcpp/impl/codegen/client_context.h b/include/grpcpp/impl/codegen/client_context.h index c6c9540950..46635a541a 100644 --- a/include/grpcpp/impl/codegen/client_context.h +++ b/include/grpcpp/impl/codegen/client_context.h @@ -68,6 +68,8 @@ class CallOpClientRecvStatus; class CallOpRecvInitialMetadata; template <class InputMessage, class OutputMessage> class BlockingUnaryCallImpl; +template <class InputMessage, class OutputMessage> +class CallbackUnaryCallImpl; } // namespace internal template <class R> @@ -389,6 +391,8 @@ class ClientContext { friend class ::grpc::ClientAsyncResponseReader; template <class InputMessage, class OutputMessage> friend class ::grpc::internal::BlockingUnaryCallImpl; + template <class InputMessage, class OutputMessage> + friend class ::grpc::internal::CallbackUnaryCallImpl; // Used by friend class CallOpClientRecvStatus void set_debug_error_string(const grpc::string& debug_error_string) { diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h index 3f7d4fb765..f52f9a53be 100644 --- a/include/grpcpp/impl/codegen/completion_queue.h +++ b/include/grpcpp/impl/codegen/completion_queue.h @@ -274,6 +274,9 @@ class CompletionQueue : private GrpcLibraryCodegen { template <class InputMessage, class OutputMessage> friend class ::grpc::internal::BlockingUnaryCallImpl; + // Friends that need access to constructor for callback CQ + friend class ::grpc::Channel; + /// EXPERIMENTAL /// Creates a Thread Local cache to store the first event /// On this completion queue queued from this thread. Once diff --git a/include/grpcpp/impl/codegen/completion_queue_tag.h b/include/grpcpp/impl/codegen/completion_queue_tag.h index ffb642c56b..304386a9ec 100644 --- a/include/grpcpp/impl/codegen/completion_queue_tag.h +++ b/include/grpcpp/impl/codegen/completion_queue_tag.h @@ -26,10 +26,25 @@ namespace internal { class CompletionQueueTag { public: virtual ~CompletionQueueTag() {} - /// Called prior to returning from Next(), return value is the status of the - /// operation (return status is the default thing to do). If this function - /// returns false, the tag is dropped and not returned from the completion - /// queue + + /// FinalizeResult must be called before informing user code that the + /// operation bound to the underlying core completion queue tag has + /// completed. In practice, this means: + /// + /// 1. For the sync API - before returning from Pluck + /// 2. For the CQ-based async API - before returning from Next + /// 3. For the callback-based API - before invoking the user callback + /// + /// This is the method that translates from core-side tag/status to + /// C++ API-observable tag/status. + /// + /// The return value is the status of the operation (returning status is the + /// general behavior of this function). If this function returns false, the + /// tag is dropped and not returned from the completion queue: this concept is + /// for events that are observed at core but not requested by the user + /// application (e.g., server shutdown, for server unimplemented method + /// responses, or for cases where a server-side RPC doesn't have a completion + /// notification registered using AsyncNotifyWhenDone) virtual bool FinalizeResult(void** tag, bool* status) = 0; }; } // namespace internal diff --git a/include/grpcpp/support/client_callback.h b/include/grpcpp/support/client_callback.h new file mode 100644 index 0000000000..063fdc4f85 --- /dev/null +++ b/include/grpcpp/support/client_callback.h @@ -0,0 +1,24 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_SUPPORT_CLIENT_CALLBACK_H +#define GRPCPP_SUPPORT_CLIENT_CALLBACK_H + +#include <grpcpp/impl/codegen/client_callback.h> + +#endif // GRPCPP_SUPPORT_CLIENT_CALLBACK_H diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 0769d9e4f6..c2cf450e94 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -1364,9 +1364,11 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) { } cqd->shutdown_called = true; if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + gpr_mu_unlock(cq->mu); cq_finish_shutdown_callback(cq); + } else { + gpr_mu_unlock(cq->mu); } - gpr_mu_unlock(cq->mu); GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)"); } diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 39b891c2e1..ad71286e05 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -42,8 +42,10 @@ #include <grpcpp/support/time.h> #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/surface/completion_queue.h" namespace grpc { @@ -53,7 +55,12 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel) g_gli_initializer.summon(); } -Channel::~Channel() { grpc_channel_destroy(c_channel_); } +Channel::~Channel() { + grpc_channel_destroy(c_channel_); + if (callback_cq_ != nullptr) { + callback_cq_->Shutdown(); + } +} namespace { @@ -135,8 +142,8 @@ void Channel::PerformOpsOnCall(internal::CallOpSetInterface* ops, size_t nops = 0; grpc_op cops[MAX_OPS]; ops->FillOps(call->call(), cops, &nops); - GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call->call(), cops, nops, ops, nullptr)); + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), cops, nops, + ops->cq_tag(), nullptr)); } void* Channel::RegisterMethod(const char* method) { @@ -185,4 +192,39 @@ bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed, return ok; } +namespace { +class ShutdownCallback : public grpc_core::CQCallbackInterface { + public: + // TakeCQ takes ownership of the cq into the shutdown callback + // so that the shutdown callback will be responsible for destroying it + void TakeCQ(CompletionQueue* cq) { cq_ = cq; } + + // The Run function will get invoked by the completion queue library + // when the shutdown is actually complete + void Run(bool) override { + delete cq_; + grpc_core::Delete(this); + } + + private: + CompletionQueue* cq_ = nullptr; +}; +} // namespace + +CompletionQueue* Channel::CallbackCQ() { + // TODO(vjpai): Consider using a single global CQ for the default CQ + // if there is no explicit per-channel CQ registered + std::lock_guard<std::mutex> l(mu_); + if (callback_cq_ == nullptr) { + auto* shutdown_callback = grpc_core::New<ShutdownCallback>(); + callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, + shutdown_callback}); + + // Transfer ownership of the new cq to its own shutdown callback + shutdown_callback->TakeCQ(callback_cq_); + } + return callback_cq_; +} + } // namespace grpc diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc index 67ef46bebe..87902b26f0 100644 --- a/src/cpp/client/generic_stub.cc +++ b/src/cpp/client/generic_stub.cc @@ -16,9 +16,11 @@ * */ -#include <grpcpp/generic/generic_stub.h> +#include <functional> +#include <grpcpp/generic/generic_stub.h> #include <grpcpp/impl/rpc_method.h> +#include <grpcpp/support/client_callback.h> namespace grpc { @@ -60,4 +62,14 @@ std::unique_ptr<GenericClientAsyncResponseReader> GenericStub::PrepareUnaryCall( context, request, false)); } +void GenericStub::experimental_type::UnaryCall( + ClientContext* context, const grpc::string& method, + const ByteBuffer* request, ByteBuffer* response, + std::function<void(Status)> on_completion) { + internal::CallbackUnaryCall( + stub_->channel_.get(), + internal::RpcMethod(method.c_str(), internal::RpcMethod::NORMAL_RPC), + context, request, response, std::move(on_completion)); +} + } // namespace grpc diff --git a/src/cpp/common/callback_common.cc b/src/cpp/common/callback_common.cc new file mode 100644 index 0000000000..ae47901f1b --- /dev/null +++ b/src/cpp/common/callback_common.cc @@ -0,0 +1,131 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <functional> + +#include <grpcpp/impl/codegen/callback_common.h> +#include <grpcpp/impl/codegen/status.h> + +#include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/surface/completion_queue.h" + +namespace grpc { +namespace internal { + +namespace { +class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface { + public: + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(CallbackWithSuccessImpl)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + + CallbackWithSuccessImpl(grpc_call* call, CallbackWithSuccessTag* parent, + std::function<void(bool)> f) + : call_(call), parent_(parent), func_(std::move(f)) { + grpc_call_ref(call); + } + + void Run(bool ok) override { + void* ignored = parent_->ops(); + bool new_ok = ok; + GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &new_ok)); + GPR_ASSERT(ignored == parent_->ops()); + func_(ok); + func_ = nullptr; // release the function + grpc_call_unref(call_); + } + + private: + grpc_call* call_; + CallbackWithSuccessTag* parent_; + std::function<void(bool)> func_; +}; + +class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface { + public: + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(CallbackWithStatusImpl)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + + CallbackWithStatusImpl(grpc_call* call, CallbackWithStatusTag* parent, + std::function<void(Status)> f) + : call_(call), parent_(parent), func_(std::move(f)), status_() { + grpc_call_ref(call); + } + + void Run(bool ok) override { + void* ignored = parent_->ops(); + + GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &ok)); + GPR_ASSERT(ignored == parent_->ops()); + + func_(status_); + func_ = nullptr; // release the function + grpc_call_unref(call_); + } + Status* status_ptr() { return &status_; } + + private: + grpc_call* call_; + CallbackWithStatusTag* parent_; + std::function<void(Status)> func_; + Status status_; +}; + +} // namespace + +CallbackWithSuccessTag::CallbackWithSuccessTag(grpc_call* call, + std::function<void(bool)> f, + CompletionQueueTag* ops) + : impl_(new (grpc_call_arena_alloc(call, sizeof(CallbackWithSuccessImpl))) + CallbackWithSuccessImpl(call, this, std::move(f))), + ops_(ops) {} + +void CallbackWithSuccessTag::force_run(bool ok) { impl_->Run(ok); } + +CallbackWithStatusTag::CallbackWithStatusTag(grpc_call* call, + std::function<void(Status)> f, + CompletionQueueTag* ops) + : ops_(ops) { + auto* impl = new (grpc_call_arena_alloc(call, sizeof(CallbackWithStatusImpl))) + CallbackWithStatusImpl(call, this, std::move(f)); + impl_ = impl; + status_ = impl->status_ptr(); +} + +void CallbackWithStatusTag::force_run(Status s) { + *status_ = std::move(s); + impl_->Run(true); +} + +} // namespace internal +} // namespace grpc diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index b8ba7042d9..36f7eb81f9 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -657,6 +657,7 @@ void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops, size_t nops = 0; grpc_op cops[MAX_OPS]; ops->FillOps(call->call(), cops, &nops); + // TODO(vjpai): Use ops->cq_tag once this case supports callbacks auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr); if (result != GRPC_CALL_OK) { gpr_log(GPR_ERROR, "Fatal: grpc_call_start_batch returned %d", result); diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index bf0c027cda..b7254b6bb9 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -61,6 +61,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { tag_ = tag; } + /// TODO(vjpai): Allow override of cq_tag if appropriate for callback API + void* cq_tag() override { return this; } + void Unref(); private: diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 75dec56a60..0415efc1ef 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -98,6 +98,25 @@ grpc_cc_binary( ], ) +grpc_cc_test( + name = "client_callback_end2end_test", + srcs = ["client_callback_end2end_test.cc"], + external_deps = [ + "gtest", + ], + deps = [ + ":test_service_impl", + "//:gpr", + "//:grpc", + "//:grpc++", + "//src/proto/grpc/testing:echo_messages_proto", + "//src/proto/grpc/testing:echo_proto", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + "//test/cpp/util:test_util", + ], +) + grpc_cc_library( name = "end2end_test_lib", testonly = True, diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc new file mode 100644 index 0000000000..75b896b33d --- /dev/null +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -0,0 +1,126 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <functional> +#include <mutex> + +#include <grpcpp/channel.h> +#include <grpcpp/client_context.h> +#include <grpcpp/create_channel.h> +#include <grpcpp/generic/generic_stub.h> +#include <grpcpp/impl/codegen/proto_utils.h> +#include <grpcpp/server.h> +#include <grpcpp/server_builder.h> +#include <grpcpp/server_context.h> +#include <grpcpp/support/client_callback.h> + +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/test_config.h" +#include "test/cpp/end2end/test_service_impl.h" +#include "test/cpp/util/byte_buffer_proto_helper.h" + +#include <gtest/gtest.h> + +namespace grpc { +namespace testing { +namespace { + +class ClientCallbackEnd2endTest : public ::testing::Test { + protected: + ClientCallbackEnd2endTest() {} + + void SetUp() override { + ServerBuilder builder; + builder.RegisterService(&service_); + + server_ = builder.BuildAndStart(); + is_server_started_ = true; + } + + void ResetStub() { + ChannelArguments args; + channel_ = server_->InProcessChannel(args); + stub_.reset(new GenericStub(channel_)); + } + + void TearDown() override { + if (is_server_started_) { + server_->Shutdown(); + } + } + + void SendRpcs(int num_rpcs) { + const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo"); + grpc::string test_string(""); + for (int i = 0; i < num_rpcs; i++) { + EchoRequest request; + std::unique_ptr<ByteBuffer> send_buf; + ByteBuffer recv_buf; + ClientContext cli_ctx; + + test_string += "Hello world. "; + request.set_message(test_string); + send_buf = SerializeToByteBuffer(&request); + + std::mutex mu; + std::condition_variable cv; + bool done = false; + stub_->experimental().UnaryCall( + &cli_ctx, kMethodName, send_buf.get(), &recv_buf, + [&request, &recv_buf, &done, &mu, &cv](Status s) { + GPR_ASSERT(s.ok()); + + EchoResponse response; + EXPECT_TRUE(ParseFromByteBuffer(&recv_buf, &response)); + EXPECT_EQ(request.message(), response.message()); + std::lock_guard<std::mutex> l(mu); + done = true; + cv.notify_one(); + }); + std::unique_lock<std::mutex> l(mu); + while (!done) { + cv.wait(l); + } + } + } + bool is_server_started_; + std::shared_ptr<Channel> channel_; + std::unique_ptr<grpc::GenericStub> stub_; + TestServiceImpl service_; + std::unique_ptr<Server> server_; +}; + +TEST_F(ClientCallbackEnd2endTest, SimpleRpc) { + ResetStub(); + SendRpcs(1); +} + +TEST_F(ClientCallbackEnd2endTest, SequentialRpcs) { + ResetStub(); + SendRpcs(10); +} + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index 9a97ee84f2..3b7fd1fa8e 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -945,7 +945,9 @@ include/grpcpp/impl/codegen/async_unary_call.h \ include/grpcpp/impl/codegen/byte_buffer.h \ include/grpcpp/impl/codegen/call.h \ include/grpcpp/impl/codegen/call_hook.h \ +include/grpcpp/impl/codegen/callback_common.h \ include/grpcpp/impl/codegen/channel_interface.h \ +include/grpcpp/impl/codegen/client_callback.h \ include/grpcpp/impl/codegen/client_context.h \ include/grpcpp/impl/codegen/client_unary_call.h \ include/grpcpp/impl/codegen/completion_queue.h \ @@ -997,6 +999,7 @@ include/grpcpp/support/async_stream.h \ include/grpcpp/support/async_unary_call.h \ include/grpcpp/support/byte_buffer.h \ include/grpcpp/support/channel_arguments.h \ +include/grpcpp/support/client_callback.h \ include/grpcpp/support/config.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 0cd4cfd647..a72390d9f8 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -946,7 +946,9 @@ include/grpcpp/impl/codegen/async_unary_call.h \ include/grpcpp/impl/codegen/byte_buffer.h \ include/grpcpp/impl/codegen/call.h \ include/grpcpp/impl/codegen/call_hook.h \ +include/grpcpp/impl/codegen/callback_common.h \ include/grpcpp/impl/codegen/channel_interface.h \ +include/grpcpp/impl/codegen/client_callback.h \ include/grpcpp/impl/codegen/client_context.h \ include/grpcpp/impl/codegen/client_unary_call.h \ include/grpcpp/impl/codegen/completion_queue.h \ @@ -999,6 +1001,7 @@ include/grpcpp/support/async_stream.h \ include/grpcpp/support/async_unary_call.h \ include/grpcpp/support/byte_buffer.h \ include/grpcpp/support/channel_arguments.h \ +include/grpcpp/support/client_callback.h \ include/grpcpp/support/config.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ @@ -1187,6 +1190,7 @@ src/cpp/client/secure_credentials.h \ src/cpp/codegen/codegen_init.cc \ src/cpp/common/alarm.cc \ src/cpp/common/auth_property_iterator.cc \ +src/cpp/common/callback_common.cc \ src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_filter.cc \ src/cpp/common/channel_filter.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 5d6113bc18..f3e93a0874 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -3348,6 +3348,25 @@ "grpc++_test_util", "grpc_test_util" ], + "headers": [], + "is_filegroup": false, + "language": "c++", + "name": "client_callback_end2end_test", + "src": [ + "test/cpp/end2end/client_callback_end2end_test.cc" + ], + "third_party": false, + "type": "target" + }, + { + "deps": [ + "gpr", + "gpr_test_util", + "grpc", + "grpc++", + "grpc++_test_util", + "grpc_test_util" + ], "headers": [ "src/proto/grpc/lb/v1/load_balancer.grpc.pb.h", "src/proto/grpc/lb/v1/load_balancer.pb.h", @@ -11098,7 +11117,9 @@ "include/grpcpp/impl/codegen/byte_buffer.h", "include/grpcpp/impl/codegen/call.h", "include/grpcpp/impl/codegen/call_hook.h", + "include/grpcpp/impl/codegen/callback_common.h", "include/grpcpp/impl/codegen/channel_interface.h", + "include/grpcpp/impl/codegen/client_callback.h", "include/grpcpp/impl/codegen/client_context.h", "include/grpcpp/impl/codegen/client_unary_call.h", "include/grpcpp/impl/codegen/completion_queue.h", @@ -11164,7 +11185,9 @@ "include/grpcpp/impl/codegen/byte_buffer.h", "include/grpcpp/impl/codegen/call.h", "include/grpcpp/impl/codegen/call_hook.h", + "include/grpcpp/impl/codegen/callback_common.h", "include/grpcpp/impl/codegen/channel_interface.h", + "include/grpcpp/impl/codegen/client_callback.h", "include/grpcpp/impl/codegen/client_context.h", "include/grpcpp/impl/codegen/client_unary_call.h", "include/grpcpp/impl/codegen/completion_queue.h", @@ -11322,6 +11345,7 @@ "include/grpcpp/support/async_unary_call.h", "include/grpcpp/support/byte_buffer.h", "include/grpcpp/support/channel_arguments.h", + "include/grpcpp/support/client_callback.h", "include/grpcpp/support/config.h", "include/grpcpp/support/proto_buffer_reader.h", "include/grpcpp/support/proto_buffer_writer.h", @@ -11426,6 +11450,7 @@ "include/grpcpp/support/async_unary_call.h", "include/grpcpp/support/byte_buffer.h", "include/grpcpp/support/channel_arguments.h", + "include/grpcpp/support/client_callback.h", "include/grpcpp/support/config.h", "include/grpcpp/support/proto_buffer_reader.h", "include/grpcpp/support/proto_buffer_writer.h", @@ -11445,6 +11470,7 @@ "src/cpp/client/credentials_cc.cc", "src/cpp/client/generic_stub.cc", "src/cpp/common/alarm.cc", + "src/cpp/common/callback_common.cc", "src/cpp/common/channel_arguments.cc", "src/cpp/common/channel_filter.cc", "src/cpp/common/channel_filter.h", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 0ecc8a120a..b3c07d9215 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -4006,6 +4006,30 @@ "posix", "windows" ], + "cpu_cost": 0.5, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "client_callback_end2end_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], |