aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/cpp/common/completion_queue.cc5
-rw-r--r--src/cpp/server/server.cc8
-rw-r--r--src/cpp/server/server_context.cc70
3 files changed, 69 insertions, 14 deletions
diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc
index c330d21a46..f9bb8689b6 100644
--- a/src/cpp/common/completion_queue.cc
+++ b/src/cpp/common/completion_queue.cc
@@ -88,10 +88,11 @@ bool CompletionQueue::Pluck(CompletionQueueTag* tag) {
}
}
-void CompletionQueue::TryPluck(CompletionQueueTag* tag) {
+void CompletionQueue::TryPluck(CompletionQueueTag* tag, bool forever) {
std::unique_ptr<grpc_event, EventDeleter> ev;
- ev.reset(grpc_completion_queue_pluck(cq_, tag, gpr_inf_past));
+ ev.reset(grpc_completion_queue_pluck(
+ cq_, tag, forever ? gpr_inf_future : gpr_inf_past));
if (!ev) return;
bool ok = ev->data.op_complete == GRPC_OP_OK;
void* ignored = tag;
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 8fffea640f..cf6b02293f 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -205,6 +205,7 @@ class Server::SyncRequest final : public CompletionQueueTag {
if (has_response_payload_) {
res.reset(method_->AllocateResponseProto());
}
+ ctx_.BeginCompletionOp(&call_);
auto status = method_->handler()->RunHandler(
MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get()));
CallOpBuffer buf;
@@ -215,10 +216,12 @@ class Server::SyncRequest final : public CompletionQueueTag {
buf.AddSendMessage(*res);
}
buf.AddServerSendStatus(&ctx_.trailing_metadata_, status);
- bool cancelled;
- buf.AddServerRecvClose(&cancelled);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
+ void* ignored_tag;
+ bool ignored_ok;
+ cq_.Shutdown();
+ GPR_ASSERT(cq_.Next(&ignored_tag, &ignored_ok) == false);
}
private:
@@ -332,6 +335,7 @@ class Server::AsyncRequest final : public CompletionQueueTag {
}
ctx_->call_ = call_;
Call call(call_, server_, cq_);
+ ctx_->BeginCompletionOp(&call);
// just the pointers inside call are copied here
stream_->BindCall(&call);
delete this;
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index b9d85b95e9..9412f2762f 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -34,10 +34,59 @@
#include <grpc++/server_context.h>
#include <grpc++/impl/call.h>
#include <grpc/grpc.h>
+#include <grpc/support/log.h>
#include "src/cpp/util/time.h"
namespace grpc {
+// CompletionOp
+
+class ServerContext::CompletionOp final : public CallOpBuffer {
+ public:
+ CompletionOp();
+ bool FinalizeResult(void** tag, bool* status) override;
+
+ bool CheckCancelled(CompletionQueue* cq);
+
+ void Unref();
+
+ private:
+ std::mutex mu_;
+ int refs_ = 2; // initial refs: one in the server context, one in the cq
+ bool finalized_ = false;
+ bool cancelled_ = false;
+};
+
+ServerContext::CompletionOp::CompletionOp() { AddServerRecvClose(&cancelled_); }
+
+void ServerContext::CompletionOp::Unref() {
+ std::unique_lock<std::mutex> lock(mu_);
+ if (--refs_ == 0) {
+ lock.unlock();
+ delete this;
+ }
+}
+
+bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) {
+ cq->TryPluck(this, false);
+ std::lock_guard<std::mutex> g(mu_);
+ return finalized_ ? cancelled_ : false;
+}
+
+bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
+ GPR_ASSERT(CallOpBuffer::FinalizeResult(tag, status));
+ std::unique_lock<std::mutex> lock(mu_);
+ finalized_ = true;
+ if (!*status) cancelled_ = true;
+ if (--refs_ == 0) {
+ lock.unlock();
+ delete this;
+ }
+ return false;
+}
+
+// ServerContext body
+
ServerContext::ServerContext() {}
ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata* metadata,
@@ -55,6 +104,15 @@ ServerContext::~ServerContext() {
if (call_) {
grpc_call_destroy(call_);
}
+ if (completion_op_) {
+ completion_op_->Unref();
+ }
+}
+
+void ServerContext::BeginCompletionOp(Call* call) {
+ GPR_ASSERT(!completion_op_);
+ completion_op_ = new CompletionOp();
+ call->PerformOps(completion_op_);
}
void ServerContext::AddInitialMetadata(const grpc::string& key,
@@ -67,16 +125,8 @@ void ServerContext::AddTrailingMetadata(const grpc::string& key,
trailing_metadata_.insert(std::make_pair(key, value));
}
-bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) {
- cq->TryPluck(this);
- std::lock_guard<std::mutex> g(mu_);
- return finalized_ ? cancelled_ != 0 : false;
-}
-
-bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
- std::lock_guard<std::mutex> g(mu_);
- finalized_ = true;
- return false;
+bool ServerContext::IsCancelled() {
+ return completion_op_ && completion_op_->CheckCancelled(cq_);
}
} // namespace grpc