aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/server_context.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server/server_context.cc')
-rw-r--r--src/cpp/server/server_context.cc47
1 files changed, 32 insertions, 15 deletions
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 9c01f896e6..1b524bc3e8 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -17,6 +17,7 @@
*/
#include <grpcpp/server_context.h>
+#include <grpcpp/support/server_callback.h>
#include <algorithm>
#include <mutex>
@@ -41,8 +42,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
// must ref the call before calling constructor and after deleting this
- CompletionOp(internal::Call* call)
+ CompletionOp(internal::Call* call, internal::ServerReactor* reactor)
: call_(*call),
+ reactor_(reactor),
has_tag_(false),
tag_(nullptr),
core_cq_tag_(this),
@@ -124,9 +126,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
return;
}
/* Start a dummy op so that we can return the tag */
- GPR_CODEGEN_ASSERT(GRPC_CALL_OK ==
- g_core_codegen_interface->grpc_call_start_batch(
- call_.call(), nullptr, 0, this, nullptr));
+ GPR_CODEGEN_ASSERT(
+ GRPC_CALL_OK ==
+ grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_, nullptr));
}
private:
@@ -136,13 +138,14 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
}
internal::Call call_;
+ internal::ServerReactor* reactor_;
bool has_tag_;
void* tag_;
void* core_cq_tag_;
std::mutex mu_;
int refs_;
bool finalized_;
- int cancelled_;
+ int cancelled_; // This is an int (not bool) because it is passed to core
bool done_intercepting_;
internal::InterceptorBatchMethodsImpl interceptor_methods_;
};
@@ -190,7 +193,16 @@ bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
}
finalized_ = true;
- if (!*status) cancelled_ = 1;
+ // If for some reason the incoming status is false, mark that as a
+ // cancellation.
+ // TODO(vjpai): does this ever happen?
+ if (!*status) {
+ cancelled_ = 1;
+ }
+
+ if (cancelled_ && (reactor_ != nullptr)) {
+ reactor_->OnCancel();
+ }
/* Release the lock since we are going to be running through interceptors now
*/
lock.unlock();
@@ -251,21 +263,25 @@ void ServerContext::Clear() {
initial_metadata_.clear();
trailing_metadata_.clear();
client_metadata_.Reset();
- if (call_) {
- grpc_call_unref(call_);
- }
if (completion_op_) {
completion_op_->Unref();
+ completion_op_ = nullptr;
completion_tag_.Clear();
}
if (rpc_info_) {
rpc_info_->Unref();
+ rpc_info_ = nullptr;
+ }
+ if (call_) {
+ auto* call = call_;
+ call_ = nullptr;
+ grpc_call_unref(call);
}
- // Don't need to clear out call_, completion_op_, or rpc_info_ because this is
- // either called from destructor or just before Setup
}
-void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) {
+void ServerContext::BeginCompletionOp(internal::Call* call,
+ std::function<void(bool)> callback,
+ internal::ServerReactor* reactor) {
GPR_ASSERT(!completion_op_);
if (rpc_info_) {
rpc_info_->Ref();
@@ -273,10 +289,11 @@ void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) {
grpc_call_ref(call->call());
completion_op_ =
new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
- CompletionOp(call);
- if (callback) {
- completion_tag_.Set(call->call(), nullptr, completion_op_);
+ CompletionOp(call, reactor);
+ if (callback != nullptr) {
+ completion_tag_.Set(call->call(), std::move(callback), completion_op_);
completion_op_->set_core_cq_tag(&completion_tag_);
+ completion_op_->set_tag(completion_op_);
} else if (has_notify_when_done_tag_) {
completion_op_->set_tag(async_notify_when_done_tag_);
}