aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2018-09-12 09:03:49 -0700
committerGravatar GitHub <noreply@github.com>2018-09-12 09:03:49 -0700
commit3bc10c0f44c64bb35c0a1ae94e7ea2d14c72e5c1 (patch)
treeb71ed5b17147d2770f1d30f985bfb1bf38776245
parent584dd0564605091a6299a624e85f80bfc2a57acf (diff)
parent0382d062486b5ba384d1288008147b6d36868485 (diff)
Merge pull request #16492 from vjpai/client_callback
EXPERIMENTAL: C++ generic client-side unary callback API
-rw-r--r--BUILD4
-rw-r--r--CMakeLists.txt57
-rw-r--r--Makefile64
-rw-r--r--build.yaml18
-rw-r--r--gRPC-C++.podspec4
-rw-r--r--grpc.gyp2
-rw-r--r--include/grpcpp/channel.h11
-rw-r--r--include/grpcpp/generic/generic_stub.h23
-rw-r--r--include/grpcpp/impl/codegen/call.h16
-rw-r--r--include/grpcpp/impl/codegen/callback_common.h103
-rw-r--r--include/grpcpp/impl/codegen/channel_interface.h14
-rw-r--r--include/grpcpp/impl/codegen/client_callback.h95
-rw-r--r--include/grpcpp/impl/codegen/client_context.h4
-rw-r--r--include/grpcpp/impl/codegen/completion_queue.h3
-rw-r--r--include/grpcpp/impl/codegen/completion_queue_tag.h23
-rw-r--r--include/grpcpp/support/client_callback.h24
-rw-r--r--src/core/lib/surface/completion_queue.cc4
-rw-r--r--src/cpp/client/channel_cc.cc48
-rw-r--r--src/cpp/client/generic_stub.cc14
-rw-r--r--src/cpp/common/callback_common.cc131
-rw-r--r--src/cpp/server/server_cc.cc1
-rw-r--r--src/cpp/server/server_context.cc3
-rw-r--r--test/cpp/end2end/BUILD19
-rw-r--r--test/cpp/end2end/client_callback_end2end_test.cc126
-rw-r--r--tools/doxygen/Doxyfile.c++3
-rw-r--r--tools/doxygen/Doxyfile.c++.internal4
-rw-r--r--tools/run_tests/generated/sources_and_headers.json26
-rw-r--r--tools/run_tests/generated/tests.json24
28 files changed, 858 insertions, 10 deletions
diff --git a/BUILD b/BUILD
index 925e277cf7..271e57e36c 100644
--- a/BUILD
+++ b/BUILD
@@ -119,6 +119,7 @@ GRPCXX_SRCS = [
"src/cpp/client/credentials_cc.cc",
"src/cpp/client/generic_stub.cc",
"src/cpp/common/alarm.cc",
+ "src/cpp/common/callback_common.cc",
"src/cpp/common/channel_arguments.cc",
"src/cpp/common/channel_filter.cc",
"src/cpp/common/completion_queue_cc.cc",
@@ -243,6 +244,7 @@ GRPCXX_PUBLIC_HDRS = [
"include/grpcpp/support/async_unary_call.h",
"include/grpcpp/support/byte_buffer.h",
"include/grpcpp/support/channel_arguments.h",
+ "include/grpcpp/support/client_callback.h",
"include/grpcpp/support/config.h",
"include/grpcpp/support/proto_buffer_reader.h",
"include/grpcpp/support/proto_buffer_writer.h",
@@ -1979,7 +1981,9 @@ grpc_cc_library(
"include/grpcpp/impl/codegen/byte_buffer.h",
"include/grpcpp/impl/codegen/call.h",
"include/grpcpp/impl/codegen/call_hook.h",
+ "include/grpcpp/impl/codegen/callback_common.h",
"include/grpcpp/impl/codegen/channel_interface.h",
+ "include/grpcpp/impl/codegen/client_callback.h",
"include/grpcpp/impl/codegen/client_context.h",
"include/grpcpp/impl/codegen/client_unary_call.h",
"include/grpcpp/impl/codegen/completion_queue.h",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 4cbc7597ab..c358e9bd43 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -565,6 +565,7 @@ add_dependencies(buildtests_cxx check_gcp_environment_linux_test)
add_dependencies(buildtests_cxx check_gcp_environment_windows_test)
add_dependencies(buildtests_cxx chttp2_settings_timeout_test)
add_dependencies(buildtests_cxx cli_call_test)
+add_dependencies(buildtests_cxx client_callback_end2end_test)
add_dependencies(buildtests_cxx client_channel_stress_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx client_crash_test)
@@ -2772,6 +2773,7 @@ add_library(grpc++
src/cpp/client/credentials_cc.cc
src/cpp/client/generic_stub.cc
src/cpp/common/alarm.cc
+ src/cpp/common/callback_common.cc
src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc
@@ -2918,6 +2920,7 @@ foreach(_hdr
include/grpcpp/support/async_unary_call.h
include/grpcpp/support/byte_buffer.h
include/grpcpp/support/channel_arguments.h
+ include/grpcpp/support/client_callback.h
include/grpcpp/support/config.h
include/grpcpp/support/proto_buffer_reader.h
include/grpcpp/support/proto_buffer_writer.h
@@ -3015,7 +3018,9 @@ foreach(_hdr
include/grpcpp/impl/codegen/byte_buffer.h
include/grpcpp/impl/codegen/call.h
include/grpcpp/impl/codegen/call_hook.h
+ include/grpcpp/impl/codegen/callback_common.h
include/grpcpp/impl/codegen/channel_interface.h
+ include/grpcpp/impl/codegen/client_callback.h
include/grpcpp/impl/codegen/client_context.h
include/grpcpp/impl/codegen/client_unary_call.h
include/grpcpp/impl/codegen/completion_queue.h
@@ -3130,6 +3135,7 @@ add_library(grpc++_cronet
src/cpp/client/credentials_cc.cc
src/cpp/client/generic_stub.cc
src/cpp/common/alarm.cc
+ src/cpp/common/callback_common.cc
src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc
@@ -3487,6 +3493,7 @@ foreach(_hdr
include/grpcpp/support/async_unary_call.h
include/grpcpp/support/byte_buffer.h
include/grpcpp/support/channel_arguments.h
+ include/grpcpp/support/client_callback.h
include/grpcpp/support/config.h
include/grpcpp/support/proto_buffer_reader.h
include/grpcpp/support/proto_buffer_writer.h
@@ -3584,7 +3591,9 @@ foreach(_hdr
include/grpcpp/impl/codegen/byte_buffer.h
include/grpcpp/impl/codegen/call.h
include/grpcpp/impl/codegen/call_hook.h
+ include/grpcpp/impl/codegen/callback_common.h
include/grpcpp/impl/codegen/channel_interface.h
+ include/grpcpp/impl/codegen/client_callback.h
include/grpcpp/impl/codegen/client_context.h
include/grpcpp/impl/codegen/client_unary_call.h
include/grpcpp/impl/codegen/completion_queue.h
@@ -3994,7 +4003,9 @@ foreach(_hdr
include/grpcpp/impl/codegen/byte_buffer.h
include/grpcpp/impl/codegen/call.h
include/grpcpp/impl/codegen/call_hook.h
+ include/grpcpp/impl/codegen/callback_common.h
include/grpcpp/impl/codegen/channel_interface.h
+ include/grpcpp/impl/codegen/client_callback.h
include/grpcpp/impl/codegen/client_context.h
include/grpcpp/impl/codegen/client_unary_call.h
include/grpcpp/impl/codegen/completion_queue.h
@@ -4172,7 +4183,9 @@ foreach(_hdr
include/grpcpp/impl/codegen/byte_buffer.h
include/grpcpp/impl/codegen/call.h
include/grpcpp/impl/codegen/call_hook.h
+ include/grpcpp/impl/codegen/callback_common.h
include/grpcpp/impl/codegen/channel_interface.h
+ include/grpcpp/impl/codegen/client_callback.h
include/grpcpp/impl/codegen/client_context.h
include/grpcpp/impl/codegen/client_unary_call.h
include/grpcpp/impl/codegen/completion_queue.h
@@ -4248,6 +4261,7 @@ add_library(grpc++_unsecure
src/cpp/client/credentials_cc.cc
src/cpp/client/generic_stub.cc
src/cpp/common/alarm.cc
+ src/cpp/common/callback_common.cc
src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc
@@ -4393,6 +4407,7 @@ foreach(_hdr
include/grpcpp/support/async_unary_call.h
include/grpcpp/support/byte_buffer.h
include/grpcpp/support/channel_arguments.h
+ include/grpcpp/support/client_callback.h
include/grpcpp/support/config.h
include/grpcpp/support/proto_buffer_reader.h
include/grpcpp/support/proto_buffer_writer.h
@@ -4490,7 +4505,9 @@ foreach(_hdr
include/grpcpp/impl/codegen/byte_buffer.h
include/grpcpp/impl/codegen/call.h
include/grpcpp/impl/codegen/call_hook.h
+ include/grpcpp/impl/codegen/callback_common.h
include/grpcpp/impl/codegen/channel_interface.h
+ include/grpcpp/impl/codegen/client_callback.h
include/grpcpp/impl/codegen/client_context.h
include/grpcpp/impl/codegen/client_unary_call.h
include/grpcpp/impl/codegen/completion_queue.h
@@ -11327,6 +11344,46 @@ target_link_libraries(cli_call_test
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
+add_executable(client_callback_end2end_test
+ test/cpp/end2end/client_callback_end2end_test.cc
+ third_party/googletest/googletest/src/gtest-all.cc
+ third_party/googletest/googlemock/src/gmock-all.cc
+)
+
+
+target_include_directories(client_callback_end2end_test
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
+ PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
+ PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
+ PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
+ PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
+ PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
+ PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
+ PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+ PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
+ PRIVATE third_party/googletest/googletest/include
+ PRIVATE third_party/googletest/googletest
+ PRIVATE third_party/googletest/googlemock/include
+ PRIVATE third_party/googletest/googlemock
+ PRIVATE ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(client_callback_end2end_test
+ ${_gRPC_PROTOBUF_LIBRARIES}
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ grpc++_test_util
+ grpc_test_util
+ grpc++
+ grpc
+ gpr_test_util
+ gpr
+ ${_gRPC_GFLAGS_LIBRARIES}
+)
+
+endif (gRPC_BUILD_TESTS)
+if (gRPC_BUILD_TESTS)
+
add_executable(client_channel_stress_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.grpc.pb.cc
diff --git a/Makefile b/Makefile
index 85de7b67c5..2f2537228c 100644
--- a/Makefile
+++ b/Makefile
@@ -1161,6 +1161,7 @@ check_gcp_environment_linux_test: $(BINDIR)/$(CONFIG)/check_gcp_environment_linu
check_gcp_environment_windows_test: $(BINDIR)/$(CONFIG)/check_gcp_environment_windows_test
chttp2_settings_timeout_test: $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test
cli_call_test: $(BINDIR)/$(CONFIG)/cli_call_test
+client_callback_end2end_test: $(BINDIR)/$(CONFIG)/client_callback_end2end_test
client_channel_stress_test: $(BINDIR)/$(CONFIG)/client_channel_stress_test
client_crash_test: $(BINDIR)/$(CONFIG)/client_crash_test
client_crash_test_server: $(BINDIR)/$(CONFIG)/client_crash_test_server
@@ -1667,6 +1668,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/check_gcp_environment_windows_test \
$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test \
$(BINDIR)/$(CONFIG)/cli_call_test \
+ $(BINDIR)/$(CONFIG)/client_callback_end2end_test \
$(BINDIR)/$(CONFIG)/client_channel_stress_test \
$(BINDIR)/$(CONFIG)/client_crash_test \
$(BINDIR)/$(CONFIG)/client_crash_test_server \
@@ -1847,6 +1849,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/check_gcp_environment_windows_test \
$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test \
$(BINDIR)/$(CONFIG)/cli_call_test \
+ $(BINDIR)/$(CONFIG)/client_callback_end2end_test \
$(BINDIR)/$(CONFIG)/client_channel_stress_test \
$(BINDIR)/$(CONFIG)/client_crash_test \
$(BINDIR)/$(CONFIG)/client_crash_test_server \
@@ -2306,6 +2309,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test || ( echo test chttp2_settings_timeout_test failed ; exit 1 )
$(E) "[RUN] Testing cli_call_test"
$(Q) $(BINDIR)/$(CONFIG)/cli_call_test || ( echo test cli_call_test failed ; exit 1 )
+ $(E) "[RUN] Testing client_callback_end2end_test"
+ $(Q) $(BINDIR)/$(CONFIG)/client_callback_end2end_test || ( echo test client_callback_end2end_test failed ; exit 1 )
$(E) "[RUN] Testing client_channel_stress_test"
$(Q) $(BINDIR)/$(CONFIG)/client_channel_stress_test || ( echo test client_channel_stress_test failed ; exit 1 )
$(E) "[RUN] Testing client_crash_test"
@@ -5223,6 +5228,7 @@ LIBGRPC++_SRC = \
src/cpp/client/credentials_cc.cc \
src/cpp/client/generic_stub.cc \
src/cpp/common/alarm.cc \
+ src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/completion_queue_cc.cc \
@@ -5333,6 +5339,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/support/async_unary_call.h \
include/grpcpp/support/byte_buffer.h \
include/grpcpp/support/channel_arguments.h \
+ include/grpcpp/support/client_callback.h \
include/grpcpp/support/config.h \
include/grpcpp/support/proto_buffer_reader.h \
include/grpcpp/support/proto_buffer_writer.h \
@@ -5430,7 +5437,9 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
+ include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
+ include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@@ -5589,6 +5598,7 @@ LIBGRPC++_CRONET_SRC = \
src/cpp/client/credentials_cc.cc \
src/cpp/client/generic_stub.cc \
src/cpp/common/alarm.cc \
+ src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/completion_queue_cc.cc \
@@ -5909,6 +5919,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/support/async_unary_call.h \
include/grpcpp/support/byte_buffer.h \
include/grpcpp/support/channel_arguments.h \
+ include/grpcpp/support/client_callback.h \
include/grpcpp/support/config.h \
include/grpcpp/support/proto_buffer_reader.h \
include/grpcpp/support/proto_buffer_writer.h \
@@ -6006,7 +6017,9 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
+ include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
+ include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@@ -6396,7 +6409,9 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
+ include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
+ include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@@ -6550,7 +6565,9 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
+ include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
+ include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@@ -6665,6 +6682,7 @@ LIBGRPC++_UNSECURE_SRC = \
src/cpp/client/credentials_cc.cc \
src/cpp/client/generic_stub.cc \
src/cpp/common/alarm.cc \
+ src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/completion_queue_cc.cc \
@@ -6775,6 +6793,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/support/async_unary_call.h \
include/grpcpp/support/byte_buffer.h \
include/grpcpp/support/channel_arguments.h \
+ include/grpcpp/support/client_callback.h \
include/grpcpp/support/config.h \
include/grpcpp/support/proto_buffer_reader.h \
include/grpcpp/support/proto_buffer_writer.h \
@@ -6872,7 +6891,9 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
+ include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
+ include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@@ -17112,6 +17133,49 @@ endif
endif
+CLIENT_CALLBACK_END2END_TEST_SRC = \
+ test/cpp/end2end/client_callback_end2end_test.cc \
+
+CLIENT_CALLBACK_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CLIENT_CALLBACK_END2END_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/client_callback_end2end_test: openssl_dep_error
+
+else
+
+
+
+
+ifeq ($(NO_PROTOBUF),true)
+
+# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
+
+$(BINDIR)/$(CONFIG)/client_callback_end2end_test: protobuf_dep_error
+
+else
+
+$(BINDIR)/$(CONFIG)/client_callback_end2end_test: $(PROTOBUF_DEP) $(CLIENT_CALLBACK_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LDXX) $(LDFLAGS) $(CLIENT_CALLBACK_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/client_callback_end2end_test
+
+endif
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/cpp/end2end/client_callback_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_client_callback_end2end_test: $(CLIENT_CALLBACK_END2END_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(CLIENT_CALLBACK_END2END_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
CLIENT_CHANNEL_STRESS_TEST_SRC = \
$(GENDIR)/src/proto/grpc/lb/v1/load_balancer.pb.cc $(GENDIR)/src/proto/grpc/lb/v1/load_balancer.grpc.pb.cc \
test/cpp/client/client_channel_stress_test.cc \
diff --git a/build.yaml b/build.yaml
index b354826128..d6e67aa7ee 100644
--- a/build.yaml
+++ b/build.yaml
@@ -1169,7 +1169,9 @@ filegroups:
- include/grpcpp/impl/codegen/byte_buffer.h
- include/grpcpp/impl/codegen/call.h
- include/grpcpp/impl/codegen/call_hook.h
+ - include/grpcpp/impl/codegen/callback_common.h
- include/grpcpp/impl/codegen/channel_interface.h
+ - include/grpcpp/impl/codegen/client_callback.h
- include/grpcpp/impl/codegen/client_context.h
- include/grpcpp/impl/codegen/client_unary_call.h
- include/grpcpp/impl/codegen/completion_queue.h
@@ -1297,6 +1299,7 @@ filegroups:
- include/grpcpp/support/async_unary_call.h
- include/grpcpp/support/byte_buffer.h
- include/grpcpp/support/channel_arguments.h
+ - include/grpcpp/support/client_callback.h
- include/grpcpp/support/config.h
- include/grpcpp/support/proto_buffer_reader.h
- include/grpcpp/support/proto_buffer_writer.h
@@ -1324,6 +1327,7 @@ filegroups:
- src/cpp/client/credentials_cc.cc
- src/cpp/client/generic_stub.cc
- src/cpp/common/alarm.cc
+ - src/cpp/common/callback_common.cc
- src/cpp/common/channel_arguments.cc
- src/cpp/common/channel_filter.cc
- src/cpp/common/completion_queue_cc.cc
@@ -4479,6 +4483,20 @@ targets:
- grpc
- gpr_test_util
- gpr
+- name: client_callback_end2end_test
+ gtest: true
+ cpu_cost: 0.5
+ build: test
+ language: c++
+ src:
+ - test/cpp/end2end/client_callback_end2end_test.cc
+ deps:
+ - grpc++_test_util
+ - grpc_test_util
+ - grpc++
+ - grpc
+ - gpr_test_util
+ - gpr
- name: client_channel_stress_test
gtest: false
build: test
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 581b9246bc..03ec223279 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -111,6 +111,7 @@ Pod::Spec.new do |s|
'include/grpcpp/support/async_unary_call.h',
'include/grpcpp/support/byte_buffer.h',
'include/grpcpp/support/channel_arguments.h',
+ 'include/grpcpp/support/client_callback.h',
'include/grpcpp/support/config.h',
'include/grpcpp/support/proto_buffer_reader.h',
'include/grpcpp/support/proto_buffer_writer.h',
@@ -127,7 +128,9 @@ Pod::Spec.new do |s|
'include/grpcpp/impl/codegen/byte_buffer.h',
'include/grpcpp/impl/codegen/call.h',
'include/grpcpp/impl/codegen/call_hook.h',
+ 'include/grpcpp/impl/codegen/callback_common.h',
'include/grpcpp/impl/codegen/channel_interface.h',
+ 'include/grpcpp/impl/codegen/client_callback.h',
'include/grpcpp/impl/codegen/client_context.h',
'include/grpcpp/impl/codegen/client_unary_call.h',
'include/grpcpp/impl/codegen/completion_queue.h',
@@ -187,6 +190,7 @@ Pod::Spec.new do |s|
'src/cpp/client/credentials_cc.cc',
'src/cpp/client/generic_stub.cc',
'src/cpp/common/alarm.cc',
+ 'src/cpp/common/callback_common.cc',
'src/cpp/common/channel_arguments.cc',
'src/cpp/common/channel_filter.cc',
'src/cpp/common/completion_queue_cc.cc',
diff --git a/grpc.gyp b/grpc.gyp
index 654a531092..b8aae44de3 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -1381,6 +1381,7 @@
'src/cpp/client/credentials_cc.cc',
'src/cpp/client/generic_stub.cc',
'src/cpp/common/alarm.cc',
+ 'src/cpp/common/callback_common.cc',
'src/cpp/common/channel_arguments.cc',
'src/cpp/common/channel_filter.cc',
'src/cpp/common/completion_queue_cc.cc',
@@ -1528,6 +1529,7 @@
'src/cpp/client/credentials_cc.cc',
'src/cpp/client/generic_stub.cc',
'src/cpp/common/alarm.cc',
+ 'src/cpp/common/callback_common.cc',
'src/cpp/common/channel_arguments.cc',
'src/cpp/common/channel_filter.cc',
'src/cpp/common/completion_queue_cc.cc',
diff --git a/include/grpcpp/channel.h b/include/grpcpp/channel.h
index fed02bf7bc..f1dba5b8ad 100644
--- a/include/grpcpp/channel.h
+++ b/include/grpcpp/channel.h
@@ -78,8 +78,19 @@ class Channel final : public ChannelInterface,
bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline) override;
+ CompletionQueue* CallbackCQ() override;
+
const grpc::string host_;
grpc_channel* const c_channel_; // owned
+
+ // mu_ protects callback_cq_ (the per-channel callbackable completion queue)
+ std::mutex mu_;
+
+ // callback_cq_ references the callbackable completion queue associated
+ // with this channel (if any). It is set on the first call to CallbackCQ().
+ // It is _not owned_ by the channel; ownership belongs with its internal
+ // shutdown callback tag (invoked when the CQ is fully shutdown).
+ CompletionQueue* callback_cq_ = nullptr;
};
} // namespace grpc
diff --git a/include/grpcpp/generic/generic_stub.h b/include/grpcpp/generic/generic_stub.h
index 92405a43fa..d509d9a520 100644
--- a/include/grpcpp/generic/generic_stub.h
+++ b/include/grpcpp/generic/generic_stub.h
@@ -19,9 +19,12 @@
#ifndef GRPCPP_GENERIC_GENERIC_STUB_H
#define GRPCPP_GENERIC_GENERIC_STUB_H
+#include <functional>
+
#include <grpcpp/support/async_stream.h>
#include <grpcpp/support/async_unary_call.h>
#include <grpcpp/support/byte_buffer.h>
+#include <grpcpp/support/status.h>
namespace grpc {
@@ -62,6 +65,26 @@ class GenericStub final {
ClientContext* context, const grpc::string& method, CompletionQueue* cq,
void* tag);
+ /// NOTE: class experimental_type is not part of the public API of this class
+ /// TODO(vjpai): Move these contents to the public API of GenericStub when
+ /// they are no longer experimental
+ class experimental_type {
+ public:
+ explicit experimental_type(GenericStub* stub) : stub_(stub) {}
+
+ void UnaryCall(ClientContext* context, const grpc::string& method,
+ const ByteBuffer* request, ByteBuffer* response,
+ std::function<void(Status)> on_completion);
+
+ private:
+ GenericStub* stub_;
+ };
+
+ /// NOTE: The function experimental() is not stable public API. It is a view
+ /// to the experimental components of this class. It may be changed or removed
+ /// at any time.
+ experimental_type experimental() { return experimental_type(this); }
+
private:
std::shared_ptr<ChannelInterface> channel_;
};
diff --git a/include/grpcpp/impl/codegen/call.h b/include/grpcpp/impl/codegen/call.h
index e94adada94..7cadea0055 100644
--- a/include/grpcpp/impl/codegen/call.h
+++ b/include/grpcpp/impl/codegen/call.h
@@ -599,6 +599,11 @@ class CallOpSetInterface : public CompletionQueueTag {
/// Fills in grpc_op, starting from ops[*nops] and moving
/// upwards.
virtual void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) = 0;
+
+ /// Get the tag to be used at the core completion queue. Generally, the
+ /// value of cq_tag will be "this". However, it can be overridden if we
+ /// want core to process the tag differently (e.g., as a core callback)
+ virtual void* cq_tag() = 0;
};
/// Primary implementation of CallOpSetInterface.
@@ -618,7 +623,7 @@ class CallOpSet : public CallOpSetInterface,
public Op5,
public Op6 {
public:
- CallOpSet() : return_tag_(this), call_(nullptr) {}
+ CallOpSet() : cq_tag_(this), return_tag_(this), call_(nullptr) {}
void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override {
this->Op1::AddOp(ops, nops);
this->Op2::AddOp(ops, nops);
@@ -645,7 +650,16 @@ class CallOpSet : public CallOpSetInterface,
void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
+ void* cq_tag() override { return cq_tag_; }
+
+ /// set_cq_tag is used to provide a different core CQ tag than "this".
+ /// This is used for callback-based tags, where the core tag is the core
+ /// callback function. It does not change the use or behavior of any other
+ /// function (such as FinalizeResult)
+ void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; }
+
private:
+ void* cq_tag_;
void* return_tag_;
grpc_call* call_;
};
diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h
new file mode 100644
index 0000000000..68c318d2b4
--- /dev/null
+++ b/include/grpcpp/impl/codegen/callback_common.h
@@ -0,0 +1,103 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPCPP_IMPL_CODEGEN_CALLBACK_COMMON_H
+#define GRPCPP_IMPL_CODEGEN_CALLBACK_COMMON_H
+
+#include <functional>
+
+#include <grpcpp/impl/codegen/call.h>
+#include <grpcpp/impl/codegen/channel_interface.h>
+#include <grpcpp/impl/codegen/config.h>
+#include <grpcpp/impl/codegen/core_codegen_interface.h>
+#include <grpcpp/impl/codegen/status.h>
+
+// Forward declarations
+namespace grpc_core {
+class CQCallbackInterface;
+};
+
+namespace grpc {
+namespace internal {
+
+class CallbackWithStatusTag {
+ public:
+ // always allocated against a call arena, no memory free required
+ static void operator delete(void* ptr, std::size_t size) {
+ assert(size == sizeof(CallbackWithStatusTag));
+ }
+
+ // This operator should never be called as the memory should be freed as part
+ // of the arena destruction. It only exists to provide a matching operator
+ // delete to the operator new so that some compilers will not complain (see
+ // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
+ // there are no tests catching the compiler warning.
+ static void operator delete(void*, void*) { assert(0); }
+
+ CallbackWithStatusTag(grpc_call* call, std::function<void(Status)> f,
+ CompletionQueueTag* ops);
+ ~CallbackWithStatusTag() {}
+ void* tag() { return static_cast<void*>(impl_); }
+ Status* status_ptr() { return status_; }
+ CompletionQueueTag* ops() { return ops_; }
+
+ // force_run can not be performed on a tag if operations using this tag
+ // have been sent to PerformOpsOnCall. It is intended for error conditions
+ // that are detected before the operations are internally processed.
+ void force_run(Status s);
+
+ private:
+ grpc_core::CQCallbackInterface* impl_;
+ Status* status_;
+ CompletionQueueTag* ops_;
+};
+
+class CallbackWithSuccessTag {
+ public:
+ // always allocated against a call arena, no memory free required
+ static void operator delete(void* ptr, std::size_t size) {
+ assert(size == sizeof(CallbackWithSuccessTag));
+ }
+
+ // This operator should never be called as the memory should be freed as part
+ // of the arena destruction. It only exists to provide a matching operator
+ // delete to the operator new so that some compilers will not complain (see
+ // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
+ // there are no tests catching the compiler warning.
+ static void operator delete(void*, void*) { assert(0); }
+
+ CallbackWithSuccessTag(grpc_call* call, std::function<void(bool)> f,
+ CompletionQueueTag* ops);
+
+ void* tag() { return static_cast<void*>(impl_); }
+ CompletionQueueTag* ops() { return ops_; }
+
+ // force_run can not be performed on a tag if operations using this tag
+ // have been sent to PerformOpsOnCall. It is intended for error conditions
+ // that are detected before the operations are internally processed.
+ void force_run(bool ok);
+
+ private:
+ grpc_core::CQCallbackInterface* impl_;
+ CompletionQueueTag* ops_;
+};
+
+} // namespace internal
+} // namespace grpc
+
+#endif // GRPCPP_IMPL_CODEGEN_CALLBACK_COMMON_H
diff --git a/include/grpcpp/impl/codegen/channel_interface.h b/include/grpcpp/impl/codegen/channel_interface.h
index ec1c6c25d8..b257acc1ab 100644
--- a/include/grpcpp/impl/codegen/channel_interface.h
+++ b/include/grpcpp/impl/codegen/channel_interface.h
@@ -41,6 +41,8 @@ class CallOpSetInterface;
class RpcMethod;
template <class InputMessage, class OutputMessage>
class BlockingUnaryCallImpl;
+template <class InputMessage, class OutputMessage>
+class CallbackUnaryCallImpl;
template <class R>
class ClientAsyncReaderFactory;
template <class W>
@@ -103,6 +105,8 @@ class ChannelInterface {
friend class ::grpc::internal::ClientAsyncResponseReaderFactory;
template <class InputMessage, class OutputMessage>
friend class ::grpc::internal::BlockingUnaryCallImpl;
+ template <class InputMessage, class OutputMessage>
+ friend class ::grpc::internal::CallbackUnaryCallImpl;
friend class ::grpc::internal::RpcMethod;
virtual internal::Call CreateCall(const internal::RpcMethod& method,
ClientContext* context,
@@ -115,6 +119,16 @@ class ChannelInterface {
CompletionQueue* cq, void* tag) = 0;
virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline) = 0;
+
+ // EXPERIMENTAL
+ // A method to get the callbackable completion queue associated with this
+ // channel. If the return value is nullptr, this channel doesn't support
+ // callback operations.
+ // TODO(vjpai): Consider a better default like using a global CQ
+ // Returns nullptr (rather than being pure) since this is a new method
+ // and adding a new pure method to an interface would be a breaking change
+ // (even though this is private and non-API)
+ virtual CompletionQueue* CallbackCQ() { return nullptr; }
};
} // namespace grpc
diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h
new file mode 100644
index 0000000000..fc81c8aa0a
--- /dev/null
+++ b/include/grpcpp/impl/codegen/client_callback.h
@@ -0,0 +1,95 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
+#define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
+
+#include <functional>
+
+#include <grpcpp/impl/codegen/call.h>
+#include <grpcpp/impl/codegen/callback_common.h>
+#include <grpcpp/impl/codegen/channel_interface.h>
+#include <grpcpp/impl/codegen/config.h>
+#include <grpcpp/impl/codegen/core_codegen_interface.h>
+#include <grpcpp/impl/codegen/status.h>
+
+namespace grpc {
+
+class Channel;
+class ClientContext;
+class CompletionQueue;
+
+namespace internal {
+class RpcMethod;
+
+/// Perform a callback-based unary call
+/// TODO(vjpai): Combine as much as possible with the blocking unary call code
+template <class InputMessage, class OutputMessage>
+void CallbackUnaryCall(ChannelInterface* channel, const RpcMethod& method,
+ ClientContext* context, const InputMessage* request,
+ OutputMessage* result,
+ std::function<void(Status)> on_completion) {
+ CallbackUnaryCallImpl<InputMessage, OutputMessage> x(
+ channel, method, context, request, result, on_completion);
+}
+
+template <class InputMessage, class OutputMessage>
+class CallbackUnaryCallImpl {
+ public:
+ CallbackUnaryCallImpl(ChannelInterface* channel, const RpcMethod& method,
+ ClientContext* context, const InputMessage* request,
+ OutputMessage* result,
+ std::function<void(Status)> on_completion) {
+ CompletionQueue* cq = channel->CallbackCQ();
+ GPR_CODEGEN_ASSERT(cq != nullptr);
+ Call call(channel->CreateCall(method, context, cq));
+
+ using FullCallOpSet =
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>,
+ CallOpClientSendClose, CallOpClientRecvStatus>;
+
+ auto* ops = new (g_core_codegen_interface->grpc_call_arena_alloc(
+ call.call(), sizeof(FullCallOpSet))) FullCallOpSet;
+
+ auto* tag = new (g_core_codegen_interface->grpc_call_arena_alloc(
+ call.call(), sizeof(CallbackWithStatusTag)))
+ CallbackWithStatusTag(call.call(), on_completion, ops);
+
+ // TODO(vjpai): Unify code with sync API as much as possible
+ Status s = ops->SendMessage(*request);
+ if (!s.ok()) {
+ tag->force_run(s);
+ return;
+ }
+ ops->SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ ops->RecvInitialMetadata(context);
+ ops->RecvMessage(result);
+ ops->AllowNoMessage();
+ ops->ClientSendClose();
+ ops->ClientRecvStatus(context, tag->status_ptr());
+ ops->set_cq_tag(tag->tag());
+ call.PerformOps(ops);
+ }
+};
+
+} // namespace internal
+} // namespace grpc
+
+#endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
diff --git a/include/grpcpp/impl/codegen/client_context.h b/include/grpcpp/impl/codegen/client_context.h
index c6c9540950..46635a541a 100644
--- a/include/grpcpp/impl/codegen/client_context.h
+++ b/include/grpcpp/impl/codegen/client_context.h
@@ -68,6 +68,8 @@ class CallOpClientRecvStatus;
class CallOpRecvInitialMetadata;
template <class InputMessage, class OutputMessage>
class BlockingUnaryCallImpl;
+template <class InputMessage, class OutputMessage>
+class CallbackUnaryCallImpl;
} // namespace internal
template <class R>
@@ -389,6 +391,8 @@ class ClientContext {
friend class ::grpc::ClientAsyncResponseReader;
template <class InputMessage, class OutputMessage>
friend class ::grpc::internal::BlockingUnaryCallImpl;
+ template <class InputMessage, class OutputMessage>
+ friend class ::grpc::internal::CallbackUnaryCallImpl;
// Used by friend class CallOpClientRecvStatus
void set_debug_error_string(const grpc::string& debug_error_string) {
diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h
index 3f7d4fb765..f52f9a53be 100644
--- a/include/grpcpp/impl/codegen/completion_queue.h
+++ b/include/grpcpp/impl/codegen/completion_queue.h
@@ -274,6 +274,9 @@ class CompletionQueue : private GrpcLibraryCodegen {
template <class InputMessage, class OutputMessage>
friend class ::grpc::internal::BlockingUnaryCallImpl;
+ // Friends that need access to constructor for callback CQ
+ friend class ::grpc::Channel;
+
/// EXPERIMENTAL
/// Creates a Thread Local cache to store the first event
/// On this completion queue queued from this thread. Once
diff --git a/include/grpcpp/impl/codegen/completion_queue_tag.h b/include/grpcpp/impl/codegen/completion_queue_tag.h
index ffb642c56b..304386a9ec 100644
--- a/include/grpcpp/impl/codegen/completion_queue_tag.h
+++ b/include/grpcpp/impl/codegen/completion_queue_tag.h
@@ -26,10 +26,25 @@ namespace internal {
class CompletionQueueTag {
public:
virtual ~CompletionQueueTag() {}
- /// Called prior to returning from Next(), return value is the status of the
- /// operation (return status is the default thing to do). If this function
- /// returns false, the tag is dropped and not returned from the completion
- /// queue
+
+ /// FinalizeResult must be called before informing user code that the
+ /// operation bound to the underlying core completion queue tag has
+ /// completed. In practice, this means:
+ ///
+ /// 1. For the sync API - before returning from Pluck
+ /// 2. For the CQ-based async API - before returning from Next
+ /// 3. For the callback-based API - before invoking the user callback
+ ///
+ /// This is the method that translates from core-side tag/status to
+ /// C++ API-observable tag/status.
+ ///
+ /// The return value is the status of the operation (returning status is the
+ /// general behavior of this function). If this function returns false, the
+ /// tag is dropped and not returned from the completion queue: this concept is
+ /// for events that are observed at core but not requested by the user
+ /// application (e.g., server shutdown, for server unimplemented method
+ /// responses, or for cases where a server-side RPC doesn't have a completion
+ /// notification registered using AsyncNotifyWhenDone)
virtual bool FinalizeResult(void** tag, bool* status) = 0;
};
} // namespace internal
diff --git a/include/grpcpp/support/client_callback.h b/include/grpcpp/support/client_callback.h
new file mode 100644
index 0000000000..063fdc4f85
--- /dev/null
+++ b/include/grpcpp/support/client_callback.h
@@ -0,0 +1,24 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPCPP_SUPPORT_CLIENT_CALLBACK_H
+#define GRPCPP_SUPPORT_CLIENT_CALLBACK_H
+
+#include <grpcpp/impl/codegen/client_callback.h>
+
+#endif // GRPCPP_SUPPORT_CLIENT_CALLBACK_H
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index 0769d9e4f6..c2cf450e94 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -1364,9 +1364,11 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) {
}
cqd->shutdown_called = true;
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ gpr_mu_unlock(cq->mu);
cq_finish_shutdown_callback(cq);
+ } else {
+ gpr_mu_unlock(cq->mu);
}
- gpr_mu_unlock(cq->mu);
GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
}
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index 39b891c2e1..ad71286e05 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -42,8 +42,10 @@
#include <grpcpp/support/time.h>
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/surface/completion_queue.h"
namespace grpc {
@@ -53,7 +55,12 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel)
g_gli_initializer.summon();
}
-Channel::~Channel() { grpc_channel_destroy(c_channel_); }
+Channel::~Channel() {
+ grpc_channel_destroy(c_channel_);
+ if (callback_cq_ != nullptr) {
+ callback_cq_->Shutdown();
+ }
+}
namespace {
@@ -135,8 +142,8 @@ void Channel::PerformOpsOnCall(internal::CallOpSetInterface* ops,
size_t nops = 0;
grpc_op cops[MAX_OPS];
ops->FillOps(call->call(), cops, &nops);
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), cops, nops, ops, nullptr));
+ GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), cops, nops,
+ ops->cq_tag(), nullptr));
}
void* Channel::RegisterMethod(const char* method) {
@@ -185,4 +192,39 @@ bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed,
return ok;
}
+namespace {
+class ShutdownCallback : public grpc_core::CQCallbackInterface {
+ public:
+ // TakeCQ takes ownership of the cq into the shutdown callback
+ // so that the shutdown callback will be responsible for destroying it
+ void TakeCQ(CompletionQueue* cq) { cq_ = cq; }
+
+ // The Run function will get invoked by the completion queue library
+ // when the shutdown is actually complete
+ void Run(bool) override {
+ delete cq_;
+ grpc_core::Delete(this);
+ }
+
+ private:
+ CompletionQueue* cq_ = nullptr;
+};
+} // namespace
+
+CompletionQueue* Channel::CallbackCQ() {
+ // TODO(vjpai): Consider using a single global CQ for the default CQ
+ // if there is no explicit per-channel CQ registered
+ std::lock_guard<std::mutex> l(mu_);
+ if (callback_cq_ == nullptr) {
+ auto* shutdown_callback = grpc_core::New<ShutdownCallback>();
+ callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
+ shutdown_callback});
+
+ // Transfer ownership of the new cq to its own shutdown callback
+ shutdown_callback->TakeCQ(callback_cq_);
+ }
+ return callback_cq_;
+}
+
} // namespace grpc
diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc
index 67ef46bebe..87902b26f0 100644
--- a/src/cpp/client/generic_stub.cc
+++ b/src/cpp/client/generic_stub.cc
@@ -16,9 +16,11 @@
*
*/
-#include <grpcpp/generic/generic_stub.h>
+#include <functional>
+#include <grpcpp/generic/generic_stub.h>
#include <grpcpp/impl/rpc_method.h>
+#include <grpcpp/support/client_callback.h>
namespace grpc {
@@ -60,4 +62,14 @@ std::unique_ptr<GenericClientAsyncResponseReader> GenericStub::PrepareUnaryCall(
context, request, false));
}
+void GenericStub::experimental_type::UnaryCall(
+ ClientContext* context, const grpc::string& method,
+ const ByteBuffer* request, ByteBuffer* response,
+ std::function<void(Status)> on_completion) {
+ internal::CallbackUnaryCall(
+ stub_->channel_.get(),
+ internal::RpcMethod(method.c_str(), internal::RpcMethod::NORMAL_RPC),
+ context, request, response, std::move(on_completion));
+}
+
} // namespace grpc
diff --git a/src/cpp/common/callback_common.cc b/src/cpp/common/callback_common.cc
new file mode 100644
index 0000000000..ae47901f1b
--- /dev/null
+++ b/src/cpp/common/callback_common.cc
@@ -0,0 +1,131 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <functional>
+
+#include <grpcpp/impl/codegen/callback_common.h>
+#include <grpcpp/impl/codegen/status.h>
+
+#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/surface/completion_queue.h"
+
+namespace grpc {
+namespace internal {
+
+namespace {
+class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface {
+ public:
+ static void operator delete(void* ptr, std::size_t size) {
+ assert(size == sizeof(CallbackWithSuccessImpl));
+ }
+
+ // This operator should never be called as the memory should be freed as part
+ // of the arena destruction. It only exists to provide a matching operator
+ // delete to the operator new so that some compilers will not complain (see
+ // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
+ // there are no tests catching the compiler warning.
+ static void operator delete(void*, void*) { assert(0); }
+
+ CallbackWithSuccessImpl(grpc_call* call, CallbackWithSuccessTag* parent,
+ std::function<void(bool)> f)
+ : call_(call), parent_(parent), func_(std::move(f)) {
+ grpc_call_ref(call);
+ }
+
+ void Run(bool ok) override {
+ void* ignored = parent_->ops();
+ bool new_ok = ok;
+ GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &new_ok));
+ GPR_ASSERT(ignored == parent_->ops());
+ func_(ok);
+ func_ = nullptr; // release the function
+ grpc_call_unref(call_);
+ }
+
+ private:
+ grpc_call* call_;
+ CallbackWithSuccessTag* parent_;
+ std::function<void(bool)> func_;
+};
+
+class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface {
+ public:
+ static void operator delete(void* ptr, std::size_t size) {
+ assert(size == sizeof(CallbackWithStatusImpl));
+ }
+
+ // This operator should never be called as the memory should be freed as part
+ // of the arena destruction. It only exists to provide a matching operator
+ // delete to the operator new so that some compilers will not complain (see
+ // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
+ // there are no tests catching the compiler warning.
+ static void operator delete(void*, void*) { assert(0); }
+
+ CallbackWithStatusImpl(grpc_call* call, CallbackWithStatusTag* parent,
+ std::function<void(Status)> f)
+ : call_(call), parent_(parent), func_(std::move(f)), status_() {
+ grpc_call_ref(call);
+ }
+
+ void Run(bool ok) override {
+ void* ignored = parent_->ops();
+
+ GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &ok));
+ GPR_ASSERT(ignored == parent_->ops());
+
+ func_(status_);
+ func_ = nullptr; // release the function
+ grpc_call_unref(call_);
+ }
+ Status* status_ptr() { return &status_; }
+
+ private:
+ grpc_call* call_;
+ CallbackWithStatusTag* parent_;
+ std::function<void(Status)> func_;
+ Status status_;
+};
+
+} // namespace
+
+CallbackWithSuccessTag::CallbackWithSuccessTag(grpc_call* call,
+ std::function<void(bool)> f,
+ CompletionQueueTag* ops)
+ : impl_(new (grpc_call_arena_alloc(call, sizeof(CallbackWithSuccessImpl)))
+ CallbackWithSuccessImpl(call, this, std::move(f))),
+ ops_(ops) {}
+
+void CallbackWithSuccessTag::force_run(bool ok) { impl_->Run(ok); }
+
+CallbackWithStatusTag::CallbackWithStatusTag(grpc_call* call,
+ std::function<void(Status)> f,
+ CompletionQueueTag* ops)
+ : ops_(ops) {
+ auto* impl = new (grpc_call_arena_alloc(call, sizeof(CallbackWithStatusImpl)))
+ CallbackWithStatusImpl(call, this, std::move(f));
+ impl_ = impl;
+ status_ = impl->status_ptr();
+}
+
+void CallbackWithStatusTag::force_run(Status s) {
+ *status_ = std::move(s);
+ impl_->Run(true);
+}
+
+} // namespace internal
+} // namespace grpc
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index b8ba7042d9..36f7eb81f9 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -657,6 +657,7 @@ void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops,
size_t nops = 0;
grpc_op cops[MAX_OPS];
ops->FillOps(call->call(), cops, &nops);
+ // TODO(vjpai): Use ops->cq_tag once this case supports callbacks
auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
if (result != GRPC_CALL_OK) {
gpr_log(GPR_ERROR, "Fatal: grpc_call_start_batch returned %d", result);
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index bf0c027cda..b7254b6bb9 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -61,6 +61,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
tag_ = tag;
}
+ /// TODO(vjpai): Allow override of cq_tag if appropriate for callback API
+ void* cq_tag() override { return this; }
+
void Unref();
private:
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD
index 75dec56a60..0415efc1ef 100644
--- a/test/cpp/end2end/BUILD
+++ b/test/cpp/end2end/BUILD
@@ -98,6 +98,25 @@ grpc_cc_binary(
],
)
+grpc_cc_test(
+ name = "client_callback_end2end_test",
+ srcs = ["client_callback_end2end_test.cc"],
+ external_deps = [
+ "gtest",
+ ],
+ deps = [
+ ":test_service_impl",
+ "//:gpr",
+ "//:grpc",
+ "//:grpc++",
+ "//src/proto/grpc/testing:echo_messages_proto",
+ "//src/proto/grpc/testing:echo_proto",
+ "//test/core/util:gpr_test_util",
+ "//test/core/util:grpc_test_util",
+ "//test/cpp/util:test_util",
+ ],
+)
+
grpc_cc_library(
name = "end2end_test_lib",
testonly = True,
diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc
new file mode 100644
index 0000000000..75b896b33d
--- /dev/null
+++ b/test/cpp/end2end/client_callback_end2end_test.cc
@@ -0,0 +1,126 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <functional>
+#include <mutex>
+
+#include <grpcpp/channel.h>
+#include <grpcpp/client_context.h>
+#include <grpcpp/create_channel.h>
+#include <grpcpp/generic/generic_stub.h>
+#include <grpcpp/impl/codegen/proto_utils.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
+#include <grpcpp/server_context.h>
+#include <grpcpp/support/client_callback.h>
+
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/core/util/test_config.h"
+#include "test/cpp/end2end/test_service_impl.h"
+#include "test/cpp/util/byte_buffer_proto_helper.h"
+
+#include <gtest/gtest.h>
+
+namespace grpc {
+namespace testing {
+namespace {
+
+class ClientCallbackEnd2endTest : public ::testing::Test {
+ protected:
+ ClientCallbackEnd2endTest() {}
+
+ void SetUp() override {
+ ServerBuilder builder;
+ builder.RegisterService(&service_);
+
+ server_ = builder.BuildAndStart();
+ is_server_started_ = true;
+ }
+
+ void ResetStub() {
+ ChannelArguments args;
+ channel_ = server_->InProcessChannel(args);
+ stub_.reset(new GenericStub(channel_));
+ }
+
+ void TearDown() override {
+ if (is_server_started_) {
+ server_->Shutdown();
+ }
+ }
+
+ void SendRpcs(int num_rpcs) {
+ const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo");
+ grpc::string test_string("");
+ for (int i = 0; i < num_rpcs; i++) {
+ EchoRequest request;
+ std::unique_ptr<ByteBuffer> send_buf;
+ ByteBuffer recv_buf;
+ ClientContext cli_ctx;
+
+ test_string += "Hello world. ";
+ request.set_message(test_string);
+ send_buf = SerializeToByteBuffer(&request);
+
+ std::mutex mu;
+ std::condition_variable cv;
+ bool done = false;
+ stub_->experimental().UnaryCall(
+ &cli_ctx, kMethodName, send_buf.get(), &recv_buf,
+ [&request, &recv_buf, &done, &mu, &cv](Status s) {
+ GPR_ASSERT(s.ok());
+
+ EchoResponse response;
+ EXPECT_TRUE(ParseFromByteBuffer(&recv_buf, &response));
+ EXPECT_EQ(request.message(), response.message());
+ std::lock_guard<std::mutex> l(mu);
+ done = true;
+ cv.notify_one();
+ });
+ std::unique_lock<std::mutex> l(mu);
+ while (!done) {
+ cv.wait(l);
+ }
+ }
+ }
+ bool is_server_started_;
+ std::shared_ptr<Channel> channel_;
+ std::unique_ptr<grpc::GenericStub> stub_;
+ TestServiceImpl service_;
+ std::unique_ptr<Server> server_;
+};
+
+TEST_F(ClientCallbackEnd2endTest, SimpleRpc) {
+ ResetStub();
+ SendRpcs(1);
+}
+
+TEST_F(ClientCallbackEnd2endTest, SequentialRpcs) {
+ ResetStub();
+ SendRpcs(10);
+}
+
+} // namespace
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ grpc_test_init(argc, argv);
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++
index 9a97ee84f2..3b7fd1fa8e 100644
--- a/tools/doxygen/Doxyfile.c++
+++ b/tools/doxygen/Doxyfile.c++
@@ -945,7 +945,9 @@ include/grpcpp/impl/codegen/async_unary_call.h \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
+include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
+include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@@ -997,6 +999,7 @@ include/grpcpp/support/async_stream.h \
include/grpcpp/support/async_unary_call.h \
include/grpcpp/support/byte_buffer.h \
include/grpcpp/support/channel_arguments.h \
+include/grpcpp/support/client_callback.h \
include/grpcpp/support/config.h \
include/grpcpp/support/proto_buffer_reader.h \
include/grpcpp/support/proto_buffer_writer.h \
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index 0cd4cfd647..a72390d9f8 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -946,7 +946,9 @@ include/grpcpp/impl/codegen/async_unary_call.h \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
+include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
+include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@@ -999,6 +1001,7 @@ include/grpcpp/support/async_stream.h \
include/grpcpp/support/async_unary_call.h \
include/grpcpp/support/byte_buffer.h \
include/grpcpp/support/channel_arguments.h \
+include/grpcpp/support/client_callback.h \
include/grpcpp/support/config.h \
include/grpcpp/support/proto_buffer_reader.h \
include/grpcpp/support/proto_buffer_writer.h \
@@ -1187,6 +1190,7 @@ src/cpp/client/secure_credentials.h \
src/cpp/codegen/codegen_init.cc \
src/cpp/common/alarm.cc \
src/cpp/common/auth_property_iterator.cc \
+src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/channel_filter.h \
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 5d6113bc18..f3e93a0874 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -3348,6 +3348,25 @@
"grpc++_test_util",
"grpc_test_util"
],
+ "headers": [],
+ "is_filegroup": false,
+ "language": "c++",
+ "name": "client_callback_end2end_test",
+ "src": [
+ "test/cpp/end2end/client_callback_end2end_test.cc"
+ ],
+ "third_party": false,
+ "type": "target"
+ },
+ {
+ "deps": [
+ "gpr",
+ "gpr_test_util",
+ "grpc",
+ "grpc++",
+ "grpc++_test_util",
+ "grpc_test_util"
+ ],
"headers": [
"src/proto/grpc/lb/v1/load_balancer.grpc.pb.h",
"src/proto/grpc/lb/v1/load_balancer.pb.h",
@@ -11098,7 +11117,9 @@
"include/grpcpp/impl/codegen/byte_buffer.h",
"include/grpcpp/impl/codegen/call.h",
"include/grpcpp/impl/codegen/call_hook.h",
+ "include/grpcpp/impl/codegen/callback_common.h",
"include/grpcpp/impl/codegen/channel_interface.h",
+ "include/grpcpp/impl/codegen/client_callback.h",
"include/grpcpp/impl/codegen/client_context.h",
"include/grpcpp/impl/codegen/client_unary_call.h",
"include/grpcpp/impl/codegen/completion_queue.h",
@@ -11164,7 +11185,9 @@
"include/grpcpp/impl/codegen/byte_buffer.h",
"include/grpcpp/impl/codegen/call.h",
"include/grpcpp/impl/codegen/call_hook.h",
+ "include/grpcpp/impl/codegen/callback_common.h",
"include/grpcpp/impl/codegen/channel_interface.h",
+ "include/grpcpp/impl/codegen/client_callback.h",
"include/grpcpp/impl/codegen/client_context.h",
"include/grpcpp/impl/codegen/client_unary_call.h",
"include/grpcpp/impl/codegen/completion_queue.h",
@@ -11322,6 +11345,7 @@
"include/grpcpp/support/async_unary_call.h",
"include/grpcpp/support/byte_buffer.h",
"include/grpcpp/support/channel_arguments.h",
+ "include/grpcpp/support/client_callback.h",
"include/grpcpp/support/config.h",
"include/grpcpp/support/proto_buffer_reader.h",
"include/grpcpp/support/proto_buffer_writer.h",
@@ -11426,6 +11450,7 @@
"include/grpcpp/support/async_unary_call.h",
"include/grpcpp/support/byte_buffer.h",
"include/grpcpp/support/channel_arguments.h",
+ "include/grpcpp/support/client_callback.h",
"include/grpcpp/support/config.h",
"include/grpcpp/support/proto_buffer_reader.h",
"include/grpcpp/support/proto_buffer_writer.h",
@@ -11445,6 +11470,7 @@
"src/cpp/client/credentials_cc.cc",
"src/cpp/client/generic_stub.cc",
"src/cpp/common/alarm.cc",
+ "src/cpp/common/callback_common.cc",
"src/cpp/common/channel_arguments.cc",
"src/cpp/common/channel_filter.cc",
"src/cpp/common/channel_filter.h",
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 0ecc8a120a..b3c07d9215 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -4006,6 +4006,30 @@
"posix",
"windows"
],
+ "cpu_cost": 0.5,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
+ "flaky": false,
+ "gtest": true,
+ "language": "c++",
+ "name": "client_callback_end2end_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "uses_polling": true
+ },
+ {
+ "args": [],
+ "benchmark": false,
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],