diff options
author | Vijay Pai <vpai@google.com> | 2018-09-20 16:35:33 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-20 16:35:33 -0700 |
commit | 3ee2919623dfcc11ac58e3e2a69c8986a2dd90eb (patch) | |
tree | 4614ea685c719cdf47b7ff8834545c3db7d1a4f4 | |
parent | 07308653a8ed2ec0ff7ef3a35197bf5b6caaee8b (diff) | |
parent | da1b75b5d5a9e9d437886d324f61241808fe05ab (diff) |
Merge pull request #16646 from vjpai/callback_codegen_client_unary
EXPERIMENTAL: Codegen for callback client unary calls
23 files changed, 461 insertions, 250 deletions
@@ -119,7 +119,6 @@ GRPCXX_SRCS = [ "src/cpp/client/credentials_cc.cc", "src/cpp/client/generic_stub.cc", "src/cpp/common/alarm.cc", - "src/cpp/common/callback_common.cc", "src/cpp/common/channel_arguments.cc", "src/cpp/common/channel_filter.cc", "src/cpp/common/completion_queue_cc.cc", diff --git a/CMakeLists.txt b/CMakeLists.txt index 9517883916..124bd0dc60 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2778,7 +2778,6 @@ add_library(grpc++ src/cpp/client/credentials_cc.cc src/cpp/client/generic_stub.cc src/cpp/common/alarm.cc - src/cpp/common/callback_common.cc src/cpp/common/channel_arguments.cc src/cpp/common/channel_filter.cc src/cpp/common/completion_queue_cc.cc @@ -3140,7 +3139,6 @@ add_library(grpc++_cronet src/cpp/client/credentials_cc.cc src/cpp/client/generic_stub.cc src/cpp/common/alarm.cc - src/cpp/common/callback_common.cc src/cpp/common/channel_arguments.cc src/cpp/common/channel_filter.cc src/cpp/common/completion_queue_cc.cc @@ -4267,7 +4265,6 @@ add_library(grpc++_unsecure src/cpp/client/credentials_cc.cc src/cpp/client/generic_stub.cc src/cpp/common/alarm.cc - src/cpp/common/callback_common.cc src/cpp/common/channel_arguments.cc src/cpp/common/channel_filter.cc src/cpp/common/completion_queue_cc.cc @@ -5233,7 +5233,6 @@ LIBGRPC++_SRC = \ src/cpp/client/credentials_cc.cc \ src/cpp/client/generic_stub.cc \ src/cpp/common/alarm.cc \ - src/cpp/common/callback_common.cc \ src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_filter.cc \ src/cpp/common/completion_queue_cc.cc \ @@ -5603,7 +5602,6 @@ LIBGRPC++_CRONET_SRC = \ src/cpp/client/credentials_cc.cc \ src/cpp/client/generic_stub.cc \ src/cpp/common/alarm.cc \ - src/cpp/common/callback_common.cc \ src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_filter.cc \ src/cpp/common/completion_queue_cc.cc \ @@ -6688,7 +6686,6 @@ LIBGRPC++_UNSECURE_SRC = \ src/cpp/client/credentials_cc.cc \ src/cpp/client/generic_stub.cc \ src/cpp/common/alarm.cc \ - src/cpp/common/callback_common.cc \ src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_filter.cc \ src/cpp/common/completion_queue_cc.cc \ diff --git a/build.yaml b/build.yaml index a39050336d..98e315711c 100644 --- a/build.yaml +++ b/build.yaml @@ -1329,7 +1329,6 @@ filegroups: - src/cpp/client/credentials_cc.cc - src/cpp/client/generic_stub.cc - src/cpp/common/alarm.cc - - src/cpp/common/callback_common.cc - src/cpp/common/channel_arguments.cc - src/cpp/common/channel_filter.cc - src/cpp/common/completion_queue_cc.cc diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 0dcea02f1e..d45e0c519b 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -190,7 +190,6 @@ Pod::Spec.new do |s| 'src/cpp/client/credentials_cc.cc', 'src/cpp/client/generic_stub.cc', 'src/cpp/common/alarm.cc', - 'src/cpp/common/callback_common.cc', 'src/cpp/common/channel_arguments.cc', 'src/cpp/common/channel_filter.cc', 'src/cpp/common/completion_queue_cc.cc', @@ -1385,7 +1385,6 @@ 'src/cpp/client/credentials_cc.cc', 'src/cpp/client/generic_stub.cc', 'src/cpp/common/alarm.cc', - 'src/cpp/common/callback_common.cc', 'src/cpp/common/channel_arguments.cc', 'src/cpp/common/channel_filter.cc', 'src/cpp/common/completion_queue_cc.cc', @@ -1533,7 +1532,6 @@ 'src/cpp/client/credentials_cc.cc', 'src/cpp/client/generic_stub.cc', 'src/cpp/common/alarm.cc', - 'src/cpp/common/callback_common.cc', 'src/cpp/common/channel_arguments.cc', 'src/cpp/common/channel_filter.cc', 'src/cpp/common/completion_queue_cc.cc', diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 3ef95ff462..787d6ae6d7 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -111,7 +111,8 @@ GRPCAPI grpc_completion_queue* grpc_completion_queue_create_for_pluck( of GRPC_CQ_CALLBACK and grpc_cq_polling_type of GRPC_CQ_DEFAULT_POLLING. This function is experimental. */ GRPCAPI grpc_completion_queue* grpc_completion_queue_create_for_callback( - void* shutdown_callback, void* reserved); + grpc_experimental_completion_queue_functor* shutdown_callback, + void* reserved); /** Create a completion queue */ GRPCAPI grpc_completion_queue* grpc_completion_queue_create( diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 5f3b96f40b..9ed5b3c1d4 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -660,6 +660,19 @@ typedef enum { GRPC_CQ_CALLBACK } grpc_cq_completion_type; +/** EXPERIMENTAL: Specifies an interface class to be used as a tag + for callback-based completion queues. This can be used directly, + as the first element of a struct in C, or as a base class in C++. + Its "run" value should be assigned to some non-member function, such as + a static method. */ +typedef struct grpc_experimental_completion_queue_functor { + /** The run member specifies a function that will be called when this + tag is extracted from the completion queue. Its arguments will be a + pointer to this functor and a boolean that indicates whether the + operation succeeded (non-zero) or failed (zero) */ + void (*functor_run)(struct grpc_experimental_completion_queue_functor*, int); +} grpc_experimental_completion_queue_functor; + /* The upgrade to version 2 is currently experimental. */ #define GRPC_CQ_CURRENT_VERSION 2 @@ -678,7 +691,7 @@ typedef struct grpc_completion_queue_attributes { /* EXPERIMENTAL: START OF VERSION 2 CQ ATTRIBUTES */ /** When creating a callbackable CQ, pass in a functor to get invoked when * shutdown is complete */ - void* cq_shutdown_cb; + grpc_experimental_completion_queue_functor* cq_shutdown_cb; /* END OF VERSION 2 CQ ATTRIBUTES */ } grpc_completion_queue_attributes; diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h index 8b3ad66a8d..ca2f867d04 100644 --- a/include/grpcpp/impl/codegen/callback_common.h +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -21,25 +21,36 @@ #include <functional> +#include <grpc/impl/codegen/grpc_types.h> #include <grpcpp/impl/codegen/call.h> #include <grpcpp/impl/codegen/channel_interface.h> #include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/codegen/status.h> -// Forward declarations -namespace grpc_core { -class CQCallbackInterface; -}; - namespace grpc { namespace internal { +/// An exception-safe way of invoking a user-specified callback function +template <class Func, class Arg> +void CatchingCallback(Func&& func, Arg&& arg) { +#if GRPC_ALLOW_EXCEPTIONS + try { + func(arg); + } catch (...) { + // nothing to return or change here, just don't crash the library + } +#else // GRPC_ALLOW_EXCEPTIONS + func(arg); +#endif // GRPC_ALLOW_EXCEPTIONS +} + // The contract on these tags is that they are single-shot. They must be // constructed and then fired at exactly one point. There is no expectation // that they can be reused without reconstruction. -class CallbackWithStatusTag { +class CallbackWithStatusTag + : public grpc_experimental_completion_queue_functor { public: // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -54,24 +65,49 @@ class CallbackWithStatusTag { static void operator delete(void*, void*) { assert(0); } CallbackWithStatusTag(grpc_call* call, std::function<void(Status)> f, - CompletionQueueTag* ops); + CompletionQueueTag* ops) + : call_(call), func_(std::move(f)), ops_(ops) { + g_core_codegen_interface->grpc_call_ref(call); + functor_run = &CallbackWithStatusTag::StaticRun; + } ~CallbackWithStatusTag() {} - void* tag() { return static_cast<void*>(impl_); } - Status* status_ptr() { return status_; } - CompletionQueueTag* ops() { return ops_; } + Status* status_ptr() { return &status_; } // force_run can not be performed on a tag if operations using this tag // have been sent to PerformOpsOnCall. It is intended for error conditions // that are detected before the operations are internally processed. - void force_run(Status s); + void force_run(Status s) { + status_ = std::move(s); + Run(true); + } private: - grpc_core::CQCallbackInterface* impl_; - Status* status_; + grpc_call* call_; + std::function<void(Status)> func_; CompletionQueueTag* ops_; + Status status_; + + static void StaticRun(grpc_experimental_completion_queue_functor* cb, + int ok) { + static_cast<CallbackWithStatusTag*>(cb)->Run(static_cast<bool>(ok)); + } + void Run(bool ok) { + void* ignored = ops_; + + GPR_CODEGEN_ASSERT(ops_->FinalizeResult(&ignored, &ok)); + GPR_CODEGEN_ASSERT(ignored == ops_); + + // Last use of func_ or status_, so ok to move them out + CatchingCallback(std::move(func_), std::move(status_)); + + func_ = nullptr; // reset to clear this out for sure + status_ = Status(); // reset to clear this out for sure + g_core_codegen_interface->grpc_call_unref(call_); + } }; -class CallbackWithSuccessTag { +class CallbackWithSuccessTag + : public grpc_experimental_completion_queue_functor { public: // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -86,19 +122,40 @@ class CallbackWithSuccessTag { static void operator delete(void*, void*) { assert(0); } CallbackWithSuccessTag(grpc_call* call, std::function<void(bool)> f, - CompletionQueueTag* ops); + CompletionQueueTag* ops) + : call_(call), func_(std::move(f)), ops_(ops) { + g_core_codegen_interface->grpc_call_ref(call); + functor_run = &CallbackWithSuccessTag::StaticRun; + } - void* tag() { return static_cast<void*>(impl_); } CompletionQueueTag* ops() { return ops_; } // force_run can not be performed on a tag if operations using this tag // have been sent to PerformOpsOnCall. It is intended for error conditions // that are detected before the operations are internally processed. - void force_run(bool ok); + void force_run(bool ok) { Run(ok); } private: - grpc_core::CQCallbackInterface* impl_; + grpc_call* call_; + std::function<void(bool)> func_; CompletionQueueTag* ops_; + + static void StaticRun(grpc_experimental_completion_queue_functor* cb, + int ok) { + static_cast<CallbackWithSuccessTag*>(cb)->Run(static_cast<bool>(ok)); + } + void Run(bool ok) { + void* ignored = ops_; + bool new_ok = ok; + GPR_CODEGEN_ASSERT(ops_->FinalizeResult(&ignored, &new_ok)); + GPR_CODEGEN_ASSERT(ignored == ops_); + + // Last use of func_, so ok to move it out for rvalue call above + CatchingCallback(std::move(func_), ok); + + func_ = nullptr; // reset to clear this out for sure + g_core_codegen_interface->grpc_call_unref(call_); + } }; } // namespace internal diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h index fc81c8aa0a..4d4faea063 100644 --- a/include/grpcpp/impl/codegen/client_callback.h +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -84,7 +84,7 @@ class CallbackUnaryCallImpl { ops->AllowNoMessage(); ops->ClientSendClose(); ops->ClientRecvStatus(context, tag->status_ptr()); - ops->set_cq_tag(tag->tag()); + ops->set_cq_tag(tag); call.PerformOps(ops); } }; diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 1e0c36451b..56716493dc 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -128,6 +128,7 @@ grpc::string GetHeaderIncludes(grpc_generator::File* file, ""); } static const char* headers_strs[] = { + "functional", "grpcpp/impl/codegen/async_generic_service.h", "grpcpp/impl/codegen/async_stream.h", "grpcpp/impl/codegen/async_unary_call.h", @@ -547,6 +548,116 @@ void PrintHeaderClientMethod(grpc_generator::Printer* printer, } } +void PrintHeaderClientMethodCallbackInterfacesStart( + grpc_generator::Printer* printer, + std::map<grpc::string, grpc::string>* vars) { + // This declares the interface for the callback-based API. The components + // are pure; even though this is new (post-1.0) API, it can be pure because + // it is an entirely new interface that happens to be scoped within + // StubInterface, not new additions to StubInterface itself + printer->Print("class experimental_async_interface {\n"); + // All methods in this new interface are public. There is no need for private + // "Raw" methods since the callback-based API returns unowned raw pointers + printer->Print(" public:\n"); + printer->Indent(); + printer->Print("virtual ~experimental_async_interface() {}\n"); +} + +void PrintHeaderClientMethodCallbackInterfaces( + grpc_generator::Printer* printer, const grpc_generator::Method* method, + std::map<grpc::string, grpc::string>* vars, bool is_public) { + // Reserve is_public for future expansion + assert(is_public); + + (*vars)["Method"] = method->name(); + (*vars)["Request"] = method->input_type_name(); + (*vars)["Response"] = method->output_type_name(); + + if (method->NoStreaming()) { + printer->Print(*vars, + "virtual void $Method$(::grpc::ClientContext* context, " + "const $Request$* request, $Response$* response, " + "std::function<void(::grpc::Status)>) = 0;\n"); + } else if (ClientOnlyStreaming(method)) { + // TODO(vjpai): Add support for client-side streaming + } else if (ServerOnlyStreaming(method)) { + // TODO(vjpai): Add support for server-side streaming + } else if (method->BidiStreaming()) { + // TODO(vjpai): Add support for bidi streaming + } +} + +void PrintHeaderClientMethodCallbackInterfacesEnd( + grpc_generator::Printer* printer, + std::map<grpc::string, grpc::string>* vars) { + printer->Outdent(); + printer->Print("};\n"); + + // Declare a function to give the async stub contents. It can't be pure + // since this is a new API in StubInterface, but it is meaningless by default + // (since any stub that wants to use it must have its own implementation of + // the callback functions therein), so make the default return value nullptr. + // Intentionally include the word "class" to avoid possible shadowing. + printer->Print( + "virtual class experimental_async_interface* experimental_async() { " + "return nullptr; }\n"); +} + +void PrintHeaderClientMethodCallbackStart( + grpc_generator::Printer* printer, + std::map<grpc::string, grpc::string>* vars) { + // This declares the stub entry for the callback-based API. + printer->Print("class experimental_async final :\n"); + printer->Print(" public StubInterface::experimental_async_interface {\n"); + printer->Print(" public:\n"); + printer->Indent(); +} + +void PrintHeaderClientMethodCallback(grpc_generator::Printer* printer, + const grpc_generator::Method* method, + std::map<grpc::string, grpc::string>* vars, + bool is_public) { + // Reserve is_public for future expansion + assert(is_public); + + (*vars)["Method"] = method->name(); + (*vars)["Request"] = method->input_type_name(); + (*vars)["Response"] = method->output_type_name(); + + if (method->NoStreaming()) { + printer->Print(*vars, + "void $Method$(::grpc::ClientContext* context, " + "const $Request$* request, $Response$* response, " + "std::function<void(::grpc::Status)>) override;\n"); + } else if (ClientOnlyStreaming(method)) { + // TODO(vjpai): Add support for client-side streaming + } else if (ServerOnlyStreaming(method)) { + // TODO(vjpai): Add support for server-side streaming + } else if (method->BidiStreaming()) { + // TODO(vjpai): Add support for bidi streaming + } +} + +void PrintHeaderClientMethodCallbackEnd( + grpc_generator::Printer* printer, + std::map<grpc::string, grpc::string>* vars) { + printer->Outdent(); + printer->Print(" private:\n"); + printer->Indent(); + printer->Print("friend class Stub;\n"); + printer->Print("explicit experimental_async(Stub* stub): stub_(stub) { }\n"); + // include a function with a dummy use of stub_ to avoid an unused + // private member warning for service with no methods + printer->Print("Stub* stub() { return stub_; }\n"); + printer->Print("Stub* stub_;\n"); + printer->Outdent(); + printer->Print("};\n"); + + printer->Print( + "class experimental_async_interface* experimental_async() override { " + "return &async_stub_; }\n"); +} + void PrintHeaderClientMethodData(grpc_generator::Printer* printer, const grpc_generator::Method* method, std::map<grpc::string, grpc::string>* vars) { @@ -951,6 +1062,14 @@ void PrintHeaderService(grpc_generator::Printer* printer, true); printer->Print(service->method(i)->GetTrailingComments("//").c_str()); } + PrintHeaderClientMethodCallbackInterfacesStart(printer, vars); + for (int i = 0; i < service->method_count(); ++i) { + printer->Print(service->method(i)->GetLeadingComments("//").c_str()); + PrintHeaderClientMethodCallbackInterfaces(printer, service->method(i).get(), + vars, true); + printer->Print(service->method(i)->GetTrailingComments("//").c_str()); + } + PrintHeaderClientMethodCallbackInterfacesEnd(printer, vars); printer->Outdent(); printer->Print("private:\n"); printer->Indent(); @@ -970,10 +1089,17 @@ void PrintHeaderService(grpc_generator::Printer* printer, for (int i = 0; i < service->method_count(); ++i) { PrintHeaderClientMethod(printer, service->method(i).get(), vars, true); } + PrintHeaderClientMethodCallbackStart(printer, vars); + for (int i = 0; i < service->method_count(); ++i) { + PrintHeaderClientMethodCallback(printer, service->method(i).get(), vars, + true); + } + PrintHeaderClientMethodCallbackEnd(printer, vars); printer->Outdent(); printer->Print("\n private:\n"); printer->Indent(); printer->Print("std::shared_ptr< ::grpc::ChannelInterface> channel_;\n"); + printer->Print("class experimental_async async_stub_{this};\n"); for (int i = 0; i < service->method_count(); ++i) { PrintHeaderClientMethod(printer, service->method(i).get(), vars, false); } @@ -1199,10 +1325,12 @@ grpc::string GetSourceIncludes(grpc_generator::File* file, std::map<grpc::string, grpc::string> vars; static const char* headers_strs[] = { + "functional", "grpcpp/impl/codegen/async_stream.h", "grpcpp/impl/codegen/async_unary_call.h", "grpcpp/impl/codegen/channel_interface.h", "grpcpp/impl/codegen/client_unary_call.h", + "grpcpp/impl/codegen/client_callback.h", "grpcpp/impl/codegen/method_handler_impl.h", "grpcpp/impl/codegen/rpc_service_method.h", "grpcpp/impl/codegen/service_type.h", @@ -1247,6 +1375,17 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer, " return ::grpc::internal::BlockingUnaryCall" "(channel_.get(), rpcmethod_$Method$_, " "context, request, response);\n}\n\n"); + + printer->Print(*vars, + "void $ns$$Service$::Stub::experimental_async::$Method$(" + "::grpc::ClientContext* context, " + "const $Request$* request, $Response$* response, " + "std::function<void(::grpc::Status)> f) {\n"); + printer->Print(*vars, + " return ::grpc::internal::CallbackUnaryCall" + "(stub_->channel_.get(), stub_->rpcmethod_$Method$_, " + "context, request, response, std::move(f));\n}\n\n"); + for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; (*vars)["AsyncStart"] = async_prefix.start; @@ -1277,6 +1416,9 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer, "rpcmethod_$Method$_, " "context, response);\n" "}\n\n"); + + // TODO(vjpai): Add callback version + for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; (*vars)["AsyncStart"] = async_prefix.start; @@ -1308,6 +1450,9 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer, "rpcmethod_$Method$_, " "context, request);\n" "}\n\n"); + + // TODO(vjpai): Add callback version + for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; (*vars)["AsyncStart"] = async_prefix.start; @@ -1339,6 +1484,9 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer, "rpcmethod_$Method$_, " "context);\n" "}\n\n"); + + // TODO(vjpai): Add callback version + for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; (*vars)["AsyncStart"] = async_prefix.start; diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index c2cf450e94..5dc9991f70 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -184,7 +184,8 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { typedef struct cq_vtable { grpc_cq_completion_type cq_completion_type; size_t data_size; - void (*init)(void* data, grpc_core::CQCallbackInterface* shutdown_callback); + void (*init)(void* data, + grpc_experimental_completion_queue_functor* shutdown_callback); void (*shutdown)(grpc_completion_queue* cq); void (*destroy)(void* data); bool (*begin_op)(grpc_completion_queue* cq, void* tag); @@ -267,7 +268,7 @@ typedef struct cq_callback_data { bool shutdown_called; /** A callback that gets invoked when the CQ completes shutdown */ - grpc_core::CQCallbackInterface* shutdown_callback; + grpc_experimental_completion_queue_functor* shutdown_callback; } cq_callback_data; /* Completion queue structure */ @@ -333,12 +334,12 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, void* reserved); // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback -static void cq_init_next(void* data, - grpc_core::CQCallbackInterface* shutdown_callback); -static void cq_init_pluck(void* data, - grpc_core::CQCallbackInterface* shutdown_callback); -static void cq_init_callback(void* data, - grpc_core::CQCallbackInterface* shutdown_callback); +static void cq_init_next( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback); +static void cq_init_pluck( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback); +static void cq_init_callback( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback); static void cq_destroy_next(void* data); static void cq_destroy_pluck(void* data); static void cq_destroy_callback(void* data); @@ -462,7 +463,7 @@ static long cq_event_queue_num_items(grpc_cq_event_queue* q) { grpc_completion_queue* grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, - grpc_core::CQCallbackInterface* shutdown_callback) { + grpc_experimental_completion_queue_functor* shutdown_callback) { GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0); grpc_completion_queue* cq; @@ -497,8 +498,8 @@ grpc_completion_queue* grpc_completion_queue_create_internal( return cq; } -static void cq_init_next(void* data, - grpc_core::CQCallbackInterface* shutdown_callback) { +static void cq_init_next( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { cq_next_data* cqd = static_cast<cq_next_data*>(data); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); @@ -513,8 +514,8 @@ static void cq_destroy_next(void* data) { cq_event_queue_destroy(&cqd->queue); } -static void cq_init_pluck(void* data, - grpc_core::CQCallbackInterface* shutdown_callback) { +static void cq_init_pluck( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); @@ -532,7 +533,7 @@ static void cq_destroy_pluck(void* data) { } static void cq_init_callback( - void* data, grpc_core::CQCallbackInterface* shutdown_callback) { + void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { cq_callback_data* cqd = static_cast<cq_callback_data*>(data); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); @@ -859,7 +860,8 @@ static void cq_end_op_for_callback( GRPC_ERROR_UNREF(error); - (static_cast<grpc_core::CQCallbackInterface*>(tag))->Run(is_success); + auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag); + (*functor->functor_run)(functor, is_success); } void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, @@ -1343,7 +1345,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { GPR_ASSERT(cqd->shutdown_called); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); - callback->Run(true); + (*callback->functor_run)(callback, true); } static void cq_shutdown_callback(grpc_completion_queue* cq) { diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index a7c524d8e8..d60fe6d6ef 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -48,23 +48,6 @@ typedef struct grpc_cq_completion { uintptr_t next; } grpc_cq_completion; -/// For callback CQs, the tag that is passed in for an operation must -/// actually be a pointer to an implementation of the following class. -/// When the operation completes, the tag will be typecasted from void* -/// to grpc_core::CQCallbackInterface* and then the Run method will be -/// invoked on it. In practice, the language binding (e.g., C++ API -/// implementation) is responsible for providing and using an implementation -/// of this abstract base class. -namespace grpc_core { -class CQCallbackInterface { - public: - virtual ~CQCallbackInterface() {} - virtual void Run(bool) GRPC_ABSTRACT; - - GRPC_ABSTRACT_BASE_CLASS -}; -} // namespace grpc_core - #ifndef NDEBUG void grpc_cq_internal_ref(grpc_completion_queue* cc, const char* reason, const char* file, int line); @@ -106,6 +89,6 @@ int grpc_get_cq_poll_num(grpc_completion_queue* cc); grpc_completion_queue* grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, - grpc_core::CQCallbackInterface* shutdown_callback); + grpc_experimental_completion_queue_functor* shutdown_callback); #endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */ diff --git a/src/core/lib/surface/completion_queue_factory.cc b/src/core/lib/surface/completion_queue_factory.cc index ed92dd7eba..2616c156e4 100644 --- a/src/core/lib/surface/completion_queue_factory.cc +++ b/src/core/lib/surface/completion_queue_factory.cc @@ -31,8 +31,7 @@ static grpc_completion_queue* default_create( const grpc_completion_queue_factory* factory, const grpc_completion_queue_attributes* attr) { return grpc_completion_queue_create_internal( - attr->cq_completion_type, attr->cq_polling_type, - static_cast<grpc_core::CQCallbackInterface*>(attr->cq_shutdown_cb)); + attr->cq_completion_type, attr->cq_polling_type, attr->cq_shutdown_cb); } static grpc_completion_queue_factory_vtable default_vtable = {default_create}; @@ -73,7 +72,8 @@ grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) { } grpc_completion_queue* grpc_completion_queue_create_for_callback( - void* shutdown_callback, void* reserved) { + grpc_experimental_completion_queue_functor* shutdown_callback, + void* reserved) { GPR_ASSERT(!reserved); grpc_completion_queue_attributes attr = { 2, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback}; diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index ad71286e05..c59059f045 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -193,17 +193,19 @@ bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed, } namespace { -class ShutdownCallback : public grpc_core::CQCallbackInterface { +class ShutdownCallback : public grpc_experimental_completion_queue_functor { public: + ShutdownCallback() { functor_run = &ShutdownCallback::Run; } // TakeCQ takes ownership of the cq into the shutdown callback // so that the shutdown callback will be responsible for destroying it void TakeCQ(CompletionQueue* cq) { cq_ = cq; } // The Run function will get invoked by the completion queue library // when the shutdown is actually complete - void Run(bool) override { - delete cq_; - grpc_core::Delete(this); + static void Run(grpc_experimental_completion_queue_functor* cb, int) { + auto* callback = static_cast<ShutdownCallback*>(cb); + delete callback->cq_; + grpc_core::Delete(callback); } private: diff --git a/src/cpp/common/callback_common.cc b/src/cpp/common/callback_common.cc deleted file mode 100644 index a0c8eeb516..0000000000 --- a/src/cpp/common/callback_common.cc +++ /dev/null @@ -1,149 +0,0 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <functional> - -#include <grpcpp/impl/codegen/callback_common.h> -#include <grpcpp/impl/codegen/status.h> - -#include "src/core/lib/gprpp/memory.h" -#include "src/core/lib/surface/completion_queue.h" - -namespace grpc { -namespace internal { -namespace { - -template <class Func, class Arg> -void CatchingCallback(Func&& func, Arg&& arg) { -#if GRPC_ALLOW_EXCEPTIONS - try { - func(arg); - } catch (...) { - // nothing to return or change here, just don't crash the library - } -#else // GRPC_ALLOW_EXCEPTIONS - func(arg); -#endif // GRPC_ALLOW_EXCEPTIONS -} - -class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface { - public: - static void operator delete(void* ptr, std::size_t size) { - assert(size == sizeof(CallbackWithSuccessImpl)); - } - - // This operator should never be called as the memory should be freed as part - // of the arena destruction. It only exists to provide a matching operator - // delete to the operator new so that some compilers will not complain (see - // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this - // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { assert(0); } - - CallbackWithSuccessImpl(grpc_call* call, CallbackWithSuccessTag* parent, - std::function<void(bool)> f) - : call_(call), parent_(parent), func_(std::move(f)) { - grpc_call_ref(call); - } - - void Run(bool ok) override { - void* ignored = parent_->ops(); - bool new_ok = ok; - GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &new_ok)); - GPR_ASSERT(ignored == parent_->ops()); - - // Last use of func_, so ok to move it out for rvalue call above - CatchingCallback(std::move(func_), ok); - - func_ = nullptr; // reset to clear this out for sure - grpc_call_unref(call_); - } - - private: - grpc_call* call_; - CallbackWithSuccessTag* parent_; - std::function<void(bool)> func_; -}; - -class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface { - public: - static void operator delete(void* ptr, std::size_t size) { - assert(size == sizeof(CallbackWithStatusImpl)); - } - - // This operator should never be called as the memory should be freed as part - // of the arena destruction. It only exists to provide a matching operator - // delete to the operator new so that some compilers will not complain (see - // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this - // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { assert(0); } - - CallbackWithStatusImpl(grpc_call* call, CallbackWithStatusTag* parent, - std::function<void(Status)> f) - : call_(call), parent_(parent), func_(std::move(f)), status_() { - grpc_call_ref(call); - } - - void Run(bool ok) override { - void* ignored = parent_->ops(); - - GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &ok)); - GPR_ASSERT(ignored == parent_->ops()); - - // Last use of func_ or status_, so ok to move them out - CatchingCallback(std::move(func_), std::move(status_)); - - func_ = nullptr; // reset to clear this out for sure - grpc_call_unref(call_); - } - Status* status_ptr() { return &status_; } - - private: - grpc_call* call_; - CallbackWithStatusTag* parent_; - std::function<void(Status)> func_; - Status status_; -}; - -} // namespace - -CallbackWithSuccessTag::CallbackWithSuccessTag(grpc_call* call, - std::function<void(bool)> f, - CompletionQueueTag* ops) - : impl_(new (grpc_call_arena_alloc(call, sizeof(CallbackWithSuccessImpl))) - CallbackWithSuccessImpl(call, this, std::move(f))), - ops_(ops) {} - -void CallbackWithSuccessTag::force_run(bool ok) { impl_->Run(ok); } - -CallbackWithStatusTag::CallbackWithStatusTag(grpc_call* call, - std::function<void(Status)> f, - CompletionQueueTag* ops) - : ops_(ops) { - auto* impl = new (grpc_call_arena_alloc(call, sizeof(CallbackWithStatusImpl))) - CallbackWithStatusImpl(call, this, std::move(f)); - impl_ = impl; - status_ = impl->status_ptr(); -} - -void CallbackWithStatusTag::force_run(Status s) { - *status_ = std::move(s); - impl_->Run(true); -} - -} // namespace internal -} // namespace grpc diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index d00e75c326..e25d953a18 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -107,7 +107,7 @@ extern grpc_completion_queue_create_for_next_type grpc_completion_queue_create_f typedef grpc_completion_queue*(*grpc_completion_queue_create_for_pluck_type)(void* reserved); extern grpc_completion_queue_create_for_pluck_type grpc_completion_queue_create_for_pluck_import; #define grpc_completion_queue_create_for_pluck grpc_completion_queue_create_for_pluck_import -typedef grpc_completion_queue*(*grpc_completion_queue_create_for_callback_type)(void* shutdown_callback, void* reserved); +typedef grpc_completion_queue*(*grpc_completion_queue_create_for_callback_type)(grpc_experimental_completion_queue_functor* shutdown_callback, void* reserved); extern grpc_completion_queue_create_for_callback_type grpc_completion_queue_create_for_callback_import; #define grpc_completion_queue_create_for_callback grpc_completion_queue_create_for_callback_import typedef grpc_completion_queue*(*grpc_completion_queue_create_type)(const grpc_completion_queue_factory* factory, const grpc_completion_queue_attributes* attributes, void* reserved); diff --git a/test/core/end2end/inproc_callback_test.cc b/test/core/end2end/inproc_callback_test.cc index 0d6c7c75a8..310030046a 100644 --- a/test/core/end2end/inproc_callback_test.cc +++ b/test/core/end2end/inproc_callback_test.cc @@ -37,13 +37,16 @@ typedef struct inproc_fixture_data { namespace { template <typename F> -class CQDeletingCallback : public grpc_core::CQCallbackInterface { +class CQDeletingCallback : public grpc_experimental_completion_queue_functor { public: - explicit CQDeletingCallback(F f) : func_(f) {} - ~CQDeletingCallback() override {} - void Run(bool ok) override { - func_(ok); - grpc_core::Delete(this); + explicit CQDeletingCallback(F f) : func_(f) { + functor_run = &CQDeletingCallback::Run; + } + ~CQDeletingCallback() {} + static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { + auto* callback = static_cast<CQDeletingCallback*>(cb); + callback->func_(static_cast<bool>(ok)); + grpc_core::Delete(callback); } private: @@ -51,18 +54,24 @@ class CQDeletingCallback : public grpc_core::CQCallbackInterface { }; template <typename F> -grpc_core::CQCallbackInterface* NewDeletingCallback(F f) { +grpc_experimental_completion_queue_functor* NewDeletingCallback(F f) { return grpc_core::New<CQDeletingCallback<F>>(f); } -class ShutdownCallback : public grpc_core::CQCallbackInterface { +class ShutdownCallback : public grpc_experimental_completion_queue_functor { public: ShutdownCallback() : done_(false) { + functor_run = &ShutdownCallback::StaticRun; gpr_mu_init(&mu_); gpr_cv_init(&cv_); } - ~ShutdownCallback() override {} - void Run(bool ok) override { + ~ShutdownCallback() {} + static void StaticRun(grpc_experimental_completion_queue_functor* cb, + int ok) { + auto* callback = static_cast<ShutdownCallback*>(cb); + callback->Run(static_cast<bool>(ok)); + } + void Run(bool ok) { gpr_log(GPR_DEBUG, "CQ shutdown notification invoked"); gpr_mu_lock(&mu_); done_ = true; @@ -170,7 +179,7 @@ static void verify_tags(gpr_timespec deadline) { // This function creates a callback functor that emits the // desired tag into the global tag set -static grpc_core::CQCallbackInterface* tag(intptr_t t) { +static grpc_experimental_completion_queue_functor* tag(intptr_t t) { auto func = [t](bool ok) { gpr_mu_lock(&tags_mu); gpr_log(GPR_DEBUG, "Completing operation %" PRIdPTR, t); diff --git a/test/core/surface/completion_queue_test.cc b/test/core/surface/completion_queue_test.cc index b889fd0fc6..f7ce8a7042 100644 --- a/test/core/surface/completion_queue_test.cc +++ b/test/core/surface/completion_queue_test.cc @@ -369,11 +369,15 @@ static void test_callback(void) { LOG_TEST("test_callback"); bool got_shutdown = false; - class ShutdownCallback : public grpc_core::CQCallbackInterface { + class ShutdownCallback : public grpc_experimental_completion_queue_functor { public: - ShutdownCallback(bool* done) : done_(done) {} + ShutdownCallback(bool* done) : done_(done) { + functor_run = &ShutdownCallback::Run; + } ~ShutdownCallback() {} - void Run(bool ok) override { *done_ = ok; } + static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { + *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok); + } private: bool* done_; @@ -391,14 +395,17 @@ static void test_callback(void) { grpc_completion_queue_factory_lookup(&attr), &attr, nullptr); int counter = 0; - class TagCallback : public grpc_core::CQCallbackInterface { + class TagCallback : public grpc_experimental_completion_queue_functor { public: - TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {} + TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) { + functor_run = &TagCallback::Run; + } ~TagCallback() {} - void Run(bool ok) override { - GPR_ASSERT(ok); - *counter_ += tag_; - grpc_core::Delete(this); + static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { + GPR_ASSERT(static_cast<bool>(ok)); + auto* callback = static_cast<TagCallback*>(cb); + *callback->counter_ += callback->tag_; + grpc_core::Delete(callback); }; private: diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index 756f6a0224..93e1e68654 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -26,6 +26,7 @@ #include "src/proto/grpc/testing/compiler_test.pb.h" +#include <functional> #include <grpcpp/impl/codegen/async_generic_service.h> #include <grpcpp/impl/codegen/async_stream.h> #include <grpcpp/impl/codegen/async_unary_call.h> @@ -105,6 +106,23 @@ class ServiceA final { return std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>>(PrepareAsyncMethodA4Raw(context, cq)); } // Method A4 trailing comment 1 + class experimental_async_interface { + public: + virtual ~experimental_async_interface() {} + // MethodA1 leading comment 1 + virtual void MethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) = 0; + // MethodA1 trailing comment 1 + // MethodA2 detached leading comment 1 + // + // Method A2 leading comment 1 + // Method A2 leading comment 2 + // MethodA2 trailing comment 1 + // Method A3 leading comment 1 + // Method A3 trailing comment 1 + // Method A4 leading comment 1 + // Method A4 trailing comment 1 + }; + virtual class experimental_async_interface* experimental_async() { return nullptr; } private: virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* AsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* PrepareAsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; @@ -155,9 +173,21 @@ class ServiceA final { std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>> PrepareAsyncMethodA4(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) { return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>>(PrepareAsyncMethodA4Raw(context, cq)); } + class experimental_async final : + public StubInterface::experimental_async_interface { + public: + void MethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) override; + private: + friend class Stub; + explicit experimental_async(Stub* stub): stub_(stub) { } + Stub* stub() { return stub_; } + Stub* stub_; + }; + class experimental_async_interface* experimental_async() override { return &async_stub_; } private: std::shared_ptr< ::grpc::ChannelInterface> channel_; + class experimental_async async_stub_{this}; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* PrepareAsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; ::grpc::ClientWriter< ::grpc::testing::Request>* MethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response) override; @@ -488,6 +518,14 @@ class ServiceB final { return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>>(PrepareAsyncMethodB1Raw(context, request, cq)); } // MethodB1 trailing comment 1 + class experimental_async_interface { + public: + virtual ~experimental_async_interface() {} + // MethodB1 leading comment 1 + virtual void MethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) = 0; + // MethodB1 trailing comment 1 + }; + virtual class experimental_async_interface* experimental_async() { return nullptr; } private: virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* PrepareAsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; @@ -502,9 +540,21 @@ class ServiceB final { std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>> PrepareAsyncMethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>>(PrepareAsyncMethodB1Raw(context, request, cq)); } + class experimental_async final : + public StubInterface::experimental_async_interface { + public: + void MethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) override; + private: + friend class Stub; + explicit experimental_async(Stub* stub): stub_(stub) { } + Stub* stub() { return stub_; } + Stub* stub_; + }; + class experimental_async_interface* experimental_async() override { return &async_stub_; } private: std::shared_ptr< ::grpc::ChannelInterface> channel_; + class experimental_async async_stub_{this}; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* PrepareAsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; const ::grpc::internal::RpcMethod rpcmethod_MethodB1_; diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index d8cb44b694..62a85641c7 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -18,6 +18,7 @@ #include <functional> #include <mutex> +#include <thread> #include <grpcpp/channel.h> #include <grpcpp/client_context.h> @@ -55,7 +56,8 @@ class ClientCallbackEnd2endTest : public ::testing::Test { void ResetStub() { ChannelArguments args; channel_ = server_->InProcessChannel(args); - stub_.reset(new GenericStub(channel_)); + stub_ = grpc::testing::EchoTestService::NewStub(channel_); + generic_stub_.reset(new GenericStub(channel_)); } void TearDown() override { @@ -64,7 +66,45 @@ class ClientCallbackEnd2endTest : public ::testing::Test { } } - void SendRpcs(int num_rpcs, bool maybe_except) { + void SendRpcs(int num_rpcs, bool with_binary_metadata) { + grpc::string test_string(""); + for (int i = 0; i < num_rpcs; i++) { + EchoRequest request; + EchoResponse response; + ClientContext cli_ctx; + + test_string += "Hello world. "; + request.set_message(test_string); + + if (with_binary_metadata) { + char bytes[8] = {'\0', '\1', '\2', '\3', + '\4', '\5', '\6', static_cast<char>(i)}; + cli_ctx.AddMetadata("custom-bin", grpc::string(bytes, 8)); + } + + cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP); + + std::mutex mu; + std::condition_variable cv; + bool done = false; + stub_->experimental_async()->Echo( + &cli_ctx, &request, &response, + [&request, &response, &done, &mu, &cv](Status s) { + GPR_ASSERT(s.ok()); + + EXPECT_EQ(request.message(), response.message()); + std::lock_guard<std::mutex> l(mu); + done = true; + cv.notify_one(); + }); + std::unique_lock<std::mutex> l(mu); + while (!done) { + cv.wait(l); + } + } + } + + void SendRpcsGeneric(int num_rpcs, bool maybe_except) { const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo"); grpc::string test_string(""); for (int i = 0; i < num_rpcs; i++) { @@ -80,7 +120,7 @@ class ClientCallbackEnd2endTest : public ::testing::Test { std::mutex mu; std::condition_variable cv; bool done = false; - stub_->experimental().UnaryCall( + generic_stub_->experimental().UnaryCall( &cli_ctx, kMethodName, send_buf.get(), &recv_buf, [&request, &recv_buf, &done, &mu, &cv, maybe_except](Status s) { GPR_ASSERT(s.ok()); @@ -105,9 +145,11 @@ class ClientCallbackEnd2endTest : public ::testing::Test { } } } + bool is_server_started_; std::shared_ptr<Channel> channel_; - std::unique_ptr<grpc::GenericStub> stub_; + std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; + std::unique_ptr<grpc::GenericStub> generic_stub_; TestServiceImpl service_; std::unique_ptr<Server> server_; }; @@ -122,13 +164,72 @@ TEST_F(ClientCallbackEnd2endTest, SequentialRpcs) { SendRpcs(10, false); } +TEST_F(ClientCallbackEnd2endTest, SequentialRpcsWithVariedBinaryMetadataValue) { + ResetStub(); + SendRpcs(10, true); +} + +TEST_F(ClientCallbackEnd2endTest, SequentialGenericRpcs) { + ResetStub(); + SendRpcsGeneric(10, false); +} + #if GRPC_ALLOW_EXCEPTIONS TEST_F(ClientCallbackEnd2endTest, ExceptingRpc) { ResetStub(); - SendRpcs(10, true); + SendRpcsGeneric(10, true); } #endif +TEST_F(ClientCallbackEnd2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { + ResetStub(); + std::vector<std::thread> threads; + threads.reserve(10); + for (int i = 0; i < 10; ++i) { + threads.emplace_back([this] { SendRpcs(10, true); }); + } + for (int i = 0; i < 10; ++i) { + threads[i].join(); + } +} + +TEST_F(ClientCallbackEnd2endTest, MultipleRpcs) { + ResetStub(); + std::vector<std::thread> threads; + threads.reserve(10); + for (int i = 0; i < 10; ++i) { + threads.emplace_back([this] { SendRpcs(10, false); }); + } + for (int i = 0; i < 10; ++i) { + threads[i].join(); + } +} + +TEST_F(ClientCallbackEnd2endTest, CancelRpcBeforeStart) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("hello"); + context.TryCancel(); + + std::mutex mu; + std::condition_variable cv; + bool done = false; + stub_->experimental_async()->Echo( + &context, &request, &response, [&response, &done, &mu, &cv](Status s) { + EXPECT_EQ("", response.message()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + std::lock_guard<std::mutex> l(mu); + done = true; + cv.notify_one(); + }); + std::unique_lock<std::mutex> l(mu); + while (!done) { + cv.wait(l); + } +} + } // namespace } // namespace testing } // namespace grpc diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index a72390d9f8..cfc5ef98e7 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1190,7 +1190,6 @@ src/cpp/client/secure_credentials.h \ src/cpp/codegen/codegen_init.cc \ src/cpp/common/alarm.cc \ src/cpp/common/auth_property_iterator.cc \ -src/cpp/common/callback_common.cc \ src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_filter.cc \ src/cpp/common/channel_filter.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 96645d0bbf..3f7393ae94 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -11473,7 +11473,6 @@ "src/cpp/client/credentials_cc.cc", "src/cpp/client/generic_stub.cc", "src/cpp/common/alarm.cc", - "src/cpp/common/callback_common.cc", "src/cpp/common/channel_arguments.cc", "src/cpp/common/channel_filter.cc", "src/cpp/common/channel_filter.h", |