From 2f47137a6e4dc05df4e09e47348aadbe21889d13 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 30 Oct 2018 23:13:11 -0700 Subject: Add support for IsCancelled check --- src/cpp/server/server_cc.cc | 8 ++++---- src/cpp/server/server_context.cc | 32 ++++++++++++++++++++++++-------- 2 files changed, 28 insertions(+), 12 deletions(-) (limited to 'src/cpp') diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 4ce2a46a09..a9f0315af6 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -279,7 +279,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { void ContinueRunAfterInterception() { { - ctx_.BeginCompletionOp(&call_); + ctx_.BeginCompletionOp(&call_, false); global_callbacks_->PreSynchronousRequest(&ctx_); auto* handler = resources_ ? method_->handler() : server_->resource_exhausted_handler_.get(); @@ -444,7 +444,7 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag { } } void ContinueRunAfterInterception() { - // req_->ctx_.BeginCompletionOp(call_); + req_->ctx_.BeginCompletionOp(call_, true); req_->method_->handler()->RunHandler( internal::MethodHandler::HandlerParameter( call_, &req_->ctx_, req_->request_, req_->request_status_, @@ -994,7 +994,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, } } if (*status && call_) { - context_->BeginCompletionOp(&call_wrapper_); + context_->BeginCompletionOp(&call_wrapper_, false); } *tag = tag_; if (delete_on_finalize_) { @@ -1005,7 +1005,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, void ServerInterface::BaseAsyncRequest:: ContinueFinalizeResultAfterInterception() { - context_->BeginCompletionOp(&call_wrapper_); + context_->BeginCompletionOp(&call_wrapper_, false); // Queue a tag which will be returned immediately grpc_core::ExecCtx exec_ctx; grpc_cq_begin_op(notification_cq_->cq(), this); diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 51a2689c6a..8ded7a8972 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -45,11 +45,18 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { : call_(*call), has_tag_(false), tag_(nullptr), + cq_tag_(this), refs_(2), finalized_(false), cancelled_(0), done_intercepting_(false) {} + // CompletionOp isn't copyable or movable + CompletionOp(const CompletionOp&) = delete; + CompletionOp& operator=(const CompletionOp&) = delete; + CompletionOp(CompletionOp&&) = delete; + CompletionOp& operator=(CompletionOp&&) = delete; + ~CompletionOp() { if (call_.server_rpc_info()) { call_.server_rpc_info()->Unref(); @@ -85,8 +92,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { tag_ = tag; } - /// TODO(vjpai): Allow override of cq_tag if appropriate for callback API - void* cq_tag() override { return this; } + void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; } + + void* cq_tag() override { return cq_tag_; } void Unref(); @@ -130,6 +138,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { internal::Call call_; bool has_tag_; void* tag_; + void* cq_tag_; std::mutex mu_; int refs_; bool finalized_; @@ -158,7 +167,7 @@ void ServerContext::CompletionOp::FillOps(internal::Call* call) { interceptor_methods_.SetReverse(); interceptor_methods_.SetCallOpSetInterface(this); GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call->call(), &ops, 1, this, nullptr)); + grpc_call_start_batch(call->call(), &ops, 1, cq_tag_, nullptr)); /* No interceptors to run here */ } @@ -251,7 +260,7 @@ void ServerContext::Clear() { // either called from destructor or just before Setup } -void ServerContext::BeginCompletionOp(internal::Call* call) { +void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) { GPR_ASSERT(!completion_op_); if (rpc_info_) { rpc_info_->Ref(); @@ -260,7 +269,11 @@ void ServerContext::BeginCompletionOp(internal::Call* call) { completion_op_ = new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp))) CompletionOp(call); - if (has_notify_when_done_tag_) { + if (callback) { + completion_tag_ = + internal::CallbackWithSuccessTag(call->call(), nullptr, completion_op_); + completion_op_->set_cq_tag(&completion_tag_); + } else if (has_notify_when_done_tag_) { completion_op_->set_tag(async_notify_when_done_tag_); } call->PerformOps(completion_op_); @@ -289,12 +302,15 @@ void ServerContext::TryCancel() const { } bool ServerContext::IsCancelled() const { - if (has_notify_when_done_tag_) { - // when using async API, but the result is only valid + if (completion_tag_) { + // When using callback API, this result is always valid. + return completion_op_->CheckCancelledAsync(); + } else if (completion_tag_ || has_notify_when_done_tag_) { + // When using async API, the result is only valid // if the tag has already been delivered at the completion queue return completion_op_ && completion_op_->CheckCancelledAsync(); } else { - // when using sync API + // when using sync API, the result is always valid return completion_op_ && completion_op_->CheckCancelled(cq_); } } -- cgit v1.2.3