From 84e763f10a1e10d36c7de35970f9d25958ee2e16 Mon Sep 17 00:00:00 2001
From: Vijay Pai
Date: Mon, 8 Oct 2018 13:28:10 -0700
Subject: Experimental C++ server callback unary API

---
 src/cpp/server/server_builder.cc |  39 ++++--
 src/cpp/server/server_cc.cc      | 251 +++++++++++++++++++++++++++++++++++++--
 src/cpp/server/server_context.cc |  50 ++++----
 3 files changed, 298 insertions(+), 42 deletions(-)

(limited to 'src/cpp/server')

diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index fc42b6c886..0dc03b6876 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -71,7 +71,9 @@ ServerBuilder::~ServerBuilder() {
 std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
     bool is_frequently_polled) {
   ServerCompletionQueue* cq = new ServerCompletionQueue(
-      is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING);
+      GRPC_CQ_NEXT,
+      is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING,
+      nullptr);
   cqs_.push_back(cq);
   return std::unique_ptr<ServerCompletionQueue>(cq);
 }
@@ -256,15 +258,22 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
     // Create completion queues to listen to incoming rpc requests
     for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
-      sync_server_cqs->emplace_back(new ServerCompletionQueue(polling_type));
+      sync_server_cqs->emplace_back(
+          new ServerCompletionQueue(GRPC_CQ_NEXT, polling_type, nullptr));
     }
   }
 
-  std::unique_ptr<Server> server(new Server(
-      max_receive_message_size_, &args, sync_server_cqs,
-      sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
-      sync_server_settings_.cq_timeout_msec, resource_quota_,
-      std::move(interceptor_creators_)));
+  // == Determine if the server has any callback methods ==
+  bool has_callback_methods = false;
+  for (auto it = services_.begin(); it != services_.end(); ++it) {
+    if ((*it)->service->has_callback_methods()) {
+      has_callback_methods = true;
+      break;
+    }
+  }
+
+  // TODO(vjpai): Add a section here for plugins once they can support callback
+  // methods
 
   if (has_sync_methods) {
     // This is a Sync server
@@ -276,6 +285,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
                               sync_server_settings_.cq_timeout_msec);
   }
 
+  if (has_callback_methods) {
+    gpr_log(GPR_INFO, "Callback server.");
+  }
+
+  std::unique_ptr<Server> server(new Server(
+      max_receive_message_size_, &args, sync_server_cqs,
+      sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
+      sync_server_settings_.cq_timeout_msec, resource_quota_,
+      std::move(interceptor_creators_)));
+
   ServerInitializer* initializer = server->initializer();
 
   // Register all the completion queues with the server. i.e
@@ -289,6 +308,12 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
     num_frequently_polled_cqs++;
   }
 
+  if (has_callback_methods) {
+    auto* cq = server->CallbackCQ();
+    grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
+    num_frequently_polled_cqs++;
+  }
+
   // cqs_ contains the completion queue added by calling the ServerBuilder's
   // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by
   // calling Next() or AsyncNext()) and hence are not safe to be used for
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 82a9d719fa..5d24e01a47 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -147,9 +147,9 @@ class Server::UnimplementedAsyncResponse final
 
 class Server::SyncRequest final : public internal::CompletionQueueTag {
  public:
-  SyncRequest(internal::RpcServiceMethod* method, void* tag)
+  SyncRequest(internal::RpcServiceMethod* method, void* method_tag)
       : method_(method),
-        tag_(tag),
+        method_tag_(method_tag),
         in_flight_(false),
         has_request_payload_(
             method->method_type() == internal::RpcMethod::NORMAL_RPC ||
@@ -176,10 +176,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
   void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
     GPR_ASSERT(cq_ && !in_flight_);
     in_flight_ = true;
-    if (tag_) {
+    if (method_tag_) {
       if (GRPC_CALL_OK !=
           grpc_server_request_registered_call(
-              server, tag_, &call_, &deadline_, &request_metadata_,
+              server, method_tag_, &call_, &deadline_, &request_metadata_,
              has_request_payload_ ? &request_payload_ : nullptr, cq_,
              notify_cq, this)) {
         TeardownRequest();
@@ -211,6 +211,9 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
     return true;
   }
 
+  // The CallData class represents a call that is "active" as opposed
+  // to just being requested. It wraps and takes ownership of the cq from
+  // the call request
   class CallData final {
    public:
    explicit CallData(Server* server, SyncRequest* mrd)
@@ -281,7 +284,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
       auto* handler = resources_ ? method_->handler()
                                  : server_->resource_exhausted_handler_.get();
       handler->RunHandler(internal::MethodHandler::HandlerParameter(
-          &call_, &ctx_, request_, request_status_));
+          &call_, &ctx_, request_, request_status_, nullptr));
       request_ = nullptr;
       global_callbacks_->PostSynchronousRequest(&ctx_);
@@ -314,7 +317,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
 
  private:
   internal::RpcServiceMethod* const method_;
-  void* const tag_;
+  void* const method_tag_;
   bool in_flight_;
   const bool has_request_payload_;
   grpc_call* call_;
@@ -325,6 +328,176 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
   grpc_completion_queue* cq_;
 };
 
+class Server::CallbackRequest final : public internal::CompletionQueueTag {
+ public:
+  CallbackRequest(Server* server, internal::RpcServiceMethod* method,
+                  void* method_tag)
+      : server_(server),
+        method_(method),
+        method_tag_(method_tag),
+        has_request_payload_(
+            method->method_type() == internal::RpcMethod::NORMAL_RPC ||
+            method->method_type() == internal::RpcMethod::SERVER_STREAMING),
+        cq_(server->CallbackCQ()),
+        tag_(this) {
+    Setup();
+  }
+
+  ~CallbackRequest() { Clear(); }
+
+  void Request() {
+    if (method_tag_) {
+      if (GRPC_CALL_OK !=
+          grpc_server_request_registered_call(
+              server_->c_server(), method_tag_, &call_, &deadline_,
+              &request_metadata_,
+              has_request_payload_ ? &request_payload_ : nullptr, cq_->cq(),
+              cq_->cq(), static_cast<void*>(&tag_))) {
+        return;
+      }
+    } else {
+      if (!call_details_) {
+        call_details_ = new grpc_call_details;
+        grpc_call_details_init(call_details_);
+      }
+      if (grpc_server_request_call(server_->c_server(), &call_, call_details_,
+                                   &request_metadata_, cq_->cq(), cq_->cq(),
+                                   static_cast<void*>(&tag_)) != GRPC_CALL_OK) {
+        return;
+      }
+    }
+  }
+
+  bool FinalizeResult(void** tag, bool* status) override { return false; }
+
+ private:
+  class CallbackCallTag : public grpc_experimental_completion_queue_functor {
+   public:
+    CallbackCallTag(Server::CallbackRequest* req) : req_(req) {
+      functor_run = &CallbackCallTag::StaticRun;
+    }
+
+    // force_run can not be performed on a tag if operations using this tag
+    // have been sent to PerformOpsOnCall. It is intended for error conditions
+    // that are detected before the operations are internally processed.
+    void force_run(bool ok) { Run(ok); }
+
+   private:
+    Server::CallbackRequest* req_;
+    internal::Call* call_;
+
+    static void StaticRun(grpc_experimental_completion_queue_functor* cb,
+                          int ok) {
+      static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok));
+    }
+    void Run(bool ok) {
+      void* ignored = req_;
+      bool new_ok = ok;
+      GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok));
+      GPR_ASSERT(ignored == req_);
+
+      if (!ok) {
+        // The call has been shutdown
+        req_->Clear();
+        return;
+      }
+
+      // Bind the call, deadline, and metadata from what we got
+      req_->ctx_.set_call(req_->call_);
+      req_->ctx_.cq_ = req_->cq_;
+      req_->ctx_.BindDeadlineAndMetadata(req_->deadline_,
+                                         &req_->request_metadata_);
+      req_->request_metadata_.count = 0;
+
+      // Create a C++ Call to control the underlying core call
+      call_ = new (grpc_call_arena_alloc(req_->call_, sizeof(internal::Call)))
+          internal::Call(
+              req_->call_, req_->server_, req_->cq_,
+              req_->server_->max_receive_message_size(),
+              req_->ctx_.set_server_rpc_info(
+                  req_->method_->name(), req_->server_->interceptor_creators_));
+
+      req_->interceptor_methods_.SetCall(call_);
+      req_->interceptor_methods_.SetReverse();
+      // Set interception point for RECV INITIAL METADATA
+      req_->interceptor_methods_.AddInterceptionHookPoint(
+          experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
+      req_->interceptor_methods_.SetRecvInitialMetadata(
+          &req_->ctx_.client_metadata_);
+
+      if (req_->has_request_payload_) {
+        // Set interception point for RECV MESSAGE
+        req_->request_ = req_->method_->handler()->Deserialize(
+            req_->call_, req_->request_payload_, &req_->request_status_);
+        req_->request_payload_ = nullptr;
+        req_->interceptor_methods_.AddInterceptionHookPoint(
+            experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
+        req_->interceptor_methods_.SetRecvMessage(req_->request_);
+      }
+
+      if (req_->interceptor_methods_.RunInterceptors(
+              [this] { ContinueRunAfterInterception(); })) {
+        ContinueRunAfterInterception();
+      } else {
+        // There were interceptors to be run, so ContinueRunAfterInterception
+        // will be run when interceptors are done.
+      }
+    }
+    void ContinueRunAfterInterception() {
+      // req_->ctx_.BeginCompletionOp(call_);
+      req_->method_->handler()->RunHandler(
+          internal::MethodHandler::HandlerParameter(
+              call_, &req_->ctx_, req_->request_, req_->request_status_,
+              [this] {
+                req_->Reset();
+                req_->Request();
+              }));
+    }
+  };
+
+  void Reset() {
+    Clear();
+    Setup();
+  }
+
+  void Clear() {
+    if (call_details_) {
+      delete call_details_;
+      call_details_ = nullptr;
+    }
+    grpc_metadata_array_destroy(&request_metadata_);
+    if (has_request_payload_ && request_payload_) {
+      grpc_byte_buffer_destroy(request_payload_);
+    }
+    ctx_.Clear();
+    interceptor_methods_.ClearState();
+  }
+
+  void Setup() {
+    grpc_metadata_array_init(&request_metadata_);
+    ctx_.Setup(gpr_inf_future(GPR_CLOCK_REALTIME));
+    request_payload_ = nullptr;
+    request_ = nullptr;
+    request_status_ = Status();
+  }
+
+  Server* const server_;
+  internal::RpcServiceMethod* const method_;
+  void* const method_tag_;
+  const bool has_request_payload_;
+  grpc_byte_buffer* request_payload_;
+  void* request_;
+  Status request_status_;
+  grpc_call_details* call_details_ = nullptr;
+  grpc_call* call_;
+  gpr_timespec deadline_;
+  grpc_metadata_array request_metadata_;
+  CompletionQueue* cq_;
+  CallbackCallTag tag_;
+  ServerContext ctx_;
+  internal::InterceptorBatchMethodsImpl interceptor_methods_;
+};
+
 // Implementation of ThreadManager. Each instance of SyncRequestThreadManager
 // manages a pool of threads that poll for incoming Sync RPCs and call the
 // appropriate RPC handlers
@@ -504,6 +677,9 @@ Server::Server(
 Server::~Server() {
   {
     std::unique_lock<std::mutex> lock(mu_);
+    if (callback_cq_ != nullptr) {
+      callback_cq_->Shutdown();
+    }
     if (started_ && !shutdown_) {
       lock.unlock();
       Shutdown();
@@ -576,21 +752,28 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
     }
 
     internal::RpcServiceMethod* method = it->get();
-    void* tag = grpc_server_register_method(
+    void* method_registration_tag = grpc_server_register_method(
         server_, method->name(), host ? host->c_str() : nullptr,
         PayloadHandlingForMethod(method), 0);
-    if (tag == nullptr) {
+    if (method_registration_tag == nullptr) {
       gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
               method->name());
       return false;
     }
 
-    if (method->handler() == nullptr) {  // Async method
-      method->set_server_tag(tag);
-    } else {
+    if (method->handler() == nullptr) {  // Async method without handler
+      method->set_server_tag(method_registration_tag);
+    } else if (method->api_type() ==
+               internal::RpcServiceMethod::ApiType::SYNC) {
       for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
-        (*it)->AddSyncMethod(method, tag);
+        (*it)->AddSyncMethod(method, method_registration_tag);
       }
+    } else {
+      // a callback method
+      auto* req = new CallbackRequest(this, method, method_registration_tag);
+      callback_reqs_.emplace_back(req);
+      // Enqueue it so that it will be Request'ed later once
+      // all request matchers are created at core server startup
     }
 
     method_name = method->name();
@@ -641,7 +824,8 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
       // performance. This ensures that we don't introduce thread hops
      // for application requests that wind up on this CQ, which is polled
      // in its own thread.
-      health_check_cq = new ServerCompletionQueue(GRPC_CQ_NON_POLLING);
+      health_check_cq =
+          new ServerCompletionQueue(GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr);
       grpc_server_register_completion_queue(server_, health_check_cq->cq(),
                                             nullptr);
       default_health_check_service_impl =
@@ -678,6 +862,10 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
     (*it)->Start();
   }
 
+  for (auto& cbreq : callback_reqs_) {
+    cbreq->Request();
+  }
+
   if (default_health_check_service_impl != nullptr) {
     default_health_check_service_impl->StartServingThread();
   }
@@ -910,4 +1098,41 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
 
 ServerInitializer* Server::initializer() { return server_initializer_.get(); }
 
+namespace {
+class ShutdownCallback : public grpc_experimental_completion_queue_functor {
+ public:
+  ShutdownCallback() { functor_run = &ShutdownCallback::Run; }
+  // TakeCQ takes ownership of the cq into the shutdown callback
+  // so that the shutdown callback will be responsible for destroying it
+  void TakeCQ(CompletionQueue* cq) { cq_ = cq; }
+
+  // The Run function will get invoked by the completion queue library
+  // when the shutdown is actually complete
+  static void Run(grpc_experimental_completion_queue_functor* cb, int) {
+    auto* callback = static_cast<ShutdownCallback*>(cb);
+    delete callback->cq_;
+    grpc_core::Delete(callback);
+  }
+
+ private:
+  CompletionQueue* cq_ = nullptr;
+};
+}  // namespace
+
+CompletionQueue* Server::CallbackCQ() {
+  // TODO(vjpai): Consider using a single global CQ for the default CQ
+  // if there is no explicit per-server CQ registered
+  std::lock_guard<std::mutex> l(mu_);
+  if (callback_cq_ == nullptr) {
+    auto* shutdown_callback = grpc_core::New<ShutdownCallback>();
+    callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{
+        GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
+        shutdown_callback});
+
+    // Transfer ownership of the new cq to its own shutdown callback
+    shutdown_callback->TakeCQ(callback_cq_);
+  }
+  return callback_cq_;
+};
+
 }  // namespace grpc
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 995e787785..51a2689c6a 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -209,31 +209,35 @@ bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
 
 // ServerContext body
 
-ServerContext::ServerContext()
-    : completion_op_(nullptr),
-      has_notify_when_done_tag_(false),
-      async_notify_when_done_tag_(nullptr),
-      deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),
-      call_(nullptr),
-      cq_(nullptr),
-      sent_initial_metadata_(false),
-      compression_level_set_(false),
-      has_pending_ops_(false) {}
-
-ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr)
-    : completion_op_(nullptr),
-      has_notify_when_done_tag_(false),
-      async_notify_when_done_tag_(nullptr),
-      deadline_(deadline),
-      call_(nullptr),
-      cq_(nullptr),
-      sent_initial_metadata_(false),
-      compression_level_set_(false),
-      has_pending_ops_(false) {
+ServerContext::ServerContext() { Setup(gpr_inf_future(GPR_CLOCK_REALTIME)); }
+
+ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) {
+  Setup(deadline);
   std::swap(*client_metadata_.arr(), *arr);
 }
 
-ServerContext::~ServerContext() {
+void ServerContext::Setup(gpr_timespec deadline) {
+  completion_op_ = nullptr;
+  has_notify_when_done_tag_ = false;
+  async_notify_when_done_tag_ = nullptr;
+  deadline_ = deadline;
+  call_ = nullptr;
+  cq_ = nullptr;
+  sent_initial_metadata_ = false;
+  compression_level_set_ = false;
+  has_pending_ops_ = false;
+  rpc_info_ = nullptr;
+}
+
+void ServerContext::BindDeadlineAndMetadata(gpr_timespec deadline,
+                                            grpc_metadata_array* arr) {
+  deadline_ = deadline;
+  std::swap(*client_metadata_.arr(), *arr);
+}
+
+ServerContext::~ServerContext() { Clear(); }
+
+void ServerContext::Clear() {
   if (call_) {
     grpc_call_unref(call_);
   }
@@ -243,6 +247,8 @@ ServerContext::~ServerContext() {
   if (rpc_info_) {
     rpc_info_->Unref();
   }
+  // Don't need to clear out call_, completion_op_, or rpc_info_ because this is
+  // either called from destructor or just before Setup
 }
 
 void ServerContext::BeginCompletionOp(internal::Call* call) {
--
cgit v1.2.3
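Taken together, the server changes in this first patch are what let a generated service run a unary RPC entirely through callbacks: registration creates a CallbackRequest per callback method, Start() issues the initial requests, and the user handler is invoked on a library-managed thread with a ServerCallbackRpcController. The sketch below shows roughly how such a method is expected to look from the application side. It is illustrative only: the proto service (EchoService with a unary Echo method), the message types, and the generated base-class name ExperimentalWithCallbackMethod_Echo are assumptions for the example, not something defined by this patch.

    // Rough sketch of an application-side callback unary handler (assumed
    // generated names; the handler signature follows CallbackUnaryHandler
    // introduced in this series).
    #include <grpcpp/grpcpp.h>
    #include "echo.grpc.pb.h"  // hypothetical generated header for EchoService

    class CallbackEchoService final
        : public EchoService::ExperimentalWithCallbackMethod_Echo<EchoService::Service> {
      void Echo(grpc::ServerContext* context, const EchoRequest* request,
                EchoResponse* response,
                grpc::experimental::ServerCallbackRpcController* controller) override {
        // Runs on a library thread; no ServerCompletionQueue or polling loop is
        // needed in application code, and the call may be finished later from
        // any thread.
        response->set_message(request->message());
        // Finish() sends the response and status; the library then re-requests
        // the method so the next incoming call can be matched.
        controller->Finish(grpc::Status::OK);
      }
    };
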
From 2f47137a6e4dc05df4e09e47348aadbe21889d13 Mon Sep 17 00:00:00 2001
From: Vijay Pai
Date: Tue, 30 Oct 2018 23:13:11 -0700
Subject: Add support for IsCancelled check

---
 include/grpcpp/impl/codegen/callback_common.h | 19 +++++++++++-----
 include/grpcpp/impl/codegen/server_context.h  |  6 +++--
 src/cpp/server/server_cc.cc                   |  8 +++----
 src/cpp/server/server_context.cc              | 32 ++++++++++++++++++++-------
 4 files changed, 46 insertions(+), 19 deletions(-)

(limited to 'src/cpp/server')

diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h
index 8273ef2f4a..29deef658f 100644
--- a/include/grpcpp/impl/codegen/callback_common.h
+++ b/include/grpcpp/impl/codegen/callback_common.h
@@ -141,6 +141,9 @@ class CallbackWithSuccessTag
   // that are detected before the operations are internally processed.
   void force_run(bool ok) { Run(ok); }
 
+  /// check if this tag has ever been set
+  operator bool() const { return call_ != nullptr; }
+
  private:
   grpc_call* call_;
   std::function<void(bool)> func_;
@@ -153,13 +156,19 @@ class CallbackWithSuccessTag
   void Run(bool ok) {
     void* ignored = ops_;
     bool new_ok = ok;
-    GPR_CODEGEN_ASSERT(ops_->FinalizeResult(&ignored, &new_ok));
+    // Allow a "false" return value from FinalizeResult to silence the
+    // callback, just as it silences a CQ tag in the async cases
+    bool do_callback = ops_->FinalizeResult(&ignored, &new_ok);
     GPR_CODEGEN_ASSERT(ignored == ops_);
-    // Last use of func_, so ok to move it out for rvalue call above
-    auto func = std::move(func_);
-    func_ = nullptr;  // reset to clear this out for sure
-    CatchingCallback(std::move(func), ok);
+    if (do_callback) {
+      // Last use of func_, so ok to move it out for rvalue call above
+      auto func = std::move(func_);
+      func_ = nullptr;  // reset to clear this out for sure
+      CatchingCallback(std::move(func), ok);
+    } else {
+      func_ = nullptr;  // reset to clear this out for sure
+    }
     g_core_codegen_interface->grpc_call_unref(call_);
   }
 };
diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h
index ecb9073cf9..82ee862f61 100644
--- a/include/grpcpp/impl/codegen/server_context.h
+++ b/include/grpcpp/impl/codegen/server_context.h
@@ -27,6 +27,7 @@
 #include
 #include
+#include <grpcpp/impl/codegen/callback_common.h>
 #include
 #include
 #include
@@ -139,7 +140,7 @@ class ServerContext {
   /// must end in "-bin".
   void AddTrailingMetadata(const grpc::string& key, const grpc::string& value);
 
-  /// IsCancelled is always safe to call when using sync API.
+  /// IsCancelled is always safe to call when using sync or callback API.
   /// When using async API, it is only safe to call IsCancelled after
   /// the AsyncNotifyWhenDone tag has been delivered.
   bool IsCancelled() const;
@@ -281,7 +282,7 @@ class ServerContext {
 
   class CompletionOp;
 
-  void BeginCompletionOp(internal::Call* call);
+  void BeginCompletionOp(internal::Call* call, bool callback);
   /// Return the tag queued by BeginCompletionOp()
   internal::CompletionQueueTag* GetCompletionOpTag();
 
@@ -312,6 +313,7 @@ class ServerContext {
   CompletionOp* completion_op_;
   bool has_notify_when_done_tag_;
   void* async_notify_when_done_tag_;
+  internal::CallbackWithSuccessTag completion_tag_;
 
   gpr_timespec deadline_;
   grpc_call* call_;
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 4ce2a46a09..a9f0315af6 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -279,7 +279,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
 
     void ContinueRunAfterInterception() {
       {
-        ctx_.BeginCompletionOp(&call_);
+        ctx_.BeginCompletionOp(&call_, false);
         global_callbacks_->PreSynchronousRequest(&ctx_);
         auto* handler = resources_ ? method_->handler()
                                    : server_->resource_exhausted_handler_.get();
@@ -444,7 +444,7 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
       }
     }
     void ContinueRunAfterInterception() {
-      // req_->ctx_.BeginCompletionOp(call_);
+      req_->ctx_.BeginCompletionOp(call_, true);
       req_->method_->handler()->RunHandler(
           internal::MethodHandler::HandlerParameter(
              call_, &req_->ctx_, req_->request_, req_->request_status_,
@@ -994,7 +994,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
     }
   }
   if (*status && call_) {
-    context_->BeginCompletionOp(&call_wrapper_);
+    context_->BeginCompletionOp(&call_wrapper_, false);
   }
   *tag = tag_;
   if (delete_on_finalize_) {
@@ -1005,7 +1005,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
 
 void ServerInterface::BaseAsyncRequest::
     ContinueFinalizeResultAfterInterception() {
-  context_->BeginCompletionOp(&call_wrapper_);
+  context_->BeginCompletionOp(&call_wrapper_, false);
   // Queue a tag which will be returned immediately
   grpc_core::ExecCtx exec_ctx;
   grpc_cq_begin_op(notification_cq_->cq(), this);
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 51a2689c6a..8ded7a8972 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -45,11 +45,18 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
       : call_(*call),
         has_tag_(false),
         tag_(nullptr),
+        cq_tag_(this),
         refs_(2),
         finalized_(false),
         cancelled_(0),
         done_intercepting_(false) {}
 
+  // CompletionOp isn't copyable or movable
+  CompletionOp(const CompletionOp&) = delete;
+  CompletionOp& operator=(const CompletionOp&) = delete;
+  CompletionOp(CompletionOp&&) = delete;
+  CompletionOp& operator=(CompletionOp&&) = delete;
+
   ~CompletionOp() {
     if (call_.server_rpc_info()) {
       call_.server_rpc_info()->Unref();
     }
@@ -85,8 +92,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
     tag_ = tag;
   }
 
-  /// TODO(vjpai): Allow override of cq_tag if appropriate for callback API
-  void* cq_tag() override { return this; }
+  void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; }
+
+  void* cq_tag() override { return cq_tag_; }
 
   void Unref();
 
@@ -130,6 +138,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
   internal::Call call_;
   bool has_tag_;
   void* tag_;
+  void* cq_tag_;
   std::mutex mu_;
   int refs_;
   bool finalized_;
@@ -158,7 +167,7 @@ void ServerContext::CompletionOp::FillOps(internal::Call* call) {
   interceptor_methods_.SetReverse();
   interceptor_methods_.SetCallOpSetInterface(this);
   GPR_ASSERT(GRPC_CALL_OK ==
-             grpc_call_start_batch(call->call(), &ops, 1, this, nullptr));
+             grpc_call_start_batch(call->call(), &ops, 1, cq_tag_, nullptr));
   /* No interceptors to run here */
 }
 
@@ -251,7 +260,7 @@ void ServerContext::Clear() {
   // either called from destructor or just before Setup
 }
 
-void ServerContext::BeginCompletionOp(internal::Call* call) {
+void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) {
   GPR_ASSERT(!completion_op_);
   if (rpc_info_) {
     rpc_info_->Ref();
   }
@@ -260,7 +269,11 @@ void ServerContext::BeginCompletionOp(internal::Call* call) {
   completion_op_ =
      new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
          CompletionOp(call);
-  if (has_notify_when_done_tag_) {
+  if (callback) {
+    completion_tag_ =
+        internal::CallbackWithSuccessTag(call->call(), nullptr, completion_op_);
+    completion_op_->set_cq_tag(&completion_tag_);
+  } else if (has_notify_when_done_tag_) {
     completion_op_->set_tag(async_notify_when_done_tag_);
   }
   call->PerformOps(completion_op_);
@@ -289,12 +302,15 @@ void ServerContext::TryCancel() const {
 }
 
 bool ServerContext::IsCancelled() const {
-  if (has_notify_when_done_tag_) {
-    // when using async API, but the result is only valid
+  if (completion_tag_) {
+    // When using callback API, this result is always valid.
+    return completion_op_->CheckCancelledAsync();
+  } else if (completion_tag_ || has_notify_when_done_tag_) {
+    // When using async API, the result is only valid
     // if the tag has already been delivered at the completion queue
     return completion_op_ && completion_op_->CheckCancelledAsync();
   } else {
-    // when using sync API
+    // when using sync API, the result is always valid
     return completion_op_ && completion_op_->CheckCancelled(cq_);
   }
 }
--
cgit v1.2.3
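Because BeginCompletionOp now installs a CallbackWithSuccessTag as the core tag for callback-API calls, a callback handler can consult ServerContext::IsCancelled() at any point, for example to skip expensive work once the client has gone away. A small, hedged continuation of the hypothetical Echo handler sketched after the first patch (same assumed types and generated names):

    // Illustrative only; reuses the assumed EchoService types from the earlier sketch.
    void Echo(grpc::ServerContext* context, const EchoRequest* request,
              EchoResponse* response,
              grpc::experimental::ServerCallbackRpcController* controller) override {
      if (context->IsCancelled()) {
        // Safe in the callback API (see the doc comment change above): the client
        // has already cancelled or disconnected, so just release the call.
        controller->Finish(grpc::Status::CANCELLED);
        return;
      }
      response->set_message(request->message());
      controller->Finish(grpc::Status::OK);
    }
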
From 932abf48a3fd0757a05bdbae4038c20fd521f312 Mon Sep 17 00:00:00 2001
From: Vijay Pai
Date: Wed, 31 Oct 2018 01:02:07 -0700
Subject: Address reviewer comments.

---
 include/grpcpp/impl/codegen/async_unary_call.h   |  2 +-
 include/grpcpp/impl/codegen/call_op_set.h        | 22 +++---
 .../grpcpp/impl/codegen/call_op_set_interface.h  |  4 +-
 include/grpcpp/impl/codegen/client_callback.h    |  2 +-
 include/grpcpp/impl/codegen/rpc_service_method.h | 11 +--
 include/grpcpp/impl/codegen/server_callback.h    | 80 ++++++++++------
 src/compiler/cpp_generator.cc                    |  6 ++
 src/cpp/client/channel_cc.cc                     |  4 +-
 src/cpp/common/completion_queue_cc.cc            | 10 +--
 src/cpp/server/server_cc.cc                      |  4 +-
 src/cpp/server/server_context.cc                 | 16 ++---
 11 files changed, 82 insertions(+), 79 deletions(-)

(limited to 'src/cpp/server')

diff --git a/include/grpcpp/impl/codegen/async_unary_call.h b/include/grpcpp/impl/codegen/async_unary_call.h
index 744b128141..89dcb12418 100644
--- a/include/grpcpp/impl/codegen/async_unary_call.h
+++ b/include/grpcpp/impl/codegen/async_unary_call.h
@@ -240,7 +240,7 @@ class ServerAsyncResponseWriter final
   /// metadata.
   void Finish(const W& msg, const Status& status, void* tag) {
     finish_buf_.set_output_tag(tag);
-    finish_buf_.set_cq_tag(&finish_buf_);
+    finish_buf_.set_core_cq_tag(&finish_buf_);
     if (!ctx_->sent_initial_metadata_) {
       finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
                                       ctx_->initial_metadata_flags());
diff --git a/include/grpcpp/impl/codegen/call_op_set.h b/include/grpcpp/impl/codegen/call_op_set.h
index 785688e67f..5c52b027b2 100644
--- a/include/grpcpp/impl/codegen/call_op_set.h
+++ b/include/grpcpp/impl/codegen/call_op_set.h
@@ -770,19 +770,19 @@ class CallOpSet : public CallOpSetInterface,
                   public Op5,
                   public Op6 {
  public:
-  CallOpSet() : cq_tag_(this), return_tag_(this) {}
+  CallOpSet() : core_cq_tag_(this), return_tag_(this) {}
   // The copy constructor and assignment operator reset the value of
-  // cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_ since
-  // those are only meaningful on a specific object, not across objects.
+  // core_cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_
+  // since those are only meaningful on a specific object, not across objects.
   CallOpSet(const CallOpSet& other)
-      : cq_tag_(this),
+      : core_cq_tag_(this),
         return_tag_(this),
         call_(other.call_),
         done_intercepting_(false),
         interceptor_methods_(InterceptorBatchMethodsImpl()) {}
 
   CallOpSet& operator=(const CallOpSet& other) {
-    cq_tag_ = this;
+    core_cq_tag_ = this;
     return_tag_ = this;
     call_ = other.call_;
     done_intercepting_ = false;
@@ -834,13 +834,13 @@ class CallOpSet : public CallOpSetInterface,
 
   void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
 
-  void* cq_tag() override { return cq_tag_; }
+  void* core_cq_tag() override { return core_cq_tag_; }
 
-  /// set_cq_tag is used to provide a different core CQ tag than "this".
+  /// set_core_cq_tag is used to provide a different core CQ tag than "this".
   /// This is used for callback-based tags, where the core tag is the core
   /// callback function. It does not change the use or behavior of any other
   /// function (such as FinalizeResult)
-  void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; }
+  void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
 
   // This will be called while interceptors are run if the RPC is a hijacked
   // RPC. This should set hijacking state for each of the ops.
@@ -866,7 +866,7 @@ class CallOpSet : public CallOpSetInterface,
     this->Op6::AddOp(ops, &nops);
     GPR_CODEGEN_ASSERT(GRPC_CALL_OK ==
                        g_core_codegen_interface->grpc_call_start_batch(
-                           call_.call(), ops, nops, cq_tag(), nullptr));
+                           call_.call(), ops, nops, core_cq_tag(), nullptr));
   }
 
   // Should be called after interceptors are done running on the finalize result
@@ -875,7 +875,7 @@ class CallOpSet : public CallOpSetInterface,
     done_intercepting_ = true;
     GPR_CODEGEN_ASSERT(GRPC_CALL_OK ==
                        g_core_codegen_interface->grpc_call_start_batch(
-                           call_.call(), nullptr, 0, cq_tag(), nullptr));
+                           call_.call(), nullptr, 0, core_cq_tag(), nullptr));
   }
 
  private:
@@ -906,7 +906,7 @@ class CallOpSet : public CallOpSetInterface,
     return interceptor_methods_.RunInterceptors();
   }
 
-  void* cq_tag_;
+  void* core_cq_tag_;
   void* return_tag_;
   Call call_;
   bool done_intercepting_ = false;
diff --git a/include/grpcpp/impl/codegen/call_op_set_interface.h b/include/grpcpp/impl/codegen/call_op_set_interface.h
index 815227a299..3b74566a6d 100644
--- a/include/grpcpp/impl/codegen/call_op_set_interface.h
+++ b/include/grpcpp/impl/codegen/call_op_set_interface.h
@@ -38,9 +38,9 @@ class CallOpSetInterface : public CompletionQueueTag {
   virtual void FillOps(internal::Call* call) = 0;
 
   /// Get the tag to be used at the core completion queue. Generally, the
-  /// value of cq_tag will be "this". However, it can be overridden if we
+  /// value of core_cq_tag will be "this". However, it can be overridden if we
   /// want core to process the tag differently (e.g., as a core callback)
-  virtual void* cq_tag() = 0;
+  virtual void* core_cq_tag() = 0;
 
   // This will be called while interceptors are run if the RPC is a hijacked
   // RPC. This should set hijacking state for each of the ops.
diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h
index ecb00a0769..4baa819091 100644
--- a/include/grpcpp/impl/codegen/client_callback.h
+++ b/include/grpcpp/impl/codegen/client_callback.h
@@ -84,7 +84,7 @@ class CallbackUnaryCallImpl {
     ops->AllowNoMessage();
     ops->ClientSendClose();
     ops->ClientRecvStatus(context, tag->status_ptr());
-    ops->set_cq_tag(tag);
+    ops->set_core_cq_tag(tag);
     call.PerformOps(ops);
   }
 };
diff --git a/include/grpcpp/impl/codegen/rpc_service_method.h b/include/grpcpp/impl/codegen/rpc_service_method.h
index 1890e4254d..f465c5fc2f 100644
--- a/include/grpcpp/impl/codegen/rpc_service_method.h
+++ b/include/grpcpp/impl/codegen/rpc_service_method.h
@@ -46,21 +46,22 @@ class MethodHandler {
     /// \param context : the ServerContext structure for this server call
     /// \param req : the request payload, if appropriate for this RPC
     /// \param req_status : the request status after any interceptors have run
-    /// \param renew : used only by the callback API. It is a function
-    ///     called by the RPC Controller to request another RPC
+    /// \param rpc_requester : used only by the callback API. It is a function
+    ///     called by the RPC Controller to request another RPC (and also
+    ///     to set up the state required to make that request possible)
     HandlerParameter(Call* c, ServerContext* context, void* req,
-                     Status req_status, std::function<void()> renew)
+                     Status req_status, std::function<void()> requester)
         : call(c),
           server_context(context),
          request(req),
          status(req_status),
-          renewer(std::move(renew)) {}
+          call_requester(std::move(requester)) {}
     ~HandlerParameter() {}
     Call* call;
     ServerContext* server_context;
     void* request;
     Status status;
-    std::function<void()> renewer;
+    std::function<void()> call_requester;
   };
   virtual void RunHandler(const HandlerParameter& param) = 0;
diff --git a/include/grpcpp/impl/codegen/server_callback.h b/include/grpcpp/impl/codegen/server_callback.h
index c8f7510ed5..5d56cbf1df 100644
--- a/include/grpcpp/impl/codegen/server_callback.h
+++ b/include/grpcpp/impl/codegen/server_callback.h
@@ -48,7 +48,6 @@ class ServerCallbackRpcController {
   // The method handler must call this function when it is done so that
   // the library knows to free its resources
   virtual void Finish(Status s) = 0;
-  virtual void FinishWithError(Status s) = 0;
 
   // Allow the method handler to push out the initial metadata before
   // the response and status are ready
@@ -75,7 +74,8 @@ class CallbackUnaryHandler : public MethodHandler {
             param.call->call(), sizeof(ServerCallbackRpcControllerImpl)))
         ServerCallbackRpcControllerImpl(
             param.server_context, param.call,
-            static_cast<RequestType*>(param.request), std::move(param.renewer));
+            static_cast<RequestType*>(param.request),
+            std::move(param.call_requester));
     Status status = param.status;
     if (status.ok()) {
@@ -112,52 +112,19 @@ class CallbackUnaryHandler : public MethodHandler {
   // The implementation class of ServerCallbackRpcController is a private member
   // of CallbackUnaryHandler since it is never exposed anywhere, and this allows
   // it to take advantage of CallbackUnaryHandler's friendships.
-
  class ServerCallbackRpcControllerImpl
      : public experimental::ServerCallbackRpcController {
   public:
-  void Finish(Status s) override { FinishInternal(std::move(s), false); }
-
-  void FinishWithError(Status s) override {
-    FinishInternal(std::move(s), true);
-  }
-
-  void SendInitialMetadata(std::function<void(bool)> f) override {
-    GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
-
-    meta_tag_ =
-        CallbackWithSuccessTag(call_.call(), std::move(f), &meta_buf_);
-    meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
-                                  ctx_->initial_metadata_flags());
-    if (ctx_->compression_level_set()) {
-      meta_buf_.set_compression_level(ctx_->compression_level());
-    }
-    ctx_->sent_initial_metadata_ = true;
-    meta_buf_.set_cq_tag(&meta_tag_);
-    call_.PerformOps(&meta_buf_);
-  }
-
- private:
-  template
-  friend class CallbackUnaryHandler;
-
-  ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call,
-                                  RequestType* req,
-                                  std::function<void()> renewer)
-      : ctx_(ctx), call_(*call), req_(req), renewer_(std::move(renewer)) {}
-
-  ~ServerCallbackRpcControllerImpl() { req_->~RequestType(); }
-
-  void FinishInternal(Status s, bool allow_error) {
+  void Finish(Status s) override {
     finish_tag_ = CallbackWithSuccessTag(
        call_.call(),
        [this](bool) {
          grpc_call* call = call_.call();
-          auto renewer = std::move(renewer_);
+          auto call_requester = std::move(call_requester_);
          this->~ServerCallbackRpcControllerImpl();  // explicitly call
                                                     // destructor
          g_core_codegen_interface->grpc_call_unref(call);
-          renewer();
+          call_requester();
        },
        &finish_buf_);
    if (!ctx_->sent_initial_metadata_) {
@@ -168,17 +135,46 @@ class CallbackUnaryHandler : public MethodHandler {
      }
      ctx_->sent_initial_metadata_ = true;
    }
-    // The response may be dropped if the status is not OK.
-    if (allow_error || s.ok()) {
+    // The response is dropped if the status is not OK.
+    if (s.ok()) {
      finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_,
                                   finish_buf_.SendMessage(resp_));
    } else {
      finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, s);
    }
-    finish_buf_.set_cq_tag(&finish_tag_);
+    finish_buf_.set_core_cq_tag(&finish_tag_);
    call_.PerformOps(&finish_buf_);
  }
 
+  void SendInitialMetadata(std::function<void(bool)> f) override {
+    GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
+
+    meta_tag_ =
+        CallbackWithSuccessTag(call_.call(), std::move(f), &meta_buf_);
+    meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
+                                  ctx_->initial_metadata_flags());
+    if (ctx_->compression_level_set()) {
+      meta_buf_.set_compression_level(ctx_->compression_level());
+    }
+    ctx_->sent_initial_metadata_ = true;
+    meta_buf_.set_core_cq_tag(&meta_tag_);
+    call_.PerformOps(&meta_buf_);
+  }
+
+ private:
+  template
+  friend class CallbackUnaryHandler;
+
+  ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call,
+                                  RequestType* req,
+                                  std::function<void()> call_requester)
+      : ctx_(ctx),
+        call_(*call),
+        req_(req),
+        call_requester_(std::move(call_requester)) {}
+
+  ~ServerCallbackRpcControllerImpl() { req_->~RequestType(); }
+
   RequestType* request() { return req_; }
   ResponseType* response() { return &resp_; }
 
@@ -193,7 +189,7 @@ class CallbackUnaryHandler : public MethodHandler {
   Call call_;
   RequestType* req_;
   ResponseType resp_;
-  std::function<void()> renewer_;
+  std::function<void()> call_requester_;
  };
};
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index fcff75a420..34d5beb653 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -926,8 +926,11 @@ void PrintHeaderServerMethodCallback(
                    "Method$(context, request, response, controller);\n"
                    "  }, this));\n");
   } else if (ClientOnlyStreaming(method)) {
+    // TODO(vjpai): Add in code generation for all streaming methods
   } else if (ServerOnlyStreaming(method)) {
+    // TODO(vjpai): Add in code generation for all streaming methods
   } else if (method->BidiStreaming()) {
+    // TODO(vjpai): Add in code generation for all streaming methods
   }
   printer->Print(*vars, "}\n");
   printer->Print(*vars,
@@ -975,8 +978,11 @@ void PrintHeaderServerMethodRawCallback(
                    "Method$(context, request, response, controller);\n"
                    "  }, this));\n");
   } else if (ClientOnlyStreaming(method)) {
+    // TODO(vjpai): Add in code generation for all streaming methods
   } else if (ServerOnlyStreaming(method)) {
+    // TODO(vjpai): Add in code generation for all streaming methods
   } else if (method->BidiStreaming()) {
+    // TODO(vjpai): Add in code generation for all streaming methods
   }
   printer->Print(*vars, "}\n");
   printer->Print(*vars,
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index 15e3ccb3c9..f5c1b4e2c5 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -225,7 +225,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
   static void Run(grpc_experimental_completion_queue_functor* cb, int) {
     auto* callback = static_cast<ShutdownCallback*>(cb);
     delete callback->cq_;
-    grpc_core::Delete(callback);
+    delete callback;
   }
 
  private:
@@ -238,7 +238,7 @@ CompletionQueue* Channel::CallbackCQ() {
   // if there is no explicit per-channel CQ registered
   std::lock_guard<std::mutex> l(mu_);
   if (callback_cq_ == nullptr) {
-    auto* shutdown_callback = grpc_core::New<ShutdownCallback>();
+    auto* shutdown_callback = new ShutdownCallback;
     callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{
         GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
         shutdown_callback});
diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc
index 6893201e2e..d93a54aed7 100644
--- a/src/cpp/common/completion_queue_cc.cc
+++ b/src/cpp/common/completion_queue_cc.cc
@@ -60,10 +60,10 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
       case GRPC_QUEUE_SHUTDOWN:
         return SHUTDOWN;
       case GRPC_OP_COMPLETE:
-        auto cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag);
+        auto core_cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag);
         *ok = ev.success != 0;
-        *tag = cq_tag;
-        if (cq_tag->FinalizeResult(tag, ok)) {
+        *tag = core_cq_tag;
+        if (core_cq_tag->FinalizeResult(tag, ok)) {
           return GOT_EVENT;
         }
         break;
@@ -87,9 +87,9 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
     flushed_ = true;
     if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag,
                                                        &res)) {
-      auto cq_tag = static_cast<internal::CompletionQueueTag*>(res_tag);
+      auto core_cq_tag = static_cast<internal::CompletionQueueTag*>(res_tag);
       *ok = res == 1;
-      if (cq_tag->FinalizeResult(tag, ok)) {
+      if (core_cq_tag->FinalizeResult(tag, ok)) {
         return true;
       }
     }
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index a9f0315af6..870ee84e3e 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -1111,7 +1111,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
   static void Run(grpc_experimental_completion_queue_functor* cb, int) {
     auto* callback = static_cast<ShutdownCallback*>(cb);
     delete callback->cq_;
-    grpc_core::Delete(callback);
+    delete callback;
   }
 
  private:
@@ -1124,7 +1124,7 @@ CompletionQueue* Server::CallbackCQ() {
   // if there is no explicit per-server CQ registered
   std::lock_guard<std::mutex> l(mu_);
   if (callback_cq_ == nullptr) {
-    auto* shutdown_callback = grpc_core::New<ShutdownCallback>();
+    auto* shutdown_callback = new ShutdownCallback;
     callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{
         GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
         shutdown_callback});
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 8ded7a8972..355debb3fb 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -45,7 +45,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
       : call_(*call),
         has_tag_(false),
         tag_(nullptr),
-        cq_tag_(this),
+        core_cq_tag_(this),
         refs_(2),
         finalized_(false),
         cancelled_(0),
@@ -92,9 +92,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
     tag_ = tag;
   }
 
-  void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; }
+  void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
 
-  void* cq_tag() override { return cq_tag_; }
+  void* core_cq_tag() override { return core_cq_tag_; }
 
   void Unref();
 
@@ -138,7 +138,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
   internal::Call call_;
   bool has_tag_;
   void* tag_;
-  void* cq_tag_;
+  void* core_cq_tag_;
   std::mutex mu_;
   int refs_;
   bool finalized_;
@@ -166,8 +166,8 @@ void ServerContext::CompletionOp::FillOps(internal::Call* call) {
   interceptor_methods_.SetCall(&call_);
   interceptor_methods_.SetReverse();
   interceptor_methods_.SetCallOpSetInterface(this);
-  GPR_ASSERT(GRPC_CALL_OK ==
-             grpc_call_start_batch(call->call(), &ops, 1, cq_tag_, nullptr));
+  GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), &ops, 1,
+                                                   core_cq_tag_, nullptr));
   /* No interceptors to run here */
 }
 
@@ -272,7 +272,7 @@ void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) {
   if (callback) {
     completion_tag_ =
         internal::CallbackWithSuccessTag(call->call(), nullptr, completion_op_);
-    completion_op_->set_cq_tag(&completion_tag_);
+    completion_op_->set_core_cq_tag(&completion_tag_);
   } else if (has_notify_when_done_tag_) {
     completion_op_->set_tag(async_notify_when_done_tag_);
   }
@@ -305,7 +305,7 @@ bool ServerContext::IsCancelled() const {
   if (completion_tag_) {
     // When using callback API, this result is always valid.
     return completion_op_->CheckCancelledAsync();
-  } else if (completion_tag_ || has_notify_when_done_tag_) {
+  } else if (has_notify_when_done_tag_) {
     // When using async API, the result is only valid
     // if the tag has already been delivered at the completion queue
     return completion_op_ && completion_op_->CheckCancelledAsync();
--
cgit v1.2.3
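Finally, a sketch of how a callback-only server would be assembled after this series: the application registers the service and starts the server as usual, but never adds a ServerCompletionQueue or spawns polling threads. BuildAndStart() notices the callback methods, registers the internal callback completion queue created by Server::CallbackCQ() with core, and issues the initial CallbackRequests; Wait() then blocks while handlers run on library-managed threads. The service type and listening address below are the assumed ones from the earlier sketch, not anything prescribed by these patches.

    #include <memory>
    #include <grpcpp/grpcpp.h>

    int main() {
      CallbackEchoService service;  // hypothetical callback service from the earlier sketch

      grpc::ServerBuilder builder;
      builder.AddListeningPort("0.0.0.0:50051", grpc::InsecureServerCredentials());
      builder.RegisterService(&service);

      // No AddCompletionQueue() call: callback handlers are driven by the
      // internal callback CQ, which the shutdown functor deletes at teardown.
      std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
      server->Wait();
      return 0;
    }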