aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2018-09-20 16:35:33 -0700
committerGravatar GitHub <noreply@github.com>2018-09-20 16:35:33 -0700
commit3ee2919623dfcc11ac58e3e2a69c8986a2dd90eb (patch)
tree4614ea685c719cdf47b7ff8834545c3db7d1a4f4
parent07308653a8ed2ec0ff7ef3a35197bf5b6caaee8b (diff)
parentda1b75b5d5a9e9d437886d324f61241808fe05ab (diff)
Merge pull request #16646 from vjpai/callback_codegen_client_unary
EXPERIMENTAL: Codegen for callback client unary calls
-rw-r--r--BUILD1
-rw-r--r--CMakeLists.txt3
-rw-r--r--Makefile3
-rw-r--r--build.yaml1
-rw-r--r--gRPC-C++.podspec1
-rw-r--r--grpc.gyp2
-rw-r--r--include/grpc/grpc.h3
-rw-r--r--include/grpc/impl/codegen/grpc_types.h15
-rw-r--r--include/grpcpp/impl/codegen/callback_common.h93
-rw-r--r--include/grpcpp/impl/codegen/client_callback.h2
-rw-r--r--src/compiler/cpp_generator.cc148
-rw-r--r--src/core/lib/surface/completion_queue.cc34
-rw-r--r--src/core/lib/surface/completion_queue.h19
-rw-r--r--src/core/lib/surface/completion_queue_factory.cc6
-rw-r--r--src/cpp/client/channel_cc.cc10
-rw-r--r--src/cpp/common/callback_common.cc149
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h2
-rw-r--r--test/core/end2end/inproc_callback_test.cc31
-rw-r--r--test/core/surface/completion_queue_test.cc25
-rw-r--r--test/cpp/codegen/compiler_test_golden50
-rw-r--r--test/cpp/end2end/client_callback_end2end_test.cc111
-rw-r--r--tools/doxygen/Doxyfile.c++.internal1
-rw-r--r--tools/run_tests/generated/sources_and_headers.json1
23 files changed, 461 insertions, 250 deletions
diff --git a/BUILD b/BUILD
index 71980bdbf4..76d78d737f 100644
--- a/BUILD
+++ b/BUILD
@@ -119,7 +119,6 @@ GRPCXX_SRCS = [
"src/cpp/client/credentials_cc.cc",
"src/cpp/client/generic_stub.cc",
"src/cpp/common/alarm.cc",
- "src/cpp/common/callback_common.cc",
"src/cpp/common/channel_arguments.cc",
"src/cpp/common/channel_filter.cc",
"src/cpp/common/completion_queue_cc.cc",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9517883916..124bd0dc60 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -2778,7 +2778,6 @@ add_library(grpc++
src/cpp/client/credentials_cc.cc
src/cpp/client/generic_stub.cc
src/cpp/common/alarm.cc
- src/cpp/common/callback_common.cc
src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc
@@ -3140,7 +3139,6 @@ add_library(grpc++_cronet
src/cpp/client/credentials_cc.cc
src/cpp/client/generic_stub.cc
src/cpp/common/alarm.cc
- src/cpp/common/callback_common.cc
src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc
@@ -4267,7 +4265,6 @@ add_library(grpc++_unsecure
src/cpp/client/credentials_cc.cc
src/cpp/client/generic_stub.cc
src/cpp/common/alarm.cc
- src/cpp/common/callback_common.cc
src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc
diff --git a/Makefile b/Makefile
index 9634fa807f..82454a54d8 100644
--- a/Makefile
+++ b/Makefile
@@ -5233,7 +5233,6 @@ LIBGRPC++_SRC = \
src/cpp/client/credentials_cc.cc \
src/cpp/client/generic_stub.cc \
src/cpp/common/alarm.cc \
- src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/completion_queue_cc.cc \
@@ -5603,7 +5602,6 @@ LIBGRPC++_CRONET_SRC = \
src/cpp/client/credentials_cc.cc \
src/cpp/client/generic_stub.cc \
src/cpp/common/alarm.cc \
- src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/completion_queue_cc.cc \
@@ -6688,7 +6686,6 @@ LIBGRPC++_UNSECURE_SRC = \
src/cpp/client/credentials_cc.cc \
src/cpp/client/generic_stub.cc \
src/cpp/common/alarm.cc \
- src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/completion_queue_cc.cc \
diff --git a/build.yaml b/build.yaml
index a39050336d..98e315711c 100644
--- a/build.yaml
+++ b/build.yaml
@@ -1329,7 +1329,6 @@ filegroups:
- src/cpp/client/credentials_cc.cc
- src/cpp/client/generic_stub.cc
- src/cpp/common/alarm.cc
- - src/cpp/common/callback_common.cc
- src/cpp/common/channel_arguments.cc
- src/cpp/common/channel_filter.cc
- src/cpp/common/completion_queue_cc.cc
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 0dcea02f1e..d45e0c519b 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -190,7 +190,6 @@ Pod::Spec.new do |s|
'src/cpp/client/credentials_cc.cc',
'src/cpp/client/generic_stub.cc',
'src/cpp/common/alarm.cc',
- 'src/cpp/common/callback_common.cc',
'src/cpp/common/channel_arguments.cc',
'src/cpp/common/channel_filter.cc',
'src/cpp/common/completion_queue_cc.cc',
diff --git a/grpc.gyp b/grpc.gyp
index a3adc3f0b9..15d20053f4 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -1385,7 +1385,6 @@
'src/cpp/client/credentials_cc.cc',
'src/cpp/client/generic_stub.cc',
'src/cpp/common/alarm.cc',
- 'src/cpp/common/callback_common.cc',
'src/cpp/common/channel_arguments.cc',
'src/cpp/common/channel_filter.cc',
'src/cpp/common/completion_queue_cc.cc',
@@ -1533,7 +1532,6 @@
'src/cpp/client/credentials_cc.cc',
'src/cpp/client/generic_stub.cc',
'src/cpp/common/alarm.cc',
- 'src/cpp/common/callback_common.cc',
'src/cpp/common/channel_arguments.cc',
'src/cpp/common/channel_filter.cc',
'src/cpp/common/completion_queue_cc.cc',
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 3ef95ff462..787d6ae6d7 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -111,7 +111,8 @@ GRPCAPI grpc_completion_queue* grpc_completion_queue_create_for_pluck(
of GRPC_CQ_CALLBACK and grpc_cq_polling_type of GRPC_CQ_DEFAULT_POLLING.
This function is experimental. */
GRPCAPI grpc_completion_queue* grpc_completion_queue_create_for_callback(
- void* shutdown_callback, void* reserved);
+ grpc_experimental_completion_queue_functor* shutdown_callback,
+ void* reserved);
/** Create a completion queue */
GRPCAPI grpc_completion_queue* grpc_completion_queue_create(
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index 5f3b96f40b..9ed5b3c1d4 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -660,6 +660,19 @@ typedef enum {
GRPC_CQ_CALLBACK
} grpc_cq_completion_type;
+/** EXPERIMENTAL: Specifies an interface class to be used as a tag
+ for callback-based completion queues. This can be used directly,
+ as the first element of a struct in C, or as a base class in C++.
+ Its "run" value should be assigned to some non-member function, such as
+ a static method. */
+typedef struct grpc_experimental_completion_queue_functor {
+ /** The run member specifies a function that will be called when this
+ tag is extracted from the completion queue. Its arguments will be a
+ pointer to this functor and a boolean that indicates whether the
+ operation succeeded (non-zero) or failed (zero) */
+ void (*functor_run)(struct grpc_experimental_completion_queue_functor*, int);
+} grpc_experimental_completion_queue_functor;
+
/* The upgrade to version 2 is currently experimental. */
#define GRPC_CQ_CURRENT_VERSION 2
@@ -678,7 +691,7 @@ typedef struct grpc_completion_queue_attributes {
/* EXPERIMENTAL: START OF VERSION 2 CQ ATTRIBUTES */
/** When creating a callbackable CQ, pass in a functor to get invoked when
* shutdown is complete */
- void* cq_shutdown_cb;
+ grpc_experimental_completion_queue_functor* cq_shutdown_cb;
/* END OF VERSION 2 CQ ATTRIBUTES */
} grpc_completion_queue_attributes;
diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h
index 8b3ad66a8d..ca2f867d04 100644
--- a/include/grpcpp/impl/codegen/callback_common.h
+++ b/include/grpcpp/impl/codegen/callback_common.h
@@ -21,25 +21,36 @@
#include <functional>
+#include <grpc/impl/codegen/grpc_types.h>
#include <grpcpp/impl/codegen/call.h>
#include <grpcpp/impl/codegen/channel_interface.h>
#include <grpcpp/impl/codegen/config.h>
#include <grpcpp/impl/codegen/core_codegen_interface.h>
#include <grpcpp/impl/codegen/status.h>
-// Forward declarations
-namespace grpc_core {
-class CQCallbackInterface;
-};
-
namespace grpc {
namespace internal {
+/// An exception-safe way of invoking a user-specified callback function
+template <class Func, class Arg>
+void CatchingCallback(Func&& func, Arg&& arg) {
+#if GRPC_ALLOW_EXCEPTIONS
+ try {
+ func(arg);
+ } catch (...) {
+ // nothing to return or change here, just don't crash the library
+ }
+#else // GRPC_ALLOW_EXCEPTIONS
+ func(arg);
+#endif // GRPC_ALLOW_EXCEPTIONS
+}
+
// The contract on these tags is that they are single-shot. They must be
// constructed and then fired at exactly one point. There is no expectation
// that they can be reused without reconstruction.
-class CallbackWithStatusTag {
+class CallbackWithStatusTag
+ : public grpc_experimental_completion_queue_functor {
public:
// always allocated against a call arena, no memory free required
static void operator delete(void* ptr, std::size_t size) {
@@ -54,24 +65,49 @@ class CallbackWithStatusTag {
static void operator delete(void*, void*) { assert(0); }
CallbackWithStatusTag(grpc_call* call, std::function<void(Status)> f,
- CompletionQueueTag* ops);
+ CompletionQueueTag* ops)
+ : call_(call), func_(std::move(f)), ops_(ops) {
+ g_core_codegen_interface->grpc_call_ref(call);
+ functor_run = &CallbackWithStatusTag::StaticRun;
+ }
~CallbackWithStatusTag() {}
- void* tag() { return static_cast<void*>(impl_); }
- Status* status_ptr() { return status_; }
- CompletionQueueTag* ops() { return ops_; }
+ Status* status_ptr() { return &status_; }
// force_run can not be performed on a tag if operations using this tag
// have been sent to PerformOpsOnCall. It is intended for error conditions
// that are detected before the operations are internally processed.
- void force_run(Status s);
+ void force_run(Status s) {
+ status_ = std::move(s);
+ Run(true);
+ }
private:
- grpc_core::CQCallbackInterface* impl_;
- Status* status_;
+ grpc_call* call_;
+ std::function<void(Status)> func_;
CompletionQueueTag* ops_;
+ Status status_;
+
+ static void StaticRun(grpc_experimental_completion_queue_functor* cb,
+ int ok) {
+ static_cast<CallbackWithStatusTag*>(cb)->Run(static_cast<bool>(ok));
+ }
+ void Run(bool ok) {
+ void* ignored = ops_;
+
+ GPR_CODEGEN_ASSERT(ops_->FinalizeResult(&ignored, &ok));
+ GPR_CODEGEN_ASSERT(ignored == ops_);
+
+ // Last use of func_ or status_, so ok to move them out
+ CatchingCallback(std::move(func_), std::move(status_));
+
+ func_ = nullptr; // reset to clear this out for sure
+ status_ = Status(); // reset to clear this out for sure
+ g_core_codegen_interface->grpc_call_unref(call_);
+ }
};
-class CallbackWithSuccessTag {
+class CallbackWithSuccessTag
+ : public grpc_experimental_completion_queue_functor {
public:
// always allocated against a call arena, no memory free required
static void operator delete(void* ptr, std::size_t size) {
@@ -86,19 +122,40 @@ class CallbackWithSuccessTag {
static void operator delete(void*, void*) { assert(0); }
CallbackWithSuccessTag(grpc_call* call, std::function<void(bool)> f,
- CompletionQueueTag* ops);
+ CompletionQueueTag* ops)
+ : call_(call), func_(std::move(f)), ops_(ops) {
+ g_core_codegen_interface->grpc_call_ref(call);
+ functor_run = &CallbackWithSuccessTag::StaticRun;
+ }
- void* tag() { return static_cast<void*>(impl_); }
CompletionQueueTag* ops() { return ops_; }
// force_run can not be performed on a tag if operations using this tag
// have been sent to PerformOpsOnCall. It is intended for error conditions
// that are detected before the operations are internally processed.
- void force_run(bool ok);
+ void force_run(bool ok) { Run(ok); }
private:
- grpc_core::CQCallbackInterface* impl_;
+ grpc_call* call_;
+ std::function<void(bool)> func_;
CompletionQueueTag* ops_;
+
+ static void StaticRun(grpc_experimental_completion_queue_functor* cb,
+ int ok) {
+ static_cast<CallbackWithSuccessTag*>(cb)->Run(static_cast<bool>(ok));
+ }
+ void Run(bool ok) {
+ void* ignored = ops_;
+ bool new_ok = ok;
+ GPR_CODEGEN_ASSERT(ops_->FinalizeResult(&ignored, &new_ok));
+ GPR_CODEGEN_ASSERT(ignored == ops_);
+
+ // Last use of func_, so ok to move it out for rvalue call above
+ CatchingCallback(std::move(func_), ok);
+
+ func_ = nullptr; // reset to clear this out for sure
+ g_core_codegen_interface->grpc_call_unref(call_);
+ }
};
} // namespace internal
diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h
index fc81c8aa0a..4d4faea063 100644
--- a/include/grpcpp/impl/codegen/client_callback.h
+++ b/include/grpcpp/impl/codegen/client_callback.h
@@ -84,7 +84,7 @@ class CallbackUnaryCallImpl {
ops->AllowNoMessage();
ops->ClientSendClose();
ops->ClientRecvStatus(context, tag->status_ptr());
- ops->set_cq_tag(tag->tag());
+ ops->set_cq_tag(tag);
call.PerformOps(ops);
}
};
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index 1e0c36451b..56716493dc 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -128,6 +128,7 @@ grpc::string GetHeaderIncludes(grpc_generator::File* file,
"");
}
static const char* headers_strs[] = {
+ "functional",
"grpcpp/impl/codegen/async_generic_service.h",
"grpcpp/impl/codegen/async_stream.h",
"grpcpp/impl/codegen/async_unary_call.h",
@@ -547,6 +548,116 @@ void PrintHeaderClientMethod(grpc_generator::Printer* printer,
}
}
+void PrintHeaderClientMethodCallbackInterfacesStart(
+ grpc_generator::Printer* printer,
+ std::map<grpc::string, grpc::string>* vars) {
+ // This declares the interface for the callback-based API. The components
+ // are pure; even though this is new (post-1.0) API, it can be pure because
+ // it is an entirely new interface that happens to be scoped within
+ // StubInterface, not new additions to StubInterface itself
+ printer->Print("class experimental_async_interface {\n");
+ // All methods in this new interface are public. There is no need for private
+ // "Raw" methods since the callback-based API returns unowned raw pointers
+ printer->Print(" public:\n");
+ printer->Indent();
+ printer->Print("virtual ~experimental_async_interface() {}\n");
+}
+
+void PrintHeaderClientMethodCallbackInterfaces(
+ grpc_generator::Printer* printer, const grpc_generator::Method* method,
+ std::map<grpc::string, grpc::string>* vars, bool is_public) {
+ // Reserve is_public for future expansion
+ assert(is_public);
+
+ (*vars)["Method"] = method->name();
+ (*vars)["Request"] = method->input_type_name();
+ (*vars)["Response"] = method->output_type_name();
+
+ if (method->NoStreaming()) {
+ printer->Print(*vars,
+ "virtual void $Method$(::grpc::ClientContext* context, "
+ "const $Request$* request, $Response$* response, "
+ "std::function<void(::grpc::Status)>) = 0;\n");
+ } else if (ClientOnlyStreaming(method)) {
+ // TODO(vjpai): Add support for client-side streaming
+ } else if (ServerOnlyStreaming(method)) {
+ // TODO(vjpai): Add support for server-side streaming
+ } else if (method->BidiStreaming()) {
+ // TODO(vjpai): Add support for bidi streaming
+ }
+}
+
+void PrintHeaderClientMethodCallbackInterfacesEnd(
+ grpc_generator::Printer* printer,
+ std::map<grpc::string, grpc::string>* vars) {
+ printer->Outdent();
+ printer->Print("};\n");
+
+ // Declare a function to give the async stub contents. It can't be pure
+ // since this is a new API in StubInterface, but it is meaningless by default
+ // (since any stub that wants to use it must have its own implementation of
+ // the callback functions therein), so make the default return value nullptr.
+ // Intentionally include the word "class" to avoid possible shadowing.
+ printer->Print(
+ "virtual class experimental_async_interface* experimental_async() { "
+ "return nullptr; }\n");
+}
+
+void PrintHeaderClientMethodCallbackStart(
+ grpc_generator::Printer* printer,
+ std::map<grpc::string, grpc::string>* vars) {
+ // This declares the stub entry for the callback-based API.
+ printer->Print("class experimental_async final :\n");
+ printer->Print(" public StubInterface::experimental_async_interface {\n");
+ printer->Print(" public:\n");
+ printer->Indent();
+}
+
+void PrintHeaderClientMethodCallback(grpc_generator::Printer* printer,
+ const grpc_generator::Method* method,
+ std::map<grpc::string, grpc::string>* vars,
+ bool is_public) {
+ // Reserve is_public for future expansion
+ assert(is_public);
+
+ (*vars)["Method"] = method->name();
+ (*vars)["Request"] = method->input_type_name();
+ (*vars)["Response"] = method->output_type_name();
+
+ if (method->NoStreaming()) {
+ printer->Print(*vars,
+ "void $Method$(::grpc::ClientContext* context, "
+ "const $Request$* request, $Response$* response, "
+ "std::function<void(::grpc::Status)>) override;\n");
+ } else if (ClientOnlyStreaming(method)) {
+ // TODO(vjpai): Add support for client-side streaming
+ } else if (ServerOnlyStreaming(method)) {
+ // TODO(vjpai): Add support for server-side streaming
+ } else if (method->BidiStreaming()) {
+ // TODO(vjpai): Add support for bidi streaming
+ }
+}
+
+void PrintHeaderClientMethodCallbackEnd(
+ grpc_generator::Printer* printer,
+ std::map<grpc::string, grpc::string>* vars) {
+ printer->Outdent();
+ printer->Print(" private:\n");
+ printer->Indent();
+ printer->Print("friend class Stub;\n");
+ printer->Print("explicit experimental_async(Stub* stub): stub_(stub) { }\n");
+ // include a function with a dummy use of stub_ to avoid an unused
+ // private member warning for service with no methods
+ printer->Print("Stub* stub() { return stub_; }\n");
+ printer->Print("Stub* stub_;\n");
+ printer->Outdent();
+ printer->Print("};\n");
+
+ printer->Print(
+ "class experimental_async_interface* experimental_async() override { "
+ "return &async_stub_; }\n");
+}
+
void PrintHeaderClientMethodData(grpc_generator::Printer* printer,
const grpc_generator::Method* method,
std::map<grpc::string, grpc::string>* vars) {
@@ -951,6 +1062,14 @@ void PrintHeaderService(grpc_generator::Printer* printer,
true);
printer->Print(service->method(i)->GetTrailingComments("//").c_str());
}
+ PrintHeaderClientMethodCallbackInterfacesStart(printer, vars);
+ for (int i = 0; i < service->method_count(); ++i) {
+ printer->Print(service->method(i)->GetLeadingComments("//").c_str());
+ PrintHeaderClientMethodCallbackInterfaces(printer, service->method(i).get(),
+ vars, true);
+ printer->Print(service->method(i)->GetTrailingComments("//").c_str());
+ }
+ PrintHeaderClientMethodCallbackInterfacesEnd(printer, vars);
printer->Outdent();
printer->Print("private:\n");
printer->Indent();
@@ -970,10 +1089,17 @@ void PrintHeaderService(grpc_generator::Printer* printer,
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderClientMethod(printer, service->method(i).get(), vars, true);
}
+ PrintHeaderClientMethodCallbackStart(printer, vars);
+ for (int i = 0; i < service->method_count(); ++i) {
+ PrintHeaderClientMethodCallback(printer, service->method(i).get(), vars,
+ true);
+ }
+ PrintHeaderClientMethodCallbackEnd(printer, vars);
printer->Outdent();
printer->Print("\n private:\n");
printer->Indent();
printer->Print("std::shared_ptr< ::grpc::ChannelInterface> channel_;\n");
+ printer->Print("class experimental_async async_stub_{this};\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderClientMethod(printer, service->method(i).get(), vars, false);
}
@@ -1199,10 +1325,12 @@ grpc::string GetSourceIncludes(grpc_generator::File* file,
std::map<grpc::string, grpc::string> vars;
static const char* headers_strs[] = {
+ "functional",
"grpcpp/impl/codegen/async_stream.h",
"grpcpp/impl/codegen/async_unary_call.h",
"grpcpp/impl/codegen/channel_interface.h",
"grpcpp/impl/codegen/client_unary_call.h",
+ "grpcpp/impl/codegen/client_callback.h",
"grpcpp/impl/codegen/method_handler_impl.h",
"grpcpp/impl/codegen/rpc_service_method.h",
"grpcpp/impl/codegen/service_type.h",
@@ -1247,6 +1375,17 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer,
" return ::grpc::internal::BlockingUnaryCall"
"(channel_.get(), rpcmethod_$Method$_, "
"context, request, response);\n}\n\n");
+
+ printer->Print(*vars,
+ "void $ns$$Service$::Stub::experimental_async::$Method$("
+ "::grpc::ClientContext* context, "
+ "const $Request$* request, $Response$* response, "
+ "std::function<void(::grpc::Status)> f) {\n");
+ printer->Print(*vars,
+ " return ::grpc::internal::CallbackUnaryCall"
+ "(stub_->channel_.get(), stub_->rpcmethod_$Method$_, "
+ "context, request, response, std::move(f));\n}\n\n");
+
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncStart"] = async_prefix.start;
@@ -1277,6 +1416,9 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer,
"rpcmethod_$Method$_, "
"context, response);\n"
"}\n\n");
+
+ // TODO(vjpai): Add callback version
+
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncStart"] = async_prefix.start;
@@ -1308,6 +1450,9 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer,
"rpcmethod_$Method$_, "
"context, request);\n"
"}\n\n");
+
+ // TODO(vjpai): Add callback version
+
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncStart"] = async_prefix.start;
@@ -1339,6 +1484,9 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer,
"rpcmethod_$Method$_, "
"context);\n"
"}\n\n");
+
+ // TODO(vjpai): Add callback version
+
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncStart"] = async_prefix.start;
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index c2cf450e94..5dc9991f70 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -184,7 +184,8 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
typedef struct cq_vtable {
grpc_cq_completion_type cq_completion_type;
size_t data_size;
- void (*init)(void* data, grpc_core::CQCallbackInterface* shutdown_callback);
+ void (*init)(void* data,
+ grpc_experimental_completion_queue_functor* shutdown_callback);
void (*shutdown)(grpc_completion_queue* cq);
void (*destroy)(void* data);
bool (*begin_op)(grpc_completion_queue* cq, void* tag);
@@ -267,7 +268,7 @@ typedef struct cq_callback_data {
bool shutdown_called;
/** A callback that gets invoked when the CQ completes shutdown */
- grpc_core::CQCallbackInterface* shutdown_callback;
+ grpc_experimental_completion_queue_functor* shutdown_callback;
} cq_callback_data;
/* Completion queue structure */
@@ -333,12 +334,12 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved);
// Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
-static void cq_init_next(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback);
-static void cq_init_pluck(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback);
-static void cq_init_callback(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback);
+static void cq_init_next(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
+static void cq_init_pluck(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
+static void cq_init_callback(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
static void cq_destroy_next(void* data);
static void cq_destroy_pluck(void* data);
static void cq_destroy_callback(void* data);
@@ -462,7 +463,7 @@ static long cq_event_queue_num_items(grpc_cq_event_queue* q) {
grpc_completion_queue* grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
- grpc_core::CQCallbackInterface* shutdown_callback) {
+ grpc_experimental_completion_queue_functor* shutdown_callback) {
GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0);
grpc_completion_queue* cq;
@@ -497,8 +498,8 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
return cq;
}
-static void cq_init_next(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback) {
+static void cq_init_next(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
cq_next_data* cqd = static_cast<cq_next_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
@@ -513,8 +514,8 @@ static void cq_destroy_next(void* data) {
cq_event_queue_destroy(&cqd->queue);
}
-static void cq_init_pluck(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback) {
+static void cq_init_pluck(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
@@ -532,7 +533,7 @@ static void cq_destroy_pluck(void* data) {
}
static void cq_init_callback(
- void* data, grpc_core::CQCallbackInterface* shutdown_callback) {
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
@@ -859,7 +860,8 @@ static void cq_end_op_for_callback(
GRPC_ERROR_UNREF(error);
- (static_cast<grpc_core::CQCallbackInterface*>(tag))->Run(is_success);
+ auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
+ (*functor->functor_run)(functor, is_success);
}
void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
@@ -1343,7 +1345,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
GPR_ASSERT(cqd->shutdown_called);
cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
- callback->Run(true);
+ (*callback->functor_run)(callback, true);
}
static void cq_shutdown_callback(grpc_completion_queue* cq) {
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index a7c524d8e8..d60fe6d6ef 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -48,23 +48,6 @@ typedef struct grpc_cq_completion {
uintptr_t next;
} grpc_cq_completion;
-/// For callback CQs, the tag that is passed in for an operation must
-/// actually be a pointer to an implementation of the following class.
-/// When the operation completes, the tag will be typecasted from void*
-/// to grpc_core::CQCallbackInterface* and then the Run method will be
-/// invoked on it. In practice, the language binding (e.g., C++ API
-/// implementation) is responsible for providing and using an implementation
-/// of this abstract base class.
-namespace grpc_core {
-class CQCallbackInterface {
- public:
- virtual ~CQCallbackInterface() {}
- virtual void Run(bool) GRPC_ABSTRACT;
-
- GRPC_ABSTRACT_BASE_CLASS
-};
-} // namespace grpc_core
-
#ifndef NDEBUG
void grpc_cq_internal_ref(grpc_completion_queue* cc, const char* reason,
const char* file, int line);
@@ -106,6 +89,6 @@ int grpc_get_cq_poll_num(grpc_completion_queue* cc);
grpc_completion_queue* grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
- grpc_core::CQCallbackInterface* shutdown_callback);
+ grpc_experimental_completion_queue_functor* shutdown_callback);
#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */
diff --git a/src/core/lib/surface/completion_queue_factory.cc b/src/core/lib/surface/completion_queue_factory.cc
index ed92dd7eba..2616c156e4 100644
--- a/src/core/lib/surface/completion_queue_factory.cc
+++ b/src/core/lib/surface/completion_queue_factory.cc
@@ -31,8 +31,7 @@ static grpc_completion_queue* default_create(
const grpc_completion_queue_factory* factory,
const grpc_completion_queue_attributes* attr) {
return grpc_completion_queue_create_internal(
- attr->cq_completion_type, attr->cq_polling_type,
- static_cast<grpc_core::CQCallbackInterface*>(attr->cq_shutdown_cb));
+ attr->cq_completion_type, attr->cq_polling_type, attr->cq_shutdown_cb);
}
static grpc_completion_queue_factory_vtable default_vtable = {default_create};
@@ -73,7 +72,8 @@ grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) {
}
grpc_completion_queue* grpc_completion_queue_create_for_callback(
- void* shutdown_callback, void* reserved) {
+ grpc_experimental_completion_queue_functor* shutdown_callback,
+ void* reserved) {
GPR_ASSERT(!reserved);
grpc_completion_queue_attributes attr = {
2, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback};
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index ad71286e05..c59059f045 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -193,17 +193,19 @@ bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed,
}
namespace {
-class ShutdownCallback : public grpc_core::CQCallbackInterface {
+class ShutdownCallback : public grpc_experimental_completion_queue_functor {
public:
+ ShutdownCallback() { functor_run = &ShutdownCallback::Run; }
// TakeCQ takes ownership of the cq into the shutdown callback
// so that the shutdown callback will be responsible for destroying it
void TakeCQ(CompletionQueue* cq) { cq_ = cq; }
// The Run function will get invoked by the completion queue library
// when the shutdown is actually complete
- void Run(bool) override {
- delete cq_;
- grpc_core::Delete(this);
+ static void Run(grpc_experimental_completion_queue_functor* cb, int) {
+ auto* callback = static_cast<ShutdownCallback*>(cb);
+ delete callback->cq_;
+ grpc_core::Delete(callback);
}
private:
diff --git a/src/cpp/common/callback_common.cc b/src/cpp/common/callback_common.cc
deleted file mode 100644
index a0c8eeb516..0000000000
--- a/src/cpp/common/callback_common.cc
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <functional>
-
-#include <grpcpp/impl/codegen/callback_common.h>
-#include <grpcpp/impl/codegen/status.h>
-
-#include "src/core/lib/gprpp/memory.h"
-#include "src/core/lib/surface/completion_queue.h"
-
-namespace grpc {
-namespace internal {
-namespace {
-
-template <class Func, class Arg>
-void CatchingCallback(Func&& func, Arg&& arg) {
-#if GRPC_ALLOW_EXCEPTIONS
- try {
- func(arg);
- } catch (...) {
- // nothing to return or change here, just don't crash the library
- }
-#else // GRPC_ALLOW_EXCEPTIONS
- func(arg);
-#endif // GRPC_ALLOW_EXCEPTIONS
-}
-
-class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface {
- public:
- static void operator delete(void* ptr, std::size_t size) {
- assert(size == sizeof(CallbackWithSuccessImpl));
- }
-
- // This operator should never be called as the memory should be freed as part
- // of the arena destruction. It only exists to provide a matching operator
- // delete to the operator new so that some compilers will not complain (see
- // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
- // there are no tests catching the compiler warning.
- static void operator delete(void*, void*) { assert(0); }
-
- CallbackWithSuccessImpl(grpc_call* call, CallbackWithSuccessTag* parent,
- std::function<void(bool)> f)
- : call_(call), parent_(parent), func_(std::move(f)) {
- grpc_call_ref(call);
- }
-
- void Run(bool ok) override {
- void* ignored = parent_->ops();
- bool new_ok = ok;
- GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &new_ok));
- GPR_ASSERT(ignored == parent_->ops());
-
- // Last use of func_, so ok to move it out for rvalue call above
- CatchingCallback(std::move(func_), ok);
-
- func_ = nullptr; // reset to clear this out for sure
- grpc_call_unref(call_);
- }
-
- private:
- grpc_call* call_;
- CallbackWithSuccessTag* parent_;
- std::function<void(bool)> func_;
-};
-
-class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface {
- public:
- static void operator delete(void* ptr, std::size_t size) {
- assert(size == sizeof(CallbackWithStatusImpl));
- }
-
- // This operator should never be called as the memory should be freed as part
- // of the arena destruction. It only exists to provide a matching operator
- // delete to the operator new so that some compilers will not complain (see
- // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
- // there are no tests catching the compiler warning.
- static void operator delete(void*, void*) { assert(0); }
-
- CallbackWithStatusImpl(grpc_call* call, CallbackWithStatusTag* parent,
- std::function<void(Status)> f)
- : call_(call), parent_(parent), func_(std::move(f)), status_() {
- grpc_call_ref(call);
- }
-
- void Run(bool ok) override {
- void* ignored = parent_->ops();
-
- GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &ok));
- GPR_ASSERT(ignored == parent_->ops());
-
- // Last use of func_ or status_, so ok to move them out
- CatchingCallback(std::move(func_), std::move(status_));
-
- func_ = nullptr; // reset to clear this out for sure
- grpc_call_unref(call_);
- }
- Status* status_ptr() { return &status_; }
-
- private:
- grpc_call* call_;
- CallbackWithStatusTag* parent_;
- std::function<void(Status)> func_;
- Status status_;
-};
-
-} // namespace
-
-CallbackWithSuccessTag::CallbackWithSuccessTag(grpc_call* call,
- std::function<void(bool)> f,
- CompletionQueueTag* ops)
- : impl_(new (grpc_call_arena_alloc(call, sizeof(CallbackWithSuccessImpl)))
- CallbackWithSuccessImpl(call, this, std::move(f))),
- ops_(ops) {}
-
-void CallbackWithSuccessTag::force_run(bool ok) { impl_->Run(ok); }
-
-CallbackWithStatusTag::CallbackWithStatusTag(grpc_call* call,
- std::function<void(Status)> f,
- CompletionQueueTag* ops)
- : ops_(ops) {
- auto* impl = new (grpc_call_arena_alloc(call, sizeof(CallbackWithStatusImpl)))
- CallbackWithStatusImpl(call, this, std::move(f));
- impl_ = impl;
- status_ = impl->status_ptr();
-}
-
-void CallbackWithStatusTag::force_run(Status s) {
- *status_ = std::move(s);
- impl_->Run(true);
-}
-
-} // namespace internal
-} // namespace grpc
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index d00e75c326..e25d953a18 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -107,7 +107,7 @@ extern grpc_completion_queue_create_for_next_type grpc_completion_queue_create_f
typedef grpc_completion_queue*(*grpc_completion_queue_create_for_pluck_type)(void* reserved);
extern grpc_completion_queue_create_for_pluck_type grpc_completion_queue_create_for_pluck_import;
#define grpc_completion_queue_create_for_pluck grpc_completion_queue_create_for_pluck_import
-typedef grpc_completion_queue*(*grpc_completion_queue_create_for_callback_type)(void* shutdown_callback, void* reserved);
+typedef grpc_completion_queue*(*grpc_completion_queue_create_for_callback_type)(grpc_experimental_completion_queue_functor* shutdown_callback, void* reserved);
extern grpc_completion_queue_create_for_callback_type grpc_completion_queue_create_for_callback_import;
#define grpc_completion_queue_create_for_callback grpc_completion_queue_create_for_callback_import
typedef grpc_completion_queue*(*grpc_completion_queue_create_type)(const grpc_completion_queue_factory* factory, const grpc_completion_queue_attributes* attributes, void* reserved);
diff --git a/test/core/end2end/inproc_callback_test.cc b/test/core/end2end/inproc_callback_test.cc
index 0d6c7c75a8..310030046a 100644
--- a/test/core/end2end/inproc_callback_test.cc
+++ b/test/core/end2end/inproc_callback_test.cc
@@ -37,13 +37,16 @@ typedef struct inproc_fixture_data {
namespace {
template <typename F>
-class CQDeletingCallback : public grpc_core::CQCallbackInterface {
+class CQDeletingCallback : public grpc_experimental_completion_queue_functor {
public:
- explicit CQDeletingCallback(F f) : func_(f) {}
- ~CQDeletingCallback() override {}
- void Run(bool ok) override {
- func_(ok);
- grpc_core::Delete(this);
+ explicit CQDeletingCallback(F f) : func_(f) {
+ functor_run = &CQDeletingCallback::Run;
+ }
+ ~CQDeletingCallback() {}
+ static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
+ auto* callback = static_cast<CQDeletingCallback*>(cb);
+ callback->func_(static_cast<bool>(ok));
+ grpc_core::Delete(callback);
}
private:
@@ -51,18 +54,24 @@ class CQDeletingCallback : public grpc_core::CQCallbackInterface {
};
template <typename F>
-grpc_core::CQCallbackInterface* NewDeletingCallback(F f) {
+grpc_experimental_completion_queue_functor* NewDeletingCallback(F f) {
return grpc_core::New<CQDeletingCallback<F>>(f);
}
-class ShutdownCallback : public grpc_core::CQCallbackInterface {
+class ShutdownCallback : public grpc_experimental_completion_queue_functor {
public:
ShutdownCallback() : done_(false) {
+ functor_run = &ShutdownCallback::StaticRun;
gpr_mu_init(&mu_);
gpr_cv_init(&cv_);
}
- ~ShutdownCallback() override {}
- void Run(bool ok) override {
+ ~ShutdownCallback() {}
+ static void StaticRun(grpc_experimental_completion_queue_functor* cb,
+ int ok) {
+ auto* callback = static_cast<ShutdownCallback*>(cb);
+ callback->Run(static_cast<bool>(ok));
+ }
+ void Run(bool ok) {
gpr_log(GPR_DEBUG, "CQ shutdown notification invoked");
gpr_mu_lock(&mu_);
done_ = true;
@@ -170,7 +179,7 @@ static void verify_tags(gpr_timespec deadline) {
// This function creates a callback functor that emits the
// desired tag into the global tag set
-static grpc_core::CQCallbackInterface* tag(intptr_t t) {
+static grpc_experimental_completion_queue_functor* tag(intptr_t t) {
auto func = [t](bool ok) {
gpr_mu_lock(&tags_mu);
gpr_log(GPR_DEBUG, "Completing operation %" PRIdPTR, t);
diff --git a/test/core/surface/completion_queue_test.cc b/test/core/surface/completion_queue_test.cc
index b889fd0fc6..f7ce8a7042 100644
--- a/test/core/surface/completion_queue_test.cc
+++ b/test/core/surface/completion_queue_test.cc
@@ -369,11 +369,15 @@ static void test_callback(void) {
LOG_TEST("test_callback");
bool got_shutdown = false;
- class ShutdownCallback : public grpc_core::CQCallbackInterface {
+ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
public:
- ShutdownCallback(bool* done) : done_(done) {}
+ ShutdownCallback(bool* done) : done_(done) {
+ functor_run = &ShutdownCallback::Run;
+ }
~ShutdownCallback() {}
- void Run(bool ok) override { *done_ = ok; }
+ static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
+ *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
+ }
private:
bool* done_;
@@ -391,14 +395,17 @@ static void test_callback(void) {
grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
int counter = 0;
- class TagCallback : public grpc_core::CQCallbackInterface {
+ class TagCallback : public grpc_experimental_completion_queue_functor {
public:
- TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {}
+ TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
+ functor_run = &TagCallback::Run;
+ }
~TagCallback() {}
- void Run(bool ok) override {
- GPR_ASSERT(ok);
- *counter_ += tag_;
- grpc_core::Delete(this);
+ static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
+ GPR_ASSERT(static_cast<bool>(ok));
+ auto* callback = static_cast<TagCallback*>(cb);
+ *callback->counter_ += callback->tag_;
+ grpc_core::Delete(callback);
};
private:
diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden
index 756f6a0224..93e1e68654 100644
--- a/test/cpp/codegen/compiler_test_golden
+++ b/test/cpp/codegen/compiler_test_golden
@@ -26,6 +26,7 @@
#include "src/proto/grpc/testing/compiler_test.pb.h"
+#include <functional>
#include <grpcpp/impl/codegen/async_generic_service.h>
#include <grpcpp/impl/codegen/async_stream.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
@@ -105,6 +106,23 @@ class ServiceA final {
return std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>>(PrepareAsyncMethodA4Raw(context, cq));
}
// Method A4 trailing comment 1
+ class experimental_async_interface {
+ public:
+ virtual ~experimental_async_interface() {}
+ // MethodA1 leading comment 1
+ virtual void MethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) = 0;
+ // MethodA1 trailing comment 1
+ // MethodA2 detached leading comment 1
+ //
+ // Method A2 leading comment 1
+ // Method A2 leading comment 2
+ // MethodA2 trailing comment 1
+ // Method A3 leading comment 1
+ // Method A3 trailing comment 1
+ // Method A4 leading comment 1
+ // Method A4 trailing comment 1
+ };
+ virtual class experimental_async_interface* experimental_async() { return nullptr; }
private:
virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* AsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0;
virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* PrepareAsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0;
@@ -155,9 +173,21 @@ class ServiceA final {
std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>> PrepareAsyncMethodA4(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>>(PrepareAsyncMethodA4Raw(context, cq));
}
+ class experimental_async final :
+ public StubInterface::experimental_async_interface {
+ public:
+ void MethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) override;
+ private:
+ friend class Stub;
+ explicit experimental_async(Stub* stub): stub_(stub) { }
+ Stub* stub() { return stub_; }
+ Stub* stub_;
+ };
+ class experimental_async_interface* experimental_async() override { return &async_stub_; }
private:
std::shared_ptr< ::grpc::ChannelInterface> channel_;
+ class experimental_async async_stub_{this};
::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override;
::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* PrepareAsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override;
::grpc::ClientWriter< ::grpc::testing::Request>* MethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response) override;
@@ -488,6 +518,14 @@ class ServiceB final {
return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>>(PrepareAsyncMethodB1Raw(context, request, cq));
}
// MethodB1 trailing comment 1
+ class experimental_async_interface {
+ public:
+ virtual ~experimental_async_interface() {}
+ // MethodB1 leading comment 1
+ virtual void MethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) = 0;
+ // MethodB1 trailing comment 1
+ };
+ virtual class experimental_async_interface* experimental_async() { return nullptr; }
private:
virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0;
virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* PrepareAsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0;
@@ -502,9 +540,21 @@ class ServiceB final {
std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>> PrepareAsyncMethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>>(PrepareAsyncMethodB1Raw(context, request, cq));
}
+ class experimental_async final :
+ public StubInterface::experimental_async_interface {
+ public:
+ void MethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) override;
+ private:
+ friend class Stub;
+ explicit experimental_async(Stub* stub): stub_(stub) { }
+ Stub* stub() { return stub_; }
+ Stub* stub_;
+ };
+ class experimental_async_interface* experimental_async() override { return &async_stub_; }
private:
std::shared_ptr< ::grpc::ChannelInterface> channel_;
+ class experimental_async async_stub_{this};
::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override;
::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* PrepareAsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override;
const ::grpc::internal::RpcMethod rpcmethod_MethodB1_;
diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc
index d8cb44b694..62a85641c7 100644
--- a/test/cpp/end2end/client_callback_end2end_test.cc
+++ b/test/cpp/end2end/client_callback_end2end_test.cc
@@ -18,6 +18,7 @@
#include <functional>
#include <mutex>
+#include <thread>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
@@ -55,7 +56,8 @@ class ClientCallbackEnd2endTest : public ::testing::Test {
void ResetStub() {
ChannelArguments args;
channel_ = server_->InProcessChannel(args);
- stub_.reset(new GenericStub(channel_));
+ stub_ = grpc::testing::EchoTestService::NewStub(channel_);
+ generic_stub_.reset(new GenericStub(channel_));
}
void TearDown() override {
@@ -64,7 +66,45 @@ class ClientCallbackEnd2endTest : public ::testing::Test {
}
}
- void SendRpcs(int num_rpcs, bool maybe_except) {
+ void SendRpcs(int num_rpcs, bool with_binary_metadata) {
+ grpc::string test_string("");
+ for (int i = 0; i < num_rpcs; i++) {
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext cli_ctx;
+
+ test_string += "Hello world. ";
+ request.set_message(test_string);
+
+ if (with_binary_metadata) {
+ char bytes[8] = {'\0', '\1', '\2', '\3',
+ '\4', '\5', '\6', static_cast<char>(i)};
+ cli_ctx.AddMetadata("custom-bin", grpc::string(bytes, 8));
+ }
+
+ cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
+
+ std::mutex mu;
+ std::condition_variable cv;
+ bool done = false;
+ stub_->experimental_async()->Echo(
+ &cli_ctx, &request, &response,
+ [&request, &response, &done, &mu, &cv](Status s) {
+ GPR_ASSERT(s.ok());
+
+ EXPECT_EQ(request.message(), response.message());
+ std::lock_guard<std::mutex> l(mu);
+ done = true;
+ cv.notify_one();
+ });
+ std::unique_lock<std::mutex> l(mu);
+ while (!done) {
+ cv.wait(l);
+ }
+ }
+ }
+
+ void SendRpcsGeneric(int num_rpcs, bool maybe_except) {
const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo");
grpc::string test_string("");
for (int i = 0; i < num_rpcs; i++) {
@@ -80,7 +120,7 @@ class ClientCallbackEnd2endTest : public ::testing::Test {
std::mutex mu;
std::condition_variable cv;
bool done = false;
- stub_->experimental().UnaryCall(
+ generic_stub_->experimental().UnaryCall(
&cli_ctx, kMethodName, send_buf.get(), &recv_buf,
[&request, &recv_buf, &done, &mu, &cv, maybe_except](Status s) {
GPR_ASSERT(s.ok());
@@ -105,9 +145,11 @@ class ClientCallbackEnd2endTest : public ::testing::Test {
}
}
}
+
bool is_server_started_;
std::shared_ptr<Channel> channel_;
- std::unique_ptr<grpc::GenericStub> stub_;
+ std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
+ std::unique_ptr<grpc::GenericStub> generic_stub_;
TestServiceImpl service_;
std::unique_ptr<Server> server_;
};
@@ -122,13 +164,72 @@ TEST_F(ClientCallbackEnd2endTest, SequentialRpcs) {
SendRpcs(10, false);
}
+TEST_F(ClientCallbackEnd2endTest, SequentialRpcsWithVariedBinaryMetadataValue) {
+ ResetStub();
+ SendRpcs(10, true);
+}
+
+TEST_F(ClientCallbackEnd2endTest, SequentialGenericRpcs) {
+ ResetStub();
+ SendRpcsGeneric(10, false);
+}
+
#if GRPC_ALLOW_EXCEPTIONS
TEST_F(ClientCallbackEnd2endTest, ExceptingRpc) {
ResetStub();
- SendRpcs(10, true);
+ SendRpcsGeneric(10, true);
}
#endif
+TEST_F(ClientCallbackEnd2endTest, MultipleRpcsWithVariedBinaryMetadataValue) {
+ ResetStub();
+ std::vector<std::thread> threads;
+ threads.reserve(10);
+ for (int i = 0; i < 10; ++i) {
+ threads.emplace_back([this] { SendRpcs(10, true); });
+ }
+ for (int i = 0; i < 10; ++i) {
+ threads[i].join();
+ }
+}
+
+TEST_F(ClientCallbackEnd2endTest, MultipleRpcs) {
+ ResetStub();
+ std::vector<std::thread> threads;
+ threads.reserve(10);
+ for (int i = 0; i < 10; ++i) {
+ threads.emplace_back([this] { SendRpcs(10, false); });
+ }
+ for (int i = 0; i < 10; ++i) {
+ threads[i].join();
+ }
+}
+
+TEST_F(ClientCallbackEnd2endTest, CancelRpcBeforeStart) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+ request.set_message("hello");
+ context.TryCancel();
+
+ std::mutex mu;
+ std::condition_variable cv;
+ bool done = false;
+ stub_->experimental_async()->Echo(
+ &context, &request, &response, [&response, &done, &mu, &cv](Status s) {
+ EXPECT_EQ("", response.message());
+ EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+ std::lock_guard<std::mutex> l(mu);
+ done = true;
+ cv.notify_one();
+ });
+ std::unique_lock<std::mutex> l(mu);
+ while (!done) {
+ cv.wait(l);
+ }
+}
+
} // namespace
} // namespace testing
} // namespace grpc
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index a72390d9f8..cfc5ef98e7 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -1190,7 +1190,6 @@ src/cpp/client/secure_credentials.h \
src/cpp/codegen/codegen_init.cc \
src/cpp/common/alarm.cc \
src/cpp/common/auth_property_iterator.cc \
-src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/channel_filter.h \
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 96645d0bbf..3f7393ae94 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -11473,7 +11473,6 @@
"src/cpp/client/credentials_cc.cc",
"src/cpp/client/generic_stub.cc",
"src/cpp/common/alarm.cc",
- "src/cpp/common/callback_common.cc",
"src/cpp/common/channel_arguments.cc",
"src/cpp/common/channel_filter.cc",
"src/cpp/common/channel_filter.h",