diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cpp/common/completion_queue.cc | 5 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 8 | ||||
-rw-r--r-- | src/cpp/server/server_context.cc | 70 |
3 files changed, 69 insertions, 14 deletions
diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc index c330d21a46..f9bb8689b6 100644 --- a/src/cpp/common/completion_queue.cc +++ b/src/cpp/common/completion_queue.cc @@ -88,10 +88,11 @@ bool CompletionQueue::Pluck(CompletionQueueTag* tag) { } } -void CompletionQueue::TryPluck(CompletionQueueTag* tag) { +void CompletionQueue::TryPluck(CompletionQueueTag* tag, bool forever) { std::unique_ptr<grpc_event, EventDeleter> ev; - ev.reset(grpc_completion_queue_pluck(cq_, tag, gpr_inf_past)); + ev.reset(grpc_completion_queue_pluck( + cq_, tag, forever ? gpr_inf_future : gpr_inf_past)); if (!ev) return; bool ok = ev->data.op_complete == GRPC_OP_OK; void* ignored = tag; diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 8fffea640f..cf6b02293f 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -205,6 +205,7 @@ class Server::SyncRequest final : public CompletionQueueTag { if (has_response_payload_) { res.reset(method_->AllocateResponseProto()); } + ctx_.BeginCompletionOp(&call_); auto status = method_->handler()->RunHandler( MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get())); CallOpBuffer buf; @@ -215,10 +216,12 @@ class Server::SyncRequest final : public CompletionQueueTag { buf.AddSendMessage(*res); } buf.AddServerSendStatus(&ctx_.trailing_metadata_, status); - bool cancelled; - buf.AddServerRecvClose(&cancelled); call_.PerformOps(&buf); GPR_ASSERT(cq_.Pluck(&buf)); + void* ignored_tag; + bool ignored_ok; + cq_.Shutdown(); + GPR_ASSERT(cq_.Next(&ignored_tag, &ignored_ok) == false); } private: @@ -332,6 +335,7 @@ class Server::AsyncRequest final : public CompletionQueueTag { } ctx_->call_ = call_; Call call(call_, server_, cq_); + ctx_->BeginCompletionOp(&call); // just the pointers inside call are copied here stream_->BindCall(&call); delete this; diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index b9d85b95e9..9412f2762f 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -34,10 +34,59 @@ #include <grpc++/server_context.h> #include <grpc++/impl/call.h> #include <grpc/grpc.h> +#include <grpc/support/log.h> #include "src/cpp/util/time.h" namespace grpc { +// CompletionOp + +class ServerContext::CompletionOp final : public CallOpBuffer { + public: + CompletionOp(); + bool FinalizeResult(void** tag, bool* status) override; + + bool CheckCancelled(CompletionQueue* cq); + + void Unref(); + + private: + std::mutex mu_; + int refs_ = 2; // initial refs: one in the server context, one in the cq + bool finalized_ = false; + bool cancelled_ = false; +}; + +ServerContext::CompletionOp::CompletionOp() { AddServerRecvClose(&cancelled_); } + +void ServerContext::CompletionOp::Unref() { + std::unique_lock<std::mutex> lock(mu_); + if (--refs_ == 0) { + lock.unlock(); + delete this; + } +} + +bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) { + cq->TryPluck(this, false); + std::lock_guard<std::mutex> g(mu_); + return finalized_ ? cancelled_ : false; +} + +bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { + GPR_ASSERT(CallOpBuffer::FinalizeResult(tag, status)); + std::unique_lock<std::mutex> lock(mu_); + finalized_ = true; + if (!*status) cancelled_ = true; + if (--refs_ == 0) { + lock.unlock(); + delete this; + } + return false; +} + +// ServerContext body + ServerContext::ServerContext() {} ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata* metadata, @@ -55,6 +104,15 @@ ServerContext::~ServerContext() { if (call_) { grpc_call_destroy(call_); } + if (completion_op_) { + completion_op_->Unref(); + } +} + +void ServerContext::BeginCompletionOp(Call* call) { + GPR_ASSERT(!completion_op_); + completion_op_ = new CompletionOp(); + call->PerformOps(completion_op_); } void ServerContext::AddInitialMetadata(const grpc::string& key, @@ -67,16 +125,8 @@ void ServerContext::AddTrailingMetadata(const grpc::string& key, trailing_metadata_.insert(std::make_pair(key, value)); } -bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) { - cq->TryPluck(this); - std::lock_guard<std::mutex> g(mu_); - return finalized_ ? cancelled_ != 0 : false; -} - -bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { - std::lock_guard<std::mutex> g(mu_); - finalized_ = true; - return false; +bool ServerContext::IsCancelled() { + return completion_op_ && completion_op_->CheckCancelled(cq_); } } // namespace grpc |