aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--include/grpcpp/impl/codegen/callback_common.h19
-rw-r--r--include/grpcpp/impl/codegen/server_context.h6
-rw-r--r--src/cpp/server/server_cc.cc8
-rw-r--r--src/cpp/server/server_context.cc32
4 files changed, 46 insertions, 19 deletions
diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h
index 8273ef2f4a..29deef658f 100644
--- a/include/grpcpp/impl/codegen/callback_common.h
+++ b/include/grpcpp/impl/codegen/callback_common.h
@@ -141,6 +141,9 @@ class CallbackWithSuccessTag
// that are detected before the operations are internally processed.
void force_run(bool ok) { Run(ok); }
+ /// check if this tag has ever been set
+ operator bool() const { return call_ != nullptr; }
+
private:
grpc_call* call_;
std::function<void(bool)> func_;
@@ -153,13 +156,19 @@ class CallbackWithSuccessTag
void Run(bool ok) {
void* ignored = ops_;
bool new_ok = ok;
- GPR_CODEGEN_ASSERT(ops_->FinalizeResult(&ignored, &new_ok));
+ // Allow a "false" return value from FinalizeResult to silence the
+ // callback, just as it silences a CQ tag in the async cases
+ bool do_callback = ops_->FinalizeResult(&ignored, &new_ok);
GPR_CODEGEN_ASSERT(ignored == ops_);
- // Last use of func_, so ok to move it out for rvalue call above
- auto func = std::move(func_);
- func_ = nullptr; // reset to clear this out for sure
- CatchingCallback(std::move(func), ok);
+ if (do_callback) {
+ // Last use of func_, so ok to move it out for rvalue call above
+ auto func = std::move(func_);
+ func_ = nullptr; // reset to clear this out for sure
+ CatchingCallback(std::move(func), ok);
+ } else {
+ func_ = nullptr; // reset to clear this out for sure
+ }
g_core_codegen_interface->grpc_call_unref(call_);
}
};
diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h
index ecb9073cf9..82ee862f61 100644
--- a/include/grpcpp/impl/codegen/server_context.h
+++ b/include/grpcpp/impl/codegen/server_context.h
@@ -27,6 +27,7 @@
#include <grpcpp/impl/codegen/call.h>
#include <grpcpp/impl/codegen/call_op_set.h>
+#include <grpcpp/impl/codegen/callback_common.h>
#include <grpcpp/impl/codegen/completion_queue_tag.h>
#include <grpcpp/impl/codegen/config.h>
#include <grpcpp/impl/codegen/create_auth_context.h>
@@ -139,7 +140,7 @@ class ServerContext {
/// must end in "-bin".
void AddTrailingMetadata(const grpc::string& key, const grpc::string& value);
- /// IsCancelled is always safe to call when using sync API.
+ /// IsCancelled is always safe to call when using sync or callback API.
/// When using async API, it is only safe to call IsCancelled after
/// the AsyncNotifyWhenDone tag has been delivered.
bool IsCancelled() const;
@@ -281,7 +282,7 @@ class ServerContext {
class CompletionOp;
- void BeginCompletionOp(internal::Call* call);
+ void BeginCompletionOp(internal::Call* call, bool callback);
/// Return the tag queued by BeginCompletionOp()
internal::CompletionQueueTag* GetCompletionOpTag();
@@ -312,6 +313,7 @@ class ServerContext {
CompletionOp* completion_op_;
bool has_notify_when_done_tag_;
void* async_notify_when_done_tag_;
+ internal::CallbackWithSuccessTag completion_tag_;
gpr_timespec deadline_;
grpc_call* call_;
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 4ce2a46a09..a9f0315af6 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -279,7 +279,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
void ContinueRunAfterInterception() {
{
- ctx_.BeginCompletionOp(&call_);
+ ctx_.BeginCompletionOp(&call_, false);
global_callbacks_->PreSynchronousRequest(&ctx_);
auto* handler = resources_ ? method_->handler()
: server_->resource_exhausted_handler_.get();
@@ -444,7 +444,7 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
}
}
void ContinueRunAfterInterception() {
- // req_->ctx_.BeginCompletionOp(call_);
+ req_->ctx_.BeginCompletionOp(call_, true);
req_->method_->handler()->RunHandler(
internal::MethodHandler::HandlerParameter(
call_, &req_->ctx_, req_->request_, req_->request_status_,
@@ -994,7 +994,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
}
}
if (*status && call_) {
- context_->BeginCompletionOp(&call_wrapper_);
+ context_->BeginCompletionOp(&call_wrapper_, false);
}
*tag = tag_;
if (delete_on_finalize_) {
@@ -1005,7 +1005,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
void ServerInterface::BaseAsyncRequest::
ContinueFinalizeResultAfterInterception() {
- context_->BeginCompletionOp(&call_wrapper_);
+ context_->BeginCompletionOp(&call_wrapper_, false);
// Queue a tag which will be returned immediately
grpc_core::ExecCtx exec_ctx;
grpc_cq_begin_op(notification_cq_->cq(), this);
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 51a2689c6a..8ded7a8972 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -45,11 +45,18 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
: call_(*call),
has_tag_(false),
tag_(nullptr),
+ cq_tag_(this),
refs_(2),
finalized_(false),
cancelled_(0),
done_intercepting_(false) {}
+ // CompletionOp isn't copyable or movable
+ CompletionOp(const CompletionOp&) = delete;
+ CompletionOp& operator=(const CompletionOp&) = delete;
+ CompletionOp(CompletionOp&&) = delete;
+ CompletionOp& operator=(CompletionOp&&) = delete;
+
~CompletionOp() {
if (call_.server_rpc_info()) {
call_.server_rpc_info()->Unref();
@@ -85,8 +92,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
tag_ = tag;
}
- /// TODO(vjpai): Allow override of cq_tag if appropriate for callback API
- void* cq_tag() override { return this; }
+ void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; }
+
+ void* cq_tag() override { return cq_tag_; }
void Unref();
@@ -130,6 +138,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
internal::Call call_;
bool has_tag_;
void* tag_;
+ void* cq_tag_;
std::mutex mu_;
int refs_;
bool finalized_;
@@ -158,7 +167,7 @@ void ServerContext::CompletionOp::FillOps(internal::Call* call) {
interceptor_methods_.SetReverse();
interceptor_methods_.SetCallOpSetInterface(this);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), &ops, 1, this, nullptr));
+ grpc_call_start_batch(call->call(), &ops, 1, cq_tag_, nullptr));
/* No interceptors to run here */
}
@@ -251,7 +260,7 @@ void ServerContext::Clear() {
// either called from destructor or just before Setup
}
-void ServerContext::BeginCompletionOp(internal::Call* call) {
+void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) {
GPR_ASSERT(!completion_op_);
if (rpc_info_) {
rpc_info_->Ref();
@@ -260,7 +269,11 @@ void ServerContext::BeginCompletionOp(internal::Call* call) {
completion_op_ =
new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
CompletionOp(call);
- if (has_notify_when_done_tag_) {
+ if (callback) {
+ completion_tag_ =
+ internal::CallbackWithSuccessTag(call->call(), nullptr, completion_op_);
+ completion_op_->set_cq_tag(&completion_tag_);
+ } else if (has_notify_when_done_tag_) {
completion_op_->set_tag(async_notify_when_done_tag_);
}
call->PerformOps(completion_op_);
@@ -289,12 +302,15 @@ void ServerContext::TryCancel() const {
}
bool ServerContext::IsCancelled() const {
- if (has_notify_when_done_tag_) {
- // when using async API, but the result is only valid
+ if (completion_tag_) {
+ // When using callback API, this result is always valid.
+ return completion_op_->CheckCancelledAsync();
+ } else if (completion_tag_ || has_notify_when_done_tag_) {
+ // When using async API, the result is only valid
// if the tag has already been delivered at the completion queue
return completion_op_ && completion_op_->CheckCancelledAsync();
} else {
- // when using sync API
+ // when using sync API, the result is always valid
return completion_op_ && completion_op_->CheckCancelled(cq_);
}
}