diff options
author | Vijay Pai <vpai@google.com> | 2018-09-19 23:51:24 -0700 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2018-09-19 23:57:56 -0700 |
commit | 9e6511ae2eb3c982bcea88096cbe079147b25fa4 (patch) | |
tree | 85d85313559030d1f97ec1d594855e5c8e8b25a8 /include/grpcpp | |
parent | 830e5ad5df749a5bbcb0d6bace79b7f3fe306a9e (diff) |
Make the core callback interface API so that it can be used in generated code
Diffstat (limited to 'include/grpcpp')
-rw-r--r-- | include/grpcpp/impl/codegen/callback_common.h | 92 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/client_callback.h | 2 |
2 files changed, 75 insertions, 19 deletions
diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h index 8b3ad66a8d..044045034c 100644 --- a/include/grpcpp/impl/codegen/callback_common.h +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -21,25 +21,36 @@ #include <functional> +#include <grpc/impl/codegen/grpc_types.h> #include <grpcpp/impl/codegen/call.h> #include <grpcpp/impl/codegen/channel_interface.h> #include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/codegen/status.h> -// Forward declarations -namespace grpc_core { -class CQCallbackInterface; -}; - namespace grpc { namespace internal { +/// An exception-safe way of invoking a user-specified callback function +template <class Func, class Arg> +void CatchingCallback(Func&& func, Arg&& arg) { +#if GRPC_ALLOW_EXCEPTIONS + try { + func(arg); + } catch (...) { + // nothing to return or change here, just don't crash the library + } +#else // GRPC_ALLOW_EXCEPTIONS + func(arg); +#endif // GRPC_ALLOW_EXCEPTIONS +} + // The contract on these tags is that they are single-shot. They must be // constructed and then fired at exactly one point. There is no expectation // that they can be reused without reconstruction. -class CallbackWithStatusTag { +class CallbackWithStatusTag + : public grpc_experimental_completion_queue_functor { public: // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -54,24 +65,48 @@ class CallbackWithStatusTag { static void operator delete(void*, void*) { assert(0); } CallbackWithStatusTag(grpc_call* call, std::function<void(Status)> f, - CompletionQueueTag* ops); + CompletionQueueTag* ops) + : call_(call), func_(std::move(f)), ops_(ops), status_() { + g_core_codegen_interface->grpc_call_ref(call); + functor_run = &CallbackWithStatusTag::StaticRun; + } ~CallbackWithStatusTag() {} - void* tag() { return static_cast<void*>(impl_); } - Status* status_ptr() { return status_; } - CompletionQueueTag* ops() { return ops_; } + Status* status_ptr() { return &status_; } // force_run can not be performed on a tag if operations using this tag // have been sent to PerformOpsOnCall. It is intended for error conditions // that are detected before the operations are internally processed. - void force_run(Status s); + void force_run(Status s) { + status_ = std::move(s); + Run(true); + } private: - grpc_core::CQCallbackInterface* impl_; - Status* status_; + grpc_call* call_; + std::function<void(Status)> func_; CompletionQueueTag* ops_; + Status status_; + + static void StaticRun(grpc_experimental_completion_queue_functor* cb, + int ok) { + static_cast<CallbackWithStatusTag*>(cb)->Run(static_cast<bool>(ok)); + } + void Run(bool ok) { + void* ignored = ops_; + + GPR_ASSERT(ops_->FinalizeResult(&ignored, &ok)); + GPR_ASSERT(ignored == ops_); + + // Last use of func_ or status_, so ok to move them out + CatchingCallback(std::move(func_), std::move(status_)); + + func_ = nullptr; // reset to clear this out for sure + g_core_codegen_interface->grpc_call_unref(call_); + } }; -class CallbackWithSuccessTag { +class CallbackWithSuccessTag + : public grpc_experimental_completion_queue_functor { public: // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -86,19 +121,40 @@ class CallbackWithSuccessTag { static void operator delete(void*, void*) { assert(0); } CallbackWithSuccessTag(grpc_call* call, std::function<void(bool)> f, - CompletionQueueTag* ops); + CompletionQueueTag* ops) + : call_(call), func_(std::move(f)), ops_(ops) { + g_core_codegen_interface->grpc_call_ref(call); + functor_run = &CallbackWithSuccessTag::StaticRun; + } - void* tag() { return static_cast<void*>(impl_); } CompletionQueueTag* ops() { return ops_; } // force_run can not be performed on a tag if operations using this tag // have been sent to PerformOpsOnCall. It is intended for error conditions // that are detected before the operations are internally processed. - void force_run(bool ok); + void force_run(bool ok) { Run(ok); } private: - grpc_core::CQCallbackInterface* impl_; + grpc_call* call_; + std::function<void(bool)> func_; CompletionQueueTag* ops_; + + static void StaticRun(grpc_experimental_completion_queue_functor* cb, + int ok) { + static_cast<CallbackWithSuccessTag*>(cb)->Run(static_cast<bool>(ok)); + } + void Run(bool ok) { + void* ignored = ops_; + bool new_ok = ok; + GPR_ASSERT(ops_->FinalizeResult(&ignored, &new_ok)); + GPR_ASSERT(ignored == ops_); + + // Last use of func_, so ok to move it out for rvalue call above + CatchingCallback(std::move(func_), ok); + + func_ = nullptr; // reset to clear this out for sure + g_core_codegen_interface->grpc_call_unref(call_); + } }; } // namespace internal diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h index fc81c8aa0a..4d4faea063 100644 --- a/include/grpcpp/impl/codegen/client_callback.h +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -84,7 +84,7 @@ class CallbackUnaryCallImpl { ops->AllowNoMessage(); ops->ClientSendClose(); ops->ClientRecvStatus(context, tag->status_ptr()); - ops->set_cq_tag(tag->tag()); + ops->set_cq_tag(tag); call.PerformOps(ops); } }; |