aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2018-10-30 23:13:11 -0700
committerGravatar Vijay Pai <vpai@google.com>2018-10-30 23:15:59 -0700
commit2f47137a6e4dc05df4e09e47348aadbe21889d13 (patch)
tree7e1ef0a781f1298aec042aa30eca7451097f6e6d /src/cpp
parentb460622c2d8be2bddcb60cf0682d1b88d9c2357e (diff)
Add support for IsCancelled check
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/server/server_cc.cc8
-rw-r--r--src/cpp/server/server_context.cc32
2 files changed, 28 insertions, 12 deletions
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 4ce2a46a09..a9f0315af6 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -279,7 +279,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
void ContinueRunAfterInterception() {
{
- ctx_.BeginCompletionOp(&call_);
+ ctx_.BeginCompletionOp(&call_, false);
global_callbacks_->PreSynchronousRequest(&ctx_);
auto* handler = resources_ ? method_->handler()
: server_->resource_exhausted_handler_.get();
@@ -444,7 +444,7 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
}
}
void ContinueRunAfterInterception() {
- // req_->ctx_.BeginCompletionOp(call_);
+ req_->ctx_.BeginCompletionOp(call_, true);
req_->method_->handler()->RunHandler(
internal::MethodHandler::HandlerParameter(
call_, &req_->ctx_, req_->request_, req_->request_status_,
@@ -994,7 +994,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
}
}
if (*status && call_) {
- context_->BeginCompletionOp(&call_wrapper_);
+ context_->BeginCompletionOp(&call_wrapper_, false);
}
*tag = tag_;
if (delete_on_finalize_) {
@@ -1005,7 +1005,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
void ServerInterface::BaseAsyncRequest::
ContinueFinalizeResultAfterInterception() {
- context_->BeginCompletionOp(&call_wrapper_);
+ context_->BeginCompletionOp(&call_wrapper_, false);
// Queue a tag which will be returned immediately
grpc_core::ExecCtx exec_ctx;
grpc_cq_begin_op(notification_cq_->cq(), this);
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 51a2689c6a..8ded7a8972 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -45,11 +45,18 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
: call_(*call),
has_tag_(false),
tag_(nullptr),
+ cq_tag_(this),
refs_(2),
finalized_(false),
cancelled_(0),
done_intercepting_(false) {}
+ // CompletionOp isn't copyable or movable
+ CompletionOp(const CompletionOp&) = delete;
+ CompletionOp& operator=(const CompletionOp&) = delete;
+ CompletionOp(CompletionOp&&) = delete;
+ CompletionOp& operator=(CompletionOp&&) = delete;
+
~CompletionOp() {
if (call_.server_rpc_info()) {
call_.server_rpc_info()->Unref();
@@ -85,8 +92,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
tag_ = tag;
}
- /// TODO(vjpai): Allow override of cq_tag if appropriate for callback API
- void* cq_tag() override { return this; }
+ void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; }
+
+ void* cq_tag() override { return cq_tag_; }
void Unref();
@@ -130,6 +138,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
internal::Call call_;
bool has_tag_;
void* tag_;
+ void* cq_tag_;
std::mutex mu_;
int refs_;
bool finalized_;
@@ -158,7 +167,7 @@ void ServerContext::CompletionOp::FillOps(internal::Call* call) {
interceptor_methods_.SetReverse();
interceptor_methods_.SetCallOpSetInterface(this);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), &ops, 1, this, nullptr));
+ grpc_call_start_batch(call->call(), &ops, 1, cq_tag_, nullptr));
/* No interceptors to run here */
}
@@ -251,7 +260,7 @@ void ServerContext::Clear() {
// either called from destructor or just before Setup
}
-void ServerContext::BeginCompletionOp(internal::Call* call) {
+void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) {
GPR_ASSERT(!completion_op_);
if (rpc_info_) {
rpc_info_->Ref();
@@ -260,7 +269,11 @@ void ServerContext::BeginCompletionOp(internal::Call* call) {
completion_op_ =
new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
CompletionOp(call);
- if (has_notify_when_done_tag_) {
+ if (callback) {
+ completion_tag_ =
+ internal::CallbackWithSuccessTag(call->call(), nullptr, completion_op_);
+ completion_op_->set_cq_tag(&completion_tag_);
+ } else if (has_notify_when_done_tag_) {
completion_op_->set_tag(async_notify_when_done_tag_);
}
call->PerformOps(completion_op_);
@@ -289,12 +302,15 @@ void ServerContext::TryCancel() const {
}
bool ServerContext::IsCancelled() const {
- if (has_notify_when_done_tag_) {
- // when using async API, but the result is only valid
+ if (completion_tag_) {
+ // When using callback API, this result is always valid.
+ return completion_op_->CheckCancelledAsync();
+ } else if (completion_tag_ || has_notify_when_done_tag_) {
+ // When using async API, the result is only valid
// if the tag has already been delivered at the completion queue
return completion_op_ && completion_op_->CheckCancelledAsync();
} else {
- // when using sync API
+ // when using sync API, the result is always valid
return completion_op_ && completion_op_->CheckCancelled(cq_);
}
}