aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--BUILD4
-rw-r--r--CMakeLists.txt57
-rw-r--r--Makefile64
-rw-r--r--build.yaml18
-rw-r--r--gRPC-C++.podspec4
-rw-r--r--grpc.gyp2
-rw-r--r--include/grpcpp/channel.h5
-rw-r--r--include/grpcpp/generic/generic_stub.h23
-rw-r--r--include/grpcpp/impl/codegen/call.h10
-rw-r--r--include/grpcpp/impl/codegen/callback_common.h79
-rw-r--r--include/grpcpp/impl/codegen/channel_interface.h14
-rw-r--r--include/grpcpp/impl/codegen/client_callback.h92
-rw-r--r--include/grpcpp/impl/codegen/client_context.h4
-rw-r--r--include/grpcpp/impl/codegen/completion_queue.h3
-rw-r--r--include/grpcpp/support/client_callback.h24
-rw-r--r--src/core/lib/surface/completion_queue.cc4
-rw-r--r--src/cpp/client/channel_cc.cc41
-rw-r--r--src/cpp/client/generic_stub.cc14
-rw-r--r--src/cpp/common/callback_common.cc109
-rw-r--r--src/cpp/server/server_cc.cc3
-rw-r--r--src/cpp/server/server_context.cc3
-rw-r--r--test/cpp/end2end/BUILD19
-rw-r--r--test/cpp/end2end/client_callback_end2end_test.cc126
-rw-r--r--tools/doxygen/Doxyfile.c++3
-rw-r--r--tools/doxygen/Doxyfile.c++.internal4
-rw-r--r--tools/run_tests/generated/sources_and_headers.json26
-rw-r--r--tools/run_tests/generated/tests.json24
27 files changed, 772 insertions, 7 deletions
diff --git a/BUILD b/BUILD
index 925e277cf7..271e57e36c 100644
--- a/BUILD
+++ b/BUILD
@@ -119,6 +119,7 @@ GRPCXX_SRCS = [
"src/cpp/client/credentials_cc.cc",
"src/cpp/client/generic_stub.cc",
"src/cpp/common/alarm.cc",
+ "src/cpp/common/callback_common.cc",
"src/cpp/common/channel_arguments.cc",
"src/cpp/common/channel_filter.cc",
"src/cpp/common/completion_queue_cc.cc",
@@ -243,6 +244,7 @@ GRPCXX_PUBLIC_HDRS = [
"include/grpcpp/support/async_unary_call.h",
"include/grpcpp/support/byte_buffer.h",
"include/grpcpp/support/channel_arguments.h",
+ "include/grpcpp/support/client_callback.h",
"include/grpcpp/support/config.h",
"include/grpcpp/support/proto_buffer_reader.h",
"include/grpcpp/support/proto_buffer_writer.h",
@@ -1979,7 +1981,9 @@ grpc_cc_library(
"include/grpcpp/impl/codegen/byte_buffer.h",
"include/grpcpp/impl/codegen/call.h",
"include/grpcpp/impl/codegen/call_hook.h",
+ "include/grpcpp/impl/codegen/callback_common.h",
"include/grpcpp/impl/codegen/channel_interface.h",
+ "include/grpcpp/impl/codegen/client_callback.h",
"include/grpcpp/impl/codegen/client_context.h",
"include/grpcpp/impl/codegen/client_unary_call.h",
"include/grpcpp/impl/codegen/completion_queue.h",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a21bb8b5fa..8679c5b4e2 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -564,6 +564,7 @@ add_dependencies(buildtests_cxx check_gcp_environment_linux_test)
add_dependencies(buildtests_cxx check_gcp_environment_windows_test)
add_dependencies(buildtests_cxx chttp2_settings_timeout_test)
add_dependencies(buildtests_cxx cli_call_test)
+add_dependencies(buildtests_cxx client_callback_end2end_test)
add_dependencies(buildtests_cxx client_channel_stress_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx client_crash_test)
@@ -2771,6 +2772,7 @@ add_library(grpc++
src/cpp/client/credentials_cc.cc
src/cpp/client/generic_stub.cc
src/cpp/common/alarm.cc
+ src/cpp/common/callback_common.cc
src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc
@@ -2917,6 +2919,7 @@ foreach(_hdr
include/grpcpp/support/async_unary_call.h
include/grpcpp/support/byte_buffer.h
include/grpcpp/support/channel_arguments.h
+ include/grpcpp/support/client_callback.h
include/grpcpp/support/config.h
include/grpcpp/support/proto_buffer_reader.h
include/grpcpp/support/proto_buffer_writer.h
@@ -3014,7 +3017,9 @@ foreach(_hdr
include/grpcpp/impl/codegen/byte_buffer.h
include/grpcpp/impl/codegen/call.h
include/grpcpp/impl/codegen/call_hook.h
+ include/grpcpp/impl/codegen/callback_common.h
include/grpcpp/impl/codegen/channel_interface.h
+ include/grpcpp/impl/codegen/client_callback.h
include/grpcpp/impl/codegen/client_context.h
include/grpcpp/impl/codegen/client_unary_call.h
include/grpcpp/impl/codegen/completion_queue.h
@@ -3129,6 +3134,7 @@ add_library(grpc++_cronet
src/cpp/client/credentials_cc.cc
src/cpp/client/generic_stub.cc
src/cpp/common/alarm.cc
+ src/cpp/common/callback_common.cc
src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc
@@ -3486,6 +3492,7 @@ foreach(_hdr
include/grpcpp/support/async_unary_call.h
include/grpcpp/support/byte_buffer.h
include/grpcpp/support/channel_arguments.h
+ include/grpcpp/support/client_callback.h
include/grpcpp/support/config.h
include/grpcpp/support/proto_buffer_reader.h
include/grpcpp/support/proto_buffer_writer.h
@@ -3583,7 +3590,9 @@ foreach(_hdr
include/grpcpp/impl/codegen/byte_buffer.h
include/grpcpp/impl/codegen/call.h
include/grpcpp/impl/codegen/call_hook.h
+ include/grpcpp/impl/codegen/callback_common.h
include/grpcpp/impl/codegen/channel_interface.h
+ include/grpcpp/impl/codegen/client_callback.h
include/grpcpp/impl/codegen/client_context.h
include/grpcpp/impl/codegen/client_unary_call.h
include/grpcpp/impl/codegen/completion_queue.h
@@ -3993,7 +4002,9 @@ foreach(_hdr
include/grpcpp/impl/codegen/byte_buffer.h
include/grpcpp/impl/codegen/call.h
include/grpcpp/impl/codegen/call_hook.h
+ include/grpcpp/impl/codegen/callback_common.h
include/grpcpp/impl/codegen/channel_interface.h
+ include/grpcpp/impl/codegen/client_callback.h
include/grpcpp/impl/codegen/client_context.h
include/grpcpp/impl/codegen/client_unary_call.h
include/grpcpp/impl/codegen/completion_queue.h
@@ -4171,7 +4182,9 @@ foreach(_hdr
include/grpcpp/impl/codegen/byte_buffer.h
include/grpcpp/impl/codegen/call.h
include/grpcpp/impl/codegen/call_hook.h
+ include/grpcpp/impl/codegen/callback_common.h
include/grpcpp/impl/codegen/channel_interface.h
+ include/grpcpp/impl/codegen/client_callback.h
include/grpcpp/impl/codegen/client_context.h
include/grpcpp/impl/codegen/client_unary_call.h
include/grpcpp/impl/codegen/completion_queue.h
@@ -4247,6 +4260,7 @@ add_library(grpc++_unsecure
src/cpp/client/credentials_cc.cc
src/cpp/client/generic_stub.cc
src/cpp/common/alarm.cc
+ src/cpp/common/callback_common.cc
src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc
@@ -4392,6 +4406,7 @@ foreach(_hdr
include/grpcpp/support/async_unary_call.h
include/grpcpp/support/byte_buffer.h
include/grpcpp/support/channel_arguments.h
+ include/grpcpp/support/client_callback.h
include/grpcpp/support/config.h
include/grpcpp/support/proto_buffer_reader.h
include/grpcpp/support/proto_buffer_writer.h
@@ -4489,7 +4504,9 @@ foreach(_hdr
include/grpcpp/impl/codegen/byte_buffer.h
include/grpcpp/impl/codegen/call.h
include/grpcpp/impl/codegen/call_hook.h
+ include/grpcpp/impl/codegen/callback_common.h
include/grpcpp/impl/codegen/channel_interface.h
+ include/grpcpp/impl/codegen/client_callback.h
include/grpcpp/impl/codegen/client_context.h
include/grpcpp/impl/codegen/client_unary_call.h
include/grpcpp/impl/codegen/completion_queue.h
@@ -11297,6 +11314,46 @@ target_link_libraries(cli_call_test
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
+add_executable(client_callback_end2end_test
+ test/cpp/end2end/client_callback_end2end_test.cc
+ third_party/googletest/googletest/src/gtest-all.cc
+ third_party/googletest/googlemock/src/gmock-all.cc
+)
+
+
+target_include_directories(client_callback_end2end_test
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
+ PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
+ PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
+ PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
+ PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
+ PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
+ PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
+ PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+ PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
+ PRIVATE third_party/googletest/googletest/include
+ PRIVATE third_party/googletest/googletest
+ PRIVATE third_party/googletest/googlemock/include
+ PRIVATE third_party/googletest/googlemock
+ PRIVATE ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(client_callback_end2end_test
+ ${_gRPC_PROTOBUF_LIBRARIES}
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ grpc++_test_util
+ grpc_test_util
+ grpc++
+ grpc
+ gpr_test_util
+ gpr
+ ${_gRPC_GFLAGS_LIBRARIES}
+)
+
+endif (gRPC_BUILD_TESTS)
+if (gRPC_BUILD_TESTS)
+
add_executable(client_channel_stress_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.grpc.pb.cc
diff --git a/Makefile b/Makefile
index 96ea890bcb..33ec3d93e7 100644
--- a/Makefile
+++ b/Makefile
@@ -1160,6 +1160,7 @@ check_gcp_environment_linux_test: $(BINDIR)/$(CONFIG)/check_gcp_environment_linu
check_gcp_environment_windows_test: $(BINDIR)/$(CONFIG)/check_gcp_environment_windows_test
chttp2_settings_timeout_test: $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test
cli_call_test: $(BINDIR)/$(CONFIG)/cli_call_test
+client_callback_end2end_test: $(BINDIR)/$(CONFIG)/client_callback_end2end_test
client_channel_stress_test: $(BINDIR)/$(CONFIG)/client_channel_stress_test
client_crash_test: $(BINDIR)/$(CONFIG)/client_crash_test
client_crash_test_server: $(BINDIR)/$(CONFIG)/client_crash_test_server
@@ -1665,6 +1666,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/check_gcp_environment_windows_test \
$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test \
$(BINDIR)/$(CONFIG)/cli_call_test \
+ $(BINDIR)/$(CONFIG)/client_callback_end2end_test \
$(BINDIR)/$(CONFIG)/client_channel_stress_test \
$(BINDIR)/$(CONFIG)/client_crash_test \
$(BINDIR)/$(CONFIG)/client_crash_test_server \
@@ -1845,6 +1847,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/check_gcp_environment_windows_test \
$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test \
$(BINDIR)/$(CONFIG)/cli_call_test \
+ $(BINDIR)/$(CONFIG)/client_callback_end2end_test \
$(BINDIR)/$(CONFIG)/client_channel_stress_test \
$(BINDIR)/$(CONFIG)/client_crash_test \
$(BINDIR)/$(CONFIG)/client_crash_test_server \
@@ -2302,6 +2305,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test || ( echo test chttp2_settings_timeout_test failed ; exit 1 )
$(E) "[RUN] Testing cli_call_test"
$(Q) $(BINDIR)/$(CONFIG)/cli_call_test || ( echo test cli_call_test failed ; exit 1 )
+ $(E) "[RUN] Testing client_callback_end2end_test"
+ $(Q) $(BINDIR)/$(CONFIG)/client_callback_end2end_test || ( echo test client_callback_end2end_test failed ; exit 1 )
$(E) "[RUN] Testing client_channel_stress_test"
$(Q) $(BINDIR)/$(CONFIG)/client_channel_stress_test || ( echo test client_channel_stress_test failed ; exit 1 )
$(E) "[RUN] Testing client_crash_test"
@@ -5219,6 +5224,7 @@ LIBGRPC++_SRC = \
src/cpp/client/credentials_cc.cc \
src/cpp/client/generic_stub.cc \
src/cpp/common/alarm.cc \
+ src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/completion_queue_cc.cc \
@@ -5329,6 +5335,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/support/async_unary_call.h \
include/grpcpp/support/byte_buffer.h \
include/grpcpp/support/channel_arguments.h \
+ include/grpcpp/support/client_callback.h \
include/grpcpp/support/config.h \
include/grpcpp/support/proto_buffer_reader.h \
include/grpcpp/support/proto_buffer_writer.h \
@@ -5426,7 +5433,9 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
+ include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
+ include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@@ -5585,6 +5594,7 @@ LIBGRPC++_CRONET_SRC = \
src/cpp/client/credentials_cc.cc \
src/cpp/client/generic_stub.cc \
src/cpp/common/alarm.cc \
+ src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/completion_queue_cc.cc \
@@ -5905,6 +5915,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/support/async_unary_call.h \
include/grpcpp/support/byte_buffer.h \
include/grpcpp/support/channel_arguments.h \
+ include/grpcpp/support/client_callback.h \
include/grpcpp/support/config.h \
include/grpcpp/support/proto_buffer_reader.h \
include/grpcpp/support/proto_buffer_writer.h \
@@ -6002,7 +6013,9 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
+ include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
+ include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@@ -6392,7 +6405,9 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
+ include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
+ include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@@ -6546,7 +6561,9 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
+ include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
+ include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@@ -6661,6 +6678,7 @@ LIBGRPC++_UNSECURE_SRC = \
src/cpp/client/credentials_cc.cc \
src/cpp/client/generic_stub.cc \
src/cpp/common/alarm.cc \
+ src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/completion_queue_cc.cc \
@@ -6771,6 +6789,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/support/async_unary_call.h \
include/grpcpp/support/byte_buffer.h \
include/grpcpp/support/channel_arguments.h \
+ include/grpcpp/support/client_callback.h \
include/grpcpp/support/config.h \
include/grpcpp/support/proto_buffer_reader.h \
include/grpcpp/support/proto_buffer_writer.h \
@@ -6868,7 +6887,9 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
+ include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
+ include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@@ -17076,6 +17097,49 @@ endif
endif
+CLIENT_CALLBACK_END2END_TEST_SRC = \
+ test/cpp/end2end/client_callback_end2end_test.cc \
+
+CLIENT_CALLBACK_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CLIENT_CALLBACK_END2END_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/client_callback_end2end_test: openssl_dep_error
+
+else
+
+
+
+
+ifeq ($(NO_PROTOBUF),true)
+
+# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
+
+$(BINDIR)/$(CONFIG)/client_callback_end2end_test: protobuf_dep_error
+
+else
+
+$(BINDIR)/$(CONFIG)/client_callback_end2end_test: $(PROTOBUF_DEP) $(CLIENT_CALLBACK_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LDXX) $(LDFLAGS) $(CLIENT_CALLBACK_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/client_callback_end2end_test
+
+endif
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/cpp/end2end/client_callback_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_client_callback_end2end_test: $(CLIENT_CALLBACK_END2END_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(CLIENT_CALLBACK_END2END_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
CLIENT_CHANNEL_STRESS_TEST_SRC = \
$(GENDIR)/src/proto/grpc/lb/v1/load_balancer.pb.cc $(GENDIR)/src/proto/grpc/lb/v1/load_balancer.grpc.pb.cc \
test/cpp/client/client_channel_stress_test.cc \
diff --git a/build.yaml b/build.yaml
index a50a0a4ab6..b775773e42 100644
--- a/build.yaml
+++ b/build.yaml
@@ -1169,7 +1169,9 @@ filegroups:
- include/grpcpp/impl/codegen/byte_buffer.h
- include/grpcpp/impl/codegen/call.h
- include/grpcpp/impl/codegen/call_hook.h
+ - include/grpcpp/impl/codegen/callback_common.h
- include/grpcpp/impl/codegen/channel_interface.h
+ - include/grpcpp/impl/codegen/client_callback.h
- include/grpcpp/impl/codegen/client_context.h
- include/grpcpp/impl/codegen/client_unary_call.h
- include/grpcpp/impl/codegen/completion_queue.h
@@ -1297,6 +1299,7 @@ filegroups:
- include/grpcpp/support/async_unary_call.h
- include/grpcpp/support/byte_buffer.h
- include/grpcpp/support/channel_arguments.h
+ - include/grpcpp/support/client_callback.h
- include/grpcpp/support/config.h
- include/grpcpp/support/proto_buffer_reader.h
- include/grpcpp/support/proto_buffer_writer.h
@@ -1324,6 +1327,7 @@ filegroups:
- src/cpp/client/credentials_cc.cc
- src/cpp/client/generic_stub.cc
- src/cpp/common/alarm.cc
+ - src/cpp/common/callback_common.cc
- src/cpp/common/channel_arguments.cc
- src/cpp/common/channel_filter.cc
- src/cpp/common/completion_queue_cc.cc
@@ -4465,6 +4469,20 @@ targets:
- grpc
- gpr_test_util
- gpr
+- name: client_callback_end2end_test
+ gtest: true
+ cpu_cost: 0.5
+ build: test
+ language: c++
+ src:
+ - test/cpp/end2end/client_callback_end2end_test.cc
+ deps:
+ - grpc++_test_util
+ - grpc_test_util
+ - grpc++
+ - grpc
+ - gpr_test_util
+ - gpr
- name: client_channel_stress_test
gtest: false
build: test
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 581b9246bc..03ec223279 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -111,6 +111,7 @@ Pod::Spec.new do |s|
'include/grpcpp/support/async_unary_call.h',
'include/grpcpp/support/byte_buffer.h',
'include/grpcpp/support/channel_arguments.h',
+ 'include/grpcpp/support/client_callback.h',
'include/grpcpp/support/config.h',
'include/grpcpp/support/proto_buffer_reader.h',
'include/grpcpp/support/proto_buffer_writer.h',
@@ -127,7 +128,9 @@ Pod::Spec.new do |s|
'include/grpcpp/impl/codegen/byte_buffer.h',
'include/grpcpp/impl/codegen/call.h',
'include/grpcpp/impl/codegen/call_hook.h',
+ 'include/grpcpp/impl/codegen/callback_common.h',
'include/grpcpp/impl/codegen/channel_interface.h',
+ 'include/grpcpp/impl/codegen/client_callback.h',
'include/grpcpp/impl/codegen/client_context.h',
'include/grpcpp/impl/codegen/client_unary_call.h',
'include/grpcpp/impl/codegen/completion_queue.h',
@@ -187,6 +190,7 @@ Pod::Spec.new do |s|
'src/cpp/client/credentials_cc.cc',
'src/cpp/client/generic_stub.cc',
'src/cpp/common/alarm.cc',
+ 'src/cpp/common/callback_common.cc',
'src/cpp/common/channel_arguments.cc',
'src/cpp/common/channel_filter.cc',
'src/cpp/common/completion_queue_cc.cc',
diff --git a/grpc.gyp b/grpc.gyp
index 654a531092..b8aae44de3 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -1381,6 +1381,7 @@
'src/cpp/client/credentials_cc.cc',
'src/cpp/client/generic_stub.cc',
'src/cpp/common/alarm.cc',
+ 'src/cpp/common/callback_common.cc',
'src/cpp/common/channel_arguments.cc',
'src/cpp/common/channel_filter.cc',
'src/cpp/common/completion_queue_cc.cc',
@@ -1528,6 +1529,7 @@
'src/cpp/client/credentials_cc.cc',
'src/cpp/client/generic_stub.cc',
'src/cpp/common/alarm.cc',
+ 'src/cpp/common/callback_common.cc',
'src/cpp/common/channel_arguments.cc',
'src/cpp/common/channel_filter.cc',
'src/cpp/common/completion_queue_cc.cc',
diff --git a/include/grpcpp/channel.h b/include/grpcpp/channel.h
index fed02bf7bc..58a6f51664 100644
--- a/include/grpcpp/channel.h
+++ b/include/grpcpp/channel.h
@@ -78,8 +78,13 @@ class Channel final : public ChannelInterface,
bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline) override;
+ CompletionQueue* CallbackCQ() override;
+
const grpc::string host_;
grpc_channel* const c_channel_; // owned
+
+ CompletionQueue* callback_cq_ = nullptr;
+ std::mutex mu_;
};
} // namespace grpc
diff --git a/include/grpcpp/generic/generic_stub.h b/include/grpcpp/generic/generic_stub.h
index 92405a43fa..d509d9a520 100644
--- a/include/grpcpp/generic/generic_stub.h
+++ b/include/grpcpp/generic/generic_stub.h
@@ -19,9 +19,12 @@
#ifndef GRPCPP_GENERIC_GENERIC_STUB_H
#define GRPCPP_GENERIC_GENERIC_STUB_H
+#include <functional>
+
#include <grpcpp/support/async_stream.h>
#include <grpcpp/support/async_unary_call.h>
#include <grpcpp/support/byte_buffer.h>
+#include <grpcpp/support/status.h>
namespace grpc {
@@ -62,6 +65,26 @@ class GenericStub final {
ClientContext* context, const grpc::string& method, CompletionQueue* cq,
void* tag);
+ /// NOTE: class experimental_type is not part of the public API of this class
+ /// TODO(vjpai): Move these contents to the public API of GenericStub when
+ /// they are no longer experimental
+ class experimental_type {
+ public:
+ explicit experimental_type(GenericStub* stub) : stub_(stub) {}
+
+ void UnaryCall(ClientContext* context, const grpc::string& method,
+ const ByteBuffer* request, ByteBuffer* response,
+ std::function<void(Status)> on_completion);
+
+ private:
+ GenericStub* stub_;
+ };
+
+ /// NOTE: The function experimental() is not stable public API. It is a view
+ /// to the experimental components of this class. It may be changed or removed
+ /// at any time.
+ experimental_type experimental() { return experimental_type(this); }
+
private:
std::shared_ptr<ChannelInterface> channel_;
};
diff --git a/include/grpcpp/impl/codegen/call.h b/include/grpcpp/impl/codegen/call.h
index a5e930aaa5..7bd03b6b1b 100644
--- a/include/grpcpp/impl/codegen/call.h
+++ b/include/grpcpp/impl/codegen/call.h
@@ -608,6 +608,9 @@ class CallOpSetInterface : public CompletionQueueTag {
/// Fills in grpc_op, starting from ops[*nops] and moving
/// upwards.
virtual void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) = 0;
+
+ /// Get the tag to be used at the CQ
+ virtual void* cq_tag() = 0;
};
/// Primary implementation of CallOpSetInterface.
@@ -627,7 +630,7 @@ class CallOpSet : public CallOpSetInterface,
public Op5,
public Op6 {
public:
- CallOpSet() : return_tag_(this), call_(nullptr) {}
+ CallOpSet() : cq_tag_(this), return_tag_(this), call_(nullptr) {}
void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override {
this->Op1::AddOp(ops, nops);
this->Op2::AddOp(ops, nops);
@@ -654,7 +657,12 @@ class CallOpSet : public CallOpSetInterface,
void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
+ void* cq_tag() override { return cq_tag_; }
+
+ void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; }
+
private:
+ void* cq_tag_;
void* return_tag_;
grpc_call* call_;
};
diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h
new file mode 100644
index 0000000000..5e1530a310
--- /dev/null
+++ b/include/grpcpp/impl/codegen/callback_common.h
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPCPP_IMPL_CODEGEN_CALLBACK_COMMON_H
+#define GRPCPP_IMPL_CODEGEN_CALLBACK_COMMON_H
+
+#include <functional>
+
+#include <grpcpp/impl/codegen/call.h>
+#include <grpcpp/impl/codegen/channel_interface.h>
+#include <grpcpp/impl/codegen/config.h>
+#include <grpcpp/impl/codegen/core_codegen_interface.h>
+#include <grpcpp/impl/codegen/status.h>
+
+// Forward declarations
+namespace grpc_core {
+class CQCallbackInterface;
+};
+
+namespace grpc {
+namespace internal {
+
+class CallbackWithStatusTag {
+ public:
+ // TODO(vjpai): make impl and ops part of this structure to avoid allocation,
+ // ownership transfer, and delete
+ CallbackWithStatusTag(std::function<void(Status)> f, bool self_delete,
+ CompletionQueueTag* ops);
+ ~CallbackWithStatusTag() { delete ops_; }
+ void* tag() { return static_cast<void*>(impl_); }
+ Status* status_ptr() { return status_; }
+ CompletionQueueTag* ops() { return ops_; }
+
+ // force_run can only be performed on a tag before it can ever be active
+ void force_run(Status s);
+
+ private:
+ grpc_core::CQCallbackInterface* impl_;
+ Status* status_;
+ CompletionQueueTag* ops_;
+};
+
+class CallbackWithSuccessTag {
+ public:
+ // TODO(vjpai): make impl and ops part of this structure to avoid allocation,
+ // ownership transfer, and delete
+ CallbackWithSuccessTag(std::function<void(bool)> f, bool self_delete,
+ CompletionQueueTag* ops);
+ ~CallbackWithSuccessTag() { delete ops_; }
+ void* tag() { return static_cast<void*>(impl_); }
+ CompletionQueueTag* ops() { return ops_; }
+
+ // force_run can only be performed on a tag before it can ever be active
+ void force_run(bool ok);
+
+ private:
+ grpc_core::CQCallbackInterface* impl_;
+ CompletionQueueTag* ops_;
+};
+
+} // namespace internal
+} // namespace grpc
+
+#endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
diff --git a/include/grpcpp/impl/codegen/channel_interface.h b/include/grpcpp/impl/codegen/channel_interface.h
index ec1c6c25d8..b257acc1ab 100644
--- a/include/grpcpp/impl/codegen/channel_interface.h
+++ b/include/grpcpp/impl/codegen/channel_interface.h
@@ -41,6 +41,8 @@ class CallOpSetInterface;
class RpcMethod;
template <class InputMessage, class OutputMessage>
class BlockingUnaryCallImpl;
+template <class InputMessage, class OutputMessage>
+class CallbackUnaryCallImpl;
template <class R>
class ClientAsyncReaderFactory;
template <class W>
@@ -103,6 +105,8 @@ class ChannelInterface {
friend class ::grpc::internal::ClientAsyncResponseReaderFactory;
template <class InputMessage, class OutputMessage>
friend class ::grpc::internal::BlockingUnaryCallImpl;
+ template <class InputMessage, class OutputMessage>
+ friend class ::grpc::internal::CallbackUnaryCallImpl;
friend class ::grpc::internal::RpcMethod;
virtual internal::Call CreateCall(const internal::RpcMethod& method,
ClientContext* context,
@@ -115,6 +119,16 @@ class ChannelInterface {
CompletionQueue* cq, void* tag) = 0;
virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline) = 0;
+
+ // EXPERIMENTAL
+ // A method to get the callbackable completion queue associated with this
+ // channel. If the return value is nullptr, this channel doesn't support
+ // callback operations.
+ // TODO(vjpai): Consider a better default like using a global CQ
+ // Returns nullptr (rather than being pure) since this is a new method
+ // and adding a new pure method to an interface would be a breaking change
+ // (even though this is private and non-API)
+ virtual CompletionQueue* CallbackCQ() { return nullptr; }
};
} // namespace grpc
diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h
new file mode 100644
index 0000000000..419933f85c
--- /dev/null
+++ b/include/grpcpp/impl/codegen/client_callback.h
@@ -0,0 +1,92 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
+#define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
+
+#include <functional>
+
+#include <grpcpp/impl/codegen/call.h>
+#include <grpcpp/impl/codegen/callback_common.h>
+#include <grpcpp/impl/codegen/channel_interface.h>
+#include <grpcpp/impl/codegen/config.h>
+#include <grpcpp/impl/codegen/core_codegen_interface.h>
+#include <grpcpp/impl/codegen/status.h>
+
+namespace grpc {
+
+class Channel;
+class ClientContext;
+class CompletionQueue;
+
+namespace internal {
+class RpcMethod;
+
+/// Perform a callback-based unary call
+/// TODO(vjpai): Combine as much as possible with the blocking unary call code
+template <class InputMessage, class OutputMessage>
+void CallbackUnaryCall(ChannelInterface* channel, const RpcMethod& method,
+ ClientContext* context, const InputMessage* request,
+ OutputMessage* result,
+ std::function<void(Status)> on_completion) {
+ CallbackUnaryCallImpl<InputMessage, OutputMessage> x(
+ channel, method, context, request, result, on_completion);
+}
+
+template <class InputMessage, class OutputMessage>
+class CallbackUnaryCallImpl {
+ public:
+ CallbackUnaryCallImpl(ChannelInterface* channel, const RpcMethod& method,
+ ClientContext* context, const InputMessage* request,
+ OutputMessage* result,
+ std::function<void(Status)> on_completion) {
+ CompletionQueue* cq = channel->CallbackCQ();
+ GPR_CODEGEN_ASSERT(cq != nullptr);
+
+ // TODO(vjpai): Allocate this as part of the tag's arena
+ auto* ops = new CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpRecvInitialMetadata,
+ CallOpRecvMessage<OutputMessage>,
+ CallOpClientSendClose, CallOpClientRecvStatus>;
+
+ // TODO(vjpai): Move to using pre-allocated tags rather than new/self-delete
+ auto* tag = new CallbackWithStatusTag(on_completion, true, ops);
+
+ // TODO(vjpai): Unify code with sync API as much as possible
+ Call call(channel->CreateCall(method, context, cq));
+ Status s = ops->SendMessage(*request);
+ if (!s.ok()) {
+ tag->force_run(s);
+ return;
+ }
+ ops->SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ ops->RecvInitialMetadata(context);
+ ops->RecvMessage(result);
+ ops->AllowNoMessage();
+ ops->ClientSendClose();
+ ops->ClientRecvStatus(context, tag->status_ptr());
+ ops->set_cq_tag(tag->tag());
+ call.PerformOps(ops);
+ }
+};
+
+} // namespace internal
+} // namespace grpc
+
+#endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
diff --git a/include/grpcpp/impl/codegen/client_context.h b/include/grpcpp/impl/codegen/client_context.h
index 9dda4c7fac..7db31fcbcf 100644
--- a/include/grpcpp/impl/codegen/client_context.h
+++ b/include/grpcpp/impl/codegen/client_context.h
@@ -68,6 +68,8 @@ class CallOpClientRecvStatus;
class CallOpRecvInitialMetadata;
template <class InputMessage, class OutputMessage>
class BlockingUnaryCallImpl;
+template <class InputMessage, class OutputMessage>
+class CallbackUnaryCallImpl;
} // namespace internal
template <class R>
@@ -389,6 +391,8 @@ class ClientContext {
friend class ::grpc::ClientAsyncResponseReader;
template <class InputMessage, class OutputMessage>
friend class ::grpc::internal::BlockingUnaryCallImpl;
+ template <class InputMessage, class OutputMessage>
+ friend class ::grpc::internal::CallbackUnaryCallImpl;
// Used by friend class CallOpClientRecvStatus
void set_debug_error_string(const grpc::string& debug_error_string) {
diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h
index 3f7d4fb765..f52f9a53be 100644
--- a/include/grpcpp/impl/codegen/completion_queue.h
+++ b/include/grpcpp/impl/codegen/completion_queue.h
@@ -274,6 +274,9 @@ class CompletionQueue : private GrpcLibraryCodegen {
template <class InputMessage, class OutputMessage>
friend class ::grpc::internal::BlockingUnaryCallImpl;
+ // Friends that need access to constructor for callback CQ
+ friend class ::grpc::Channel;
+
/// EXPERIMENTAL
/// Creates a Thread Local cache to store the first event
/// On this completion queue queued from this thread. Once
diff --git a/include/grpcpp/support/client_callback.h b/include/grpcpp/support/client_callback.h
new file mode 100644
index 0000000000..03a7e1400c
--- /dev/null
+++ b/include/grpcpp/support/client_callback.h
@@ -0,0 +1,24 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPCPP_SUPPORT_CLIENT_CALLBACK_H
+#define GRPCPP_SUPPORT_CLIENT_CALLBACK_H
+
+#include <grpcpp/impl/codegen/client_callback.h>
+
+#endif // GRPCPP_SUPPORT_CLIENT_CALLBACK_H
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index 0769d9e4f6..c2cf450e94 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -1364,9 +1364,11 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) {
}
cqd->shutdown_called = true;
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ gpr_mu_unlock(cq->mu);
cq_finish_shutdown_callback(cq);
+ } else {
+ gpr_mu_unlock(cq->mu);
}
- gpr_mu_unlock(cq->mu);
GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
}
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index 39b891c2e1..2a961e6e27 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -42,8 +42,10 @@
#include <grpcpp/support/time.h>
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/surface/completion_queue.h"
namespace grpc {
@@ -53,7 +55,12 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel)
g_gli_initializer.summon();
}
-Channel::~Channel() { grpc_channel_destroy(c_channel_); }
+Channel::~Channel() {
+ grpc_channel_destroy(c_channel_);
+ if (callback_cq_ != nullptr) {
+ callback_cq_->Shutdown();
+ }
+}
namespace {
@@ -135,8 +142,8 @@ void Channel::PerformOpsOnCall(internal::CallOpSetInterface* ops,
size_t nops = 0;
grpc_op cops[MAX_OPS];
ops->FillOps(call->call(), cops, &nops);
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), cops, nops, ops, nullptr));
+ GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), cops, nops,
+ ops->cq_tag(), nullptr));
}
void* Channel::RegisterMethod(const char* method) {
@@ -185,4 +192,32 @@ bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed,
return ok;
}
+namespace {
+class ShutdownCallback : public grpc_core::CQCallbackInterface {
+ public:
+ void TakeCQ(CompletionQueue* cq) { cq_ = cq; }
+ void Run(bool) override {
+ delete cq_;
+ grpc_core::Delete(this);
+ }
+
+ private:
+ CompletionQueue* cq_ = nullptr;
+};
+} // namespace
+
+CompletionQueue* Channel::CallbackCQ() {
+ // TODO(vjpai): Consider using a single global CQ for the default CQ
+ // if there is no explicit per-channel CQ registered
+ std::lock_guard<std::mutex> l(mu_);
+ if (callback_cq_ == nullptr) {
+ auto* shutdown_callback = grpc_core::New<ShutdownCallback>();
+ callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
+ shutdown_callback});
+ shutdown_callback->TakeCQ(callback_cq_);
+ }
+ return callback_cq_;
+}
+
} // namespace grpc
diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc
index 67ef46bebe..07c34c8878 100644
--- a/src/cpp/client/generic_stub.cc
+++ b/src/cpp/client/generic_stub.cc
@@ -16,9 +16,11 @@
*
*/
-#include <grpcpp/generic/generic_stub.h>
+#include <functional>
+#include <grpcpp/generic/generic_stub.h>
#include <grpcpp/impl/rpc_method.h>
+#include <grpcpp/support/client_callback.h>
namespace grpc {
@@ -60,4 +62,14 @@ std::unique_ptr<GenericClientAsyncResponseReader> GenericStub::PrepareUnaryCall(
context, request, false));
}
+void GenericStub::experimental_type::UnaryCall(
+ ClientContext* context, const grpc::string& method,
+ const ByteBuffer* request, ByteBuffer* response,
+ std::function<void(Status)> on_completion) {
+ internal::CallbackUnaryCall(
+ stub_->channel_.get(),
+ internal::RpcMethod(method.c_str(), internal::RpcMethod::NORMAL_RPC),
+ context, request, response, on_completion);
+}
+
} // namespace grpc
diff --git a/src/cpp/common/callback_common.cc b/src/cpp/common/callback_common.cc
new file mode 100644
index 0000000000..2598d37e27
--- /dev/null
+++ b/src/cpp/common/callback_common.cc
@@ -0,0 +1,109 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <functional>
+
+#include <grpcpp/impl/codegen/callback_common.h>
+#include <grpcpp/impl/codegen/status.h>
+
+#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/surface/completion_queue.h"
+
+namespace grpc {
+namespace internal {
+
+namespace {
+class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface {
+ public:
+ CallbackWithSuccessImpl(CallbackWithSuccessTag* parent,
+ std::function<void(bool)> f, bool self_delete)
+ : parent_(parent), func_(f), self_delete_(self_delete) {}
+
+ void Run(bool ok) override {
+ void* ignored = parent_->ops();
+ bool new_ok = ok;
+ GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &new_ok));
+ GPR_ASSERT(ignored == parent_->ops());
+ func_(ok);
+ if (self_delete_) {
+ delete parent_;
+ // Must use grpc_core::Delete since base is GRPC_ABSTRACT
+ grpc_core::Delete(this);
+ }
+ }
+
+ private:
+ CallbackWithSuccessTag* parent_;
+ std::function<void(bool)> func_;
+ bool self_delete_;
+};
+
+class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface {
+ public:
+ CallbackWithStatusImpl(CallbackWithStatusTag* parent,
+ std::function<void(Status)> f, bool self_delete)
+ : parent_(parent), func_(f), status_(), self_delete_(self_delete) {}
+
+ void Run(bool ok) override {
+ void* ignored = parent_->ops();
+
+ GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &ok));
+ GPR_ASSERT(ignored == parent_->ops());
+
+ func_(status_);
+ if (self_delete_) {
+ delete parent_;
+ // Must use grpc_core::Delete since base is GRPC_ABSTRACT
+ grpc_core::Delete(this);
+ }
+ }
+ Status* status_ptr() { return &status_; }
+
+ private:
+ CallbackWithStatusTag* parent_;
+ std::function<void(Status)> func_;
+ Status status_;
+ bool self_delete_;
+};
+
+} // namespace
+
+CallbackWithSuccessTag::CallbackWithSuccessTag(std::function<void(bool)> f,
+ bool self_delete,
+ CompletionQueueTag* ops)
+ : impl_(grpc_core::New<CallbackWithSuccessImpl>(this, f, self_delete)),
+ ops_(ops) {}
+
+void CallbackWithSuccessTag::force_run(bool ok) { impl_->Run(ok); }
+
+CallbackWithStatusTag::CallbackWithStatusTag(std::function<void(Status)> f,
+ bool self_delete,
+ CompletionQueueTag* ops)
+ : ops_(ops) {
+ auto* impl = grpc_core::New<CallbackWithStatusImpl>(this, f, self_delete);
+ impl_ = impl;
+ status_ = impl->status_ptr();
+}
+
+void CallbackWithStatusTag::force_run(Status s) {
+ *status_ = s;
+ impl_->Run(true);
+}
+
+} // namespace internal
+} // namespace grpc
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 36c709eb45..f271825cba 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -657,7 +657,8 @@ void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops,
size_t nops = 0;
grpc_op cops[MAX_OPS];
ops->FillOps(call->call(), cops, &nops);
- auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
+ auto result =
+ grpc_call_start_batch(call->call(), cops, nops, ops->cq_tag(), nullptr);
if (result != GRPC_CALL_OK) {
gpr_log(GPR_ERROR, "Fatal: grpc_call_start_batch returned %d", result);
grpc_call_log_batch(__FILE__, __LINE__, GPR_LOG_SEVERITY_ERROR,
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 6f5bde0d9f..f76f64e3da 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -61,6 +61,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
tag_ = tag;
}
+ /// TODO(vjpai): Allow override of cq_tag if appropriate for callback API
+ void* cq_tag() override { return this; }
+
void Unref();
private:
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD
index 75dec56a60..0415efc1ef 100644
--- a/test/cpp/end2end/BUILD
+++ b/test/cpp/end2end/BUILD
@@ -98,6 +98,25 @@ grpc_cc_binary(
],
)
+grpc_cc_test(
+ name = "client_callback_end2end_test",
+ srcs = ["client_callback_end2end_test.cc"],
+ external_deps = [
+ "gtest",
+ ],
+ deps = [
+ ":test_service_impl",
+ "//:gpr",
+ "//:grpc",
+ "//:grpc++",
+ "//src/proto/grpc/testing:echo_messages_proto",
+ "//src/proto/grpc/testing:echo_proto",
+ "//test/core/util:gpr_test_util",
+ "//test/core/util:grpc_test_util",
+ "//test/cpp/util:test_util",
+ ],
+)
+
grpc_cc_library(
name = "end2end_test_lib",
testonly = True,
diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc
new file mode 100644
index 0000000000..be732a0ca6
--- /dev/null
+++ b/test/cpp/end2end/client_callback_end2end_test.cc
@@ -0,0 +1,126 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <functional>
+#include <mutex>
+
+#include <grpcpp/channel.h>
+#include <grpcpp/client_context.h>
+#include <grpcpp/create_channel.h>
+#include <grpcpp/generic/generic_stub.h>
+#include <grpcpp/impl/codegen/proto_utils.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
+#include <grpcpp/server_context.h>
+#include <grpcpp/support/client_callback.h>
+
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/core/util/test_config.h"
+#include "test/cpp/end2end/test_service_impl.h"
+#include "test/cpp/util/byte_buffer_proto_helper.h"
+
+#include <gtest/gtest.h>
+
+namespace grpc {
+namespace testing {
+namespace {
+
+class ClientCallbackEnd2endTest : public ::testing::Test {
+ protected:
+ ClientCallbackEnd2endTest() {}
+
+ void SetUp() override {
+ ServerBuilder builder;
+ builder.RegisterService(&service_);
+
+ server_ = builder.BuildAndStart();
+ is_server_started_ = true;
+ }
+
+ void ResetStub() {
+ ChannelArguments args;
+ channel_ = server_->InProcessChannel(args);
+ stub_.reset(new GenericStub(channel_));
+ }
+
+ void TearDown() override {
+ if (is_server_started_) {
+ server_->Shutdown();
+ }
+ }
+
+ void SendRpcs(int num_rpcs) {
+ const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo");
+ grpc::string test_string("");
+ for (int i = 0; i < num_rpcs; i++) {
+ EchoRequest request;
+ std::unique_ptr<ByteBuffer> send_buf;
+ ByteBuffer recv_buf;
+ ClientContext cli_ctx;
+
+ test_string += "Hello world. ";
+ request.set_message(test_string);
+ send_buf = SerializeToByteBuffer(&request);
+
+ std::mutex mu;
+ std::condition_variable cv;
+ bool done;
+ stub_->experimental().UnaryCall(
+ &cli_ctx, kMethodName, send_buf.get(), &recv_buf,
+ [&request, &recv_buf, &done, &mu, &cv](Status s) {
+ GPR_ASSERT(s.ok());
+
+ EchoResponse response;
+ EXPECT_TRUE(ParseFromByteBuffer(&recv_buf, &response));
+ EXPECT_EQ(request.message(), response.message());
+ std::lock_guard<std::mutex> l(mu);
+ done = true;
+ cv.notify_one();
+ });
+ std::unique_lock<std::mutex> l(mu);
+ while (!done) {
+ cv.wait(l);
+ }
+ }
+ }
+ bool is_server_started_;
+ std::shared_ptr<Channel> channel_;
+ std::unique_ptr<grpc::GenericStub> stub_;
+ TestServiceImpl service_;
+ std::unique_ptr<Server> server_;
+};
+
+TEST_F(ClientCallbackEnd2endTest, SimpleRpc) {
+ ResetStub();
+ SendRpcs(1);
+}
+
+TEST_F(ClientCallbackEnd2endTest, SequentialRpcs) {
+ ResetStub();
+ SendRpcs(10);
+}
+
+} // namespace
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ grpc_test_init(argc, argv);
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++
index 9a97ee84f2..3b7fd1fa8e 100644
--- a/tools/doxygen/Doxyfile.c++
+++ b/tools/doxygen/Doxyfile.c++
@@ -945,7 +945,9 @@ include/grpcpp/impl/codegen/async_unary_call.h \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
+include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
+include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@@ -997,6 +999,7 @@ include/grpcpp/support/async_stream.h \
include/grpcpp/support/async_unary_call.h \
include/grpcpp/support/byte_buffer.h \
include/grpcpp/support/channel_arguments.h \
+include/grpcpp/support/client_callback.h \
include/grpcpp/support/config.h \
include/grpcpp/support/proto_buffer_reader.h \
include/grpcpp/support/proto_buffer_writer.h \
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index 0cd4cfd647..a72390d9f8 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -946,7 +946,9 @@ include/grpcpp/impl/codegen/async_unary_call.h \
include/grpcpp/impl/codegen/byte_buffer.h \
include/grpcpp/impl/codegen/call.h \
include/grpcpp/impl/codegen/call_hook.h \
+include/grpcpp/impl/codegen/callback_common.h \
include/grpcpp/impl/codegen/channel_interface.h \
+include/grpcpp/impl/codegen/client_callback.h \
include/grpcpp/impl/codegen/client_context.h \
include/grpcpp/impl/codegen/client_unary_call.h \
include/grpcpp/impl/codegen/completion_queue.h \
@@ -999,6 +1001,7 @@ include/grpcpp/support/async_stream.h \
include/grpcpp/support/async_unary_call.h \
include/grpcpp/support/byte_buffer.h \
include/grpcpp/support/channel_arguments.h \
+include/grpcpp/support/client_callback.h \
include/grpcpp/support/config.h \
include/grpcpp/support/proto_buffer_reader.h \
include/grpcpp/support/proto_buffer_writer.h \
@@ -1187,6 +1190,7 @@ src/cpp/client/secure_credentials.h \
src/cpp/codegen/codegen_init.cc \
src/cpp/common/alarm.cc \
src/cpp/common/auth_property_iterator.cc \
+src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/channel_filter.h \
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 8ea5126fde..9637fa8787 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -3309,6 +3309,25 @@
"grpc",
"grpc++",
"grpc++_test_util",
+ "grpc_test_util"
+ ],
+ "headers": [],
+ "is_filegroup": false,
+ "language": "c++",
+ "name": "client_callback_end2end_test",
+ "src": [
+ "test/cpp/end2end/client_callback_end2end_test.cc"
+ ],
+ "third_party": false,
+ "type": "target"
+ },
+ {
+ "deps": [
+ "gpr",
+ "gpr_test_util",
+ "grpc",
+ "grpc++",
+ "grpc++_test_util",
"grpc_cli_libs",
"grpc_test_util"
],
@@ -11081,7 +11100,9 @@
"include/grpcpp/impl/codegen/byte_buffer.h",
"include/grpcpp/impl/codegen/call.h",
"include/grpcpp/impl/codegen/call_hook.h",
+ "include/grpcpp/impl/codegen/callback_common.h",
"include/grpcpp/impl/codegen/channel_interface.h",
+ "include/grpcpp/impl/codegen/client_callback.h",
"include/grpcpp/impl/codegen/client_context.h",
"include/grpcpp/impl/codegen/client_unary_call.h",
"include/grpcpp/impl/codegen/completion_queue.h",
@@ -11147,7 +11168,9 @@
"include/grpcpp/impl/codegen/byte_buffer.h",
"include/grpcpp/impl/codegen/call.h",
"include/grpcpp/impl/codegen/call_hook.h",
+ "include/grpcpp/impl/codegen/callback_common.h",
"include/grpcpp/impl/codegen/channel_interface.h",
+ "include/grpcpp/impl/codegen/client_callback.h",
"include/grpcpp/impl/codegen/client_context.h",
"include/grpcpp/impl/codegen/client_unary_call.h",
"include/grpcpp/impl/codegen/completion_queue.h",
@@ -11305,6 +11328,7 @@
"include/grpcpp/support/async_unary_call.h",
"include/grpcpp/support/byte_buffer.h",
"include/grpcpp/support/channel_arguments.h",
+ "include/grpcpp/support/client_callback.h",
"include/grpcpp/support/config.h",
"include/grpcpp/support/proto_buffer_reader.h",
"include/grpcpp/support/proto_buffer_writer.h",
@@ -11409,6 +11433,7 @@
"include/grpcpp/support/async_unary_call.h",
"include/grpcpp/support/byte_buffer.h",
"include/grpcpp/support/channel_arguments.h",
+ "include/grpcpp/support/client_callback.h",
"include/grpcpp/support/config.h",
"include/grpcpp/support/proto_buffer_reader.h",
"include/grpcpp/support/proto_buffer_writer.h",
@@ -11428,6 +11453,7 @@
"src/cpp/client/credentials_cc.cc",
"src/cpp/client/generic_stub.cc",
"src/cpp/common/alarm.cc",
+ "src/cpp/common/callback_common.cc",
"src/cpp/common/channel_arguments.cc",
"src/cpp/common/channel_filter.cc",
"src/cpp/common/channel_filter.h",
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index fba76d69d1..a48b9c60da 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -3954,6 +3954,30 @@
"posix",
"windows"
],
+ "cpu_cost": 0.5,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
+ "flaky": false,
+ "gtest": true,
+ "language": "c++",
+ "name": "client_callback_end2end_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "uses_polling": true
+ },
+ {
+ "args": [],
+ "benchmark": false,
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],