diff options
author | Vijay Pai <vpai@google.com> | 2018-11-06 14:40:08 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-11-06 14:40:08 -0800 |
commit | 385fde64fc2e3b0241229e4a018ea0989a2a00bc (patch) | |
tree | 4d1fc0715c85961938e15e3311fd7cd286723680 | |
parent | 572da21de86dc71e455eba5d56809e99cf3dd6d3 (diff) | |
parent | 1e850944e91a30a4985a90d394c82f9e8626e1a0 (diff) |
Merge pull request #17105 from vjpai/callback_success
Minor refactor of CallbackWithSuccessTag
-rw-r--r-- | include/grpcpp/impl/codegen/callback_common.h | 43 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/server_callback.h | 6 | ||||
-rw-r--r-- | src/cpp/server/server_context.cc | 4 |
3 files changed, 37 insertions, 16 deletions
diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h index 29deef658f..51367cf550 100644 --- a/include/grpcpp/impl/codegen/callback_common.h +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -110,6 +110,9 @@ class CallbackWithStatusTag } }; +/// CallbackWithSuccessTag can be reused multiple times, and will be used in +/// this fashion for streaming operations. As a result, it shouldn't clear +/// anything up until its destructor class CallbackWithSuccessTag : public grpc_experimental_completion_queue_functor { public: @@ -125,15 +128,39 @@ class CallbackWithSuccessTag // there are no tests catching the compiler warning. static void operator delete(void*, void*) { assert(0); } - CallbackWithSuccessTag() : call_(nullptr), ops_(nullptr) {} + CallbackWithSuccessTag() : call_(nullptr) {} CallbackWithSuccessTag(grpc_call* call, std::function<void(bool)> f, - CompletionQueueTag* ops) - : call_(call), func_(std::move(f)), ops_(ops) { + CompletionQueueTag* ops) { + Set(call, f, ops); + } + + CallbackWithSuccessTag(const CallbackWithSuccessTag&) = delete; + CallbackWithSuccessTag& operator=(const CallbackWithSuccessTag&) = delete; + + ~CallbackWithSuccessTag() { Clear(); } + + // Set can only be called on a default-constructed or Clear'ed tag. + // It should never be called on a tag that was constructed with arguments + // or on a tag that has been Set before unless the tag has been cleared. + void Set(grpc_call* call, std::function<void(bool)> f, + CompletionQueueTag* ops) { + call_ = call; + func_ = std::move(f); + ops_ = ops; g_core_codegen_interface->grpc_call_ref(call); functor_run = &CallbackWithSuccessTag::StaticRun; } + void Clear() { + if (call_ != nullptr) { + func_ = nullptr; + grpc_call* call = call_; + call_ = nullptr; + g_core_codegen_interface->grpc_call_unref(call); + } + } + CompletionQueueTag* ops() { return ops_; } // force_run can not be performed on a tag if operations using this tag @@ -141,7 +168,7 @@ class CallbackWithSuccessTag // that are detected before the operations are internally processed. void force_run(bool ok) { Run(ok); } - /// check if this tag has ever been set + /// check if this tag is currently set operator bool() const { return call_ != nullptr; } private: @@ -162,14 +189,8 @@ class CallbackWithSuccessTag GPR_CODEGEN_ASSERT(ignored == ops_); if (do_callback) { - // Last use of func_, so ok to move it out for rvalue call above - auto func = std::move(func_); - func_ = nullptr; // reset to clear this out for sure - CatchingCallback(std::move(func), ok); - } else { - func_ = nullptr; // reset to clear this out for sure + CatchingCallback(func_, ok); } - g_core_codegen_interface->grpc_call_unref(call_); } }; diff --git a/include/grpcpp/impl/codegen/server_callback.h b/include/grpcpp/impl/codegen/server_callback.h index 5d56cbf1df..b866fc16dc 100644 --- a/include/grpcpp/impl/codegen/server_callback.h +++ b/include/grpcpp/impl/codegen/server_callback.h @@ -22,6 +22,7 @@ #include <functional> #include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/call_op_set.h> #include <grpcpp/impl/codegen/callback_common.h> #include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/core_codegen_interface.h> @@ -116,7 +117,7 @@ class CallbackUnaryHandler : public MethodHandler { : public experimental::ServerCallbackRpcController { public: void Finish(Status s) override { - finish_tag_ = CallbackWithSuccessTag( + finish_tag_.Set( call_.call(), [this](bool) { grpc_call* call = call_.call(); @@ -149,8 +150,7 @@ class CallbackUnaryHandler : public MethodHandler { void SendInitialMetadata(std::function<void(bool)> f) override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - meta_tag_ = - CallbackWithSuccessTag(call_.call(), std::move(f), &meta_buf_); + meta_tag_.Set(call_.call(), std::move(f), &meta_buf_); meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 355debb3fb..19e6cfb0b4 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -252,6 +252,7 @@ void ServerContext::Clear() { } if (completion_op_) { completion_op_->Unref(); + completion_tag_.Clear(); } if (rpc_info_) { rpc_info_->Unref(); @@ -270,8 +271,7 @@ void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) { new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp))) CompletionOp(call); if (callback) { - completion_tag_ = - internal::CallbackWithSuccessTag(call->call(), nullptr, completion_op_); + completion_tag_.Set(call->call(), nullptr, completion_op_); completion_op_->set_core_cq_tag(&completion_tag_); } else if (has_notify_when_done_tag_) { completion_op_->set_tag(async_notify_when_done_tag_); |