diff options
Diffstat (limited to 'src/cpp/server/server_context.cc')
-rw-r--r-- | src/cpp/server/server_context.cc | 235 |
1 files changed, 186 insertions, 49 deletions
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 6f5bde0d9f..9c01f896e6 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -40,14 +40,45 @@ namespace grpc { class ServerContext::CompletionOp final : public internal::CallOpSetInterface { public: // initial refs: one in the server context, one in the cq - CompletionOp() - : has_tag_(false), + // must ref the call before calling constructor and after deleting this + CompletionOp(internal::Call* call) + : call_(*call), + has_tag_(false), tag_(nullptr), + core_cq_tag_(this), refs_(2), finalized_(false), - cancelled_(0) {} + cancelled_(0), + done_intercepting_(false) {} + + // CompletionOp isn't copyable or movable + CompletionOp(const CompletionOp&) = delete; + CompletionOp& operator=(const CompletionOp&) = delete; + CompletionOp(CompletionOp&&) = delete; + CompletionOp& operator=(CompletionOp&&) = delete; + + ~CompletionOp() { + if (call_.server_rpc_info()) { + call_.server_rpc_info()->Unref(); + } + } + + void FillOps(internal::Call* call) override; + + // This should always be arena allocated in the call, so override delete. + // But this class is not trivially destructible, so must actually call delete + // before allowing the arena to be freed + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(CompletionOp)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } - void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override; bool FinalizeResult(void** tag, bool* status) override; bool CheckCancelled(CompletionQueue* cq) { @@ -61,95 +92,192 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { tag_ = tag; } + void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; } + + void* core_cq_tag() override { return core_cq_tag_; } + void Unref(); + // This will be called while interceptors are run if the RPC is a hijacked + // RPC. This should set hijacking state for each of the ops. + void SetHijackingState() override { + /* Servers don't allow hijacking */ + GPR_CODEGEN_ASSERT(false); + } + + /* Should be called after interceptors are done running */ + void ContinueFillOpsAfterInterception() override {} + + /* Should be called after interceptors are done running on the finalize result + * path */ + void ContinueFinalizeResultAfterInterception() override { + done_intercepting_ = true; + if (!has_tag_) { + /* We don't have a tag to return. */ + std::unique_lock<std::mutex> lock(mu_); + if (--refs_ == 0) { + lock.unlock(); + grpc_call* call = call_.call(); + delete this; + grpc_call_unref(call); + } + return; + } + /* Start a dummy op so that we can return the tag */ + GPR_CODEGEN_ASSERT(GRPC_CALL_OK == + g_core_codegen_interface->grpc_call_start_batch( + call_.call(), nullptr, 0, this, nullptr)); + } + private: bool CheckCancelledNoPluck() { std::lock_guard<std::mutex> g(mu_); return finalized_ ? (cancelled_ != 0) : false; } + internal::Call call_; bool has_tag_; void* tag_; + void* core_cq_tag_; std::mutex mu_; int refs_; bool finalized_; int cancelled_; + bool done_intercepting_; + internal::InterceptorBatchMethodsImpl interceptor_methods_; }; void ServerContext::CompletionOp::Unref() { std::unique_lock<std::mutex> lock(mu_); if (--refs_ == 0) { lock.unlock(); + grpc_call* call = call_.call(); delete this; + grpc_call_unref(call); } } -void ServerContext::CompletionOp::FillOps(grpc_call* call, grpc_op* ops, - size_t* nops) { - ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER; - ops->data.recv_close_on_server.cancelled = &cancelled_; - ops->flags = 0; - ops->reserved = nullptr; - *nops = 1; +void ServerContext::CompletionOp::FillOps(internal::Call* call) { + grpc_op ops; + ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER; + ops.data.recv_close_on_server.cancelled = &cancelled_; + ops.flags = 0; + ops.reserved = nullptr; + interceptor_methods_.SetCall(&call_); + interceptor_methods_.SetReverse(); + interceptor_methods_.SetCallOpSetInterface(this); + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), &ops, 1, + core_cq_tag_, nullptr)); + /* No interceptors to run here */ } bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { - std::unique_lock<std::mutex> lock(mu_); - finalized_ = true; bool ret = false; - if (has_tag_) { - *tag = tag_; - ret = true; + std::unique_lock<std::mutex> lock(mu_); + if (done_intercepting_) { + /* We are done intercepting. */ + if (has_tag_) { + *tag = tag_; + ret = true; + } + if (--refs_ == 0) { + lock.unlock(); + grpc_call* call = call_.call(); + delete this; + grpc_call_unref(call); + } + return ret; } + finalized_ = true; + if (!*status) cancelled_ = 1; - if (--refs_ == 0) { - lock.unlock(); - delete this; + /* Release the lock since we are going to be running through interceptors now + */ + lock.unlock(); + /* Add interception point and run through interceptors */ + interceptor_methods_.AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_CLOSE); + if (interceptor_methods_.RunInterceptors()) { + /* No interceptors were run */ + if (has_tag_) { + *tag = tag_; + ret = true; + } + lock.lock(); + if (--refs_ == 0) { + lock.unlock(); + grpc_call* call = call_.call(); + delete this; + grpc_call_unref(call); + } + return ret; } - return ret; + /* There are interceptors to be run. Return false for now */ + return false; } // ServerContext body -ServerContext::ServerContext() - : completion_op_(nullptr), - has_notify_when_done_tag_(false), - async_notify_when_done_tag_(nullptr), - deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)), - call_(nullptr), - cq_(nullptr), - sent_initial_metadata_(false), - compression_level_set_(false), - has_pending_ops_(false) {} - -ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) - : completion_op_(nullptr), - has_notify_when_done_tag_(false), - async_notify_when_done_tag_(nullptr), - deadline_(deadline), - call_(nullptr), - cq_(nullptr), - sent_initial_metadata_(false), - compression_level_set_(false), - has_pending_ops_(false) { +ServerContext::ServerContext() { Setup(gpr_inf_future(GPR_CLOCK_REALTIME)); } + +ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) { + Setup(deadline); + std::swap(*client_metadata_.arr(), *arr); +} + +void ServerContext::Setup(gpr_timespec deadline) { + completion_op_ = nullptr; + has_notify_when_done_tag_ = false; + async_notify_when_done_tag_ = nullptr; + deadline_ = deadline; + call_ = nullptr; + cq_ = nullptr; + sent_initial_metadata_ = false; + compression_level_set_ = false; + has_pending_ops_ = false; + rpc_info_ = nullptr; +} + +void ServerContext::BindDeadlineAndMetadata(gpr_timespec deadline, + grpc_metadata_array* arr) { + deadline_ = deadline; std::swap(*client_metadata_.arr(), *arr); - client_metadata_.FillMap(); } -ServerContext::~ServerContext() { +ServerContext::~ServerContext() { Clear(); } + +void ServerContext::Clear() { + auth_context_.reset(); + initial_metadata_.clear(); + trailing_metadata_.clear(); + client_metadata_.Reset(); if (call_) { grpc_call_unref(call_); } if (completion_op_) { completion_op_->Unref(); + completion_tag_.Clear(); + } + if (rpc_info_) { + rpc_info_->Unref(); } + // Don't need to clear out call_, completion_op_, or rpc_info_ because this is + // either called from destructor or just before Setup } -void ServerContext::BeginCompletionOp(internal::Call* call) { +void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) { GPR_ASSERT(!completion_op_); - completion_op_ = new CompletionOp(); - if (has_notify_when_done_tag_) { + if (rpc_info_) { + rpc_info_->Ref(); + } + grpc_call_ref(call->call()); + completion_op_ = + new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp))) + CompletionOp(call); + if (callback) { + completion_tag_.Set(call->call(), nullptr, completion_op_); + completion_op_->set_core_cq_tag(&completion_tag_); + } else if (has_notify_when_done_tag_) { completion_op_->set_tag(async_notify_when_done_tag_); } call->PerformOps(completion_op_); @@ -170,6 +298,12 @@ void ServerContext::AddTrailingMetadata(const grpc::string& key, } void ServerContext::TryCancel() const { + internal::CancelInterceptorBatchMethods cancel_methods; + if (rpc_info_) { + for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) { + rpc_info_->RunInterceptor(&cancel_methods, i); + } + } grpc_call_error err = grpc_call_cancel_with_status( call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr); if (err != GRPC_CALL_OK) { @@ -178,12 +312,15 @@ void ServerContext::TryCancel() const { } bool ServerContext::IsCancelled() const { - if (has_notify_when_done_tag_) { - // when using async API, but the result is only valid + if (completion_tag_) { + // When using callback API, this result is always valid. + return completion_op_->CheckCancelledAsync(); + } else if (has_notify_when_done_tag_) { + // When using async API, the result is only valid // if the tag has already been delivered at the completion queue return completion_op_ && completion_op_->CheckCancelledAsync(); } else { - // when using sync API + // when using sync API, the result is always valid return completion_op_ && completion_op_->CheckCancelled(cq_); } } |