diff options
Diffstat (limited to 'src/cpp/server')
-rw-r--r-- | src/cpp/server/create_default_thread_pool.cc | 6 | ||||
-rw-r--r-- | src/cpp/server/dynamic_thread_pool.cc | 24 | ||||
-rw-r--r-- | src/cpp/server/secure_server_credentials.cc | 4 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 105 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 21 | ||||
-rw-r--r-- | src/cpp/server/server_context.cc | 8 |
6 files changed, 126 insertions, 42 deletions
diff --git a/src/cpp/server/create_default_thread_pool.cc b/src/cpp/server/create_default_thread_pool.cc index 81c84474d8..9f59d254f1 100644 --- a/src/cpp/server/create_default_thread_pool.cc +++ b/src/cpp/server/create_default_thread_pool.cc @@ -39,9 +39,9 @@ namespace grpc { ThreadPoolInterface* CreateDefaultThreadPool() { - int cores = gpr_cpu_num_cores(); - if (!cores) cores = 4; - return new DynamicThreadPool(cores); + int cores = gpr_cpu_num_cores(); + if (!cores) cores = 4; + return new DynamicThreadPool(cores); } } // namespace grpc diff --git a/src/cpp/server/dynamic_thread_pool.cc b/src/cpp/server/dynamic_thread_pool.cc index f58d0420df..b475f43b1d 100644 --- a/src/cpp/server/dynamic_thread_pool.cc +++ b/src/cpp/server/dynamic_thread_pool.cc @@ -36,10 +36,10 @@ #include <grpc++/dynamic_thread_pool.h> namespace grpc { -DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool *pool): - pool_(pool), - thd_(new grpc::thread(&DynamicThreadPool::DynamicThread::ThreadFunc, this)) { -} +DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool) + : pool_(pool), + thd_(new grpc::thread(&DynamicThreadPool::DynamicThread::ThreadFunc, + this)) {} DynamicThreadPool::DynamicThread::~DynamicThread() { thd_->join(); thd_.reset(); @@ -57,7 +57,7 @@ void DynamicThreadPool::DynamicThread::ThreadFunc() { pool_->shutdown_cv_.notify_one(); } } - + void DynamicThreadPool::ThreadFunc() { for (;;) { // Wait until work is available or we are shutting down. @@ -65,7 +65,7 @@ void DynamicThreadPool::ThreadFunc() { if (!shutdown_ && callbacks_.empty()) { // If there are too many threads waiting, then quit this thread if (threads_waiting_ >= reserve_threads_) { - break; + break; } threads_waiting_++; cv_.wait(lock); @@ -84,9 +84,11 @@ void DynamicThreadPool::ThreadFunc() { } } -DynamicThreadPool::DynamicThreadPool(int reserve_threads) : - shutdown_(false), reserve_threads_(reserve_threads), nthreads_(0), - threads_waiting_(0) { +DynamicThreadPool::DynamicThreadPool(int reserve_threads) + : shutdown_(false), + reserve_threads_(reserve_threads), + nthreads_(0), + threads_waiting_(0) { for (int i = 0; i < reserve_threads_; i++) { grpc::lock_guard<grpc::mutex> lock(mu_); nthreads_++; @@ -96,10 +98,10 @@ DynamicThreadPool::DynamicThreadPool(int reserve_threads) : void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) { for (auto t = tlist->begin(); t != tlist->end(); t = tlist->erase(t)) { - delete *t; + delete *t; } } - + DynamicThreadPool::~DynamicThreadPool() { grpc::unique_lock<grpc::mutex> lock(mu_); shutdown_ = true; diff --git a/src/cpp/server/secure_server_credentials.cc b/src/cpp/server/secure_server_credentials.cc index 32c45e2280..f203cf7f49 100644 --- a/src/cpp/server/secure_server_credentials.cc +++ b/src/cpp/server/secure_server_credentials.cc @@ -35,8 +35,8 @@ namespace grpc { -int SecureServerCredentials::AddPortToServer( - const grpc::string& addr, grpc_server* server) { +int SecureServerCredentials::AddPortToServer(const grpc::string& addr, + grpc_server* server) { return grpc_server_add_secure_http2_port(server, addr.c_str(), creds_); } diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 6e576ab8b3..d8ea375636 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -67,11 +67,17 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || method->method_type() == RpcMethod::SERVER_STREAMING), + call_details_(nullptr), cq_(nullptr) { grpc_metadata_array_init(&request_metadata_); } - ~SyncRequest() { grpc_metadata_array_destroy(&request_metadata_); } + ~SyncRequest() { + if (call_details_) { + delete call_details_; + } + grpc_metadata_array_destroy(&request_metadata_); + } static SyncRequest* Wait(CompletionQueue* cq, bool* ok) { void* tag = nullptr; @@ -84,7 +90,27 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { return mrd; } - void SetupRequest() { cq_ = grpc_completion_queue_create(); } + static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok, + gpr_timespec deadline) { + void* tag = nullptr; + *ok = false; + switch (cq->AsyncNext(&tag, ok, deadline)) { + case CompletionQueue::TIMEOUT: + *req = nullptr; + return true; + case CompletionQueue::SHUTDOWN: + *req = nullptr; + return false; + case CompletionQueue::GOT_EVENT: + *req = static_cast<SyncRequest*>(tag); + GPR_ASSERT((*req)->in_flight_); + return true; + } + gpr_log(GPR_ERROR, "Should never reach here"); + abort(); + } + + void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); } void TeardownRequest() { grpc_completion_queue_destroy(cq_); @@ -94,17 +120,32 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(cq_ && !in_flight_); in_flight_ = true; - GPR_ASSERT(GRPC_CALL_OK == - grpc_server_request_registered_call( - server, tag_, &call_, &deadline_, &request_metadata_, - has_request_payload_ ? &request_payload_ : nullptr, cq_, - notify_cq, this)); + if (tag_) { + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_registered_call( + server, tag_, &call_, &deadline_, &request_metadata_, + has_request_payload_ ? &request_payload_ : nullptr, cq_, + notify_cq, this)); + } else { + if (!call_details_) { + call_details_ = new grpc_call_details; + grpc_call_details_init(call_details_); + } + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( + server, &call_, call_details_, + &request_metadata_, cq_, notify_cq, this)); + } } bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { if (!*status) { grpc_completion_queue_destroy(cq_); } + if (call_details_) { + deadline_ = call_details_->deadline; + grpc_call_details_destroy(call_details_); + grpc_call_details_init(call_details_); + } return true; } @@ -157,6 +198,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { bool in_flight_; const bool has_request_payload_; grpc_call* call_; + grpc_call_details* call_details_; gpr_timespec deadline_; grpc_metadata_array request_metadata_; grpc_byte_buffer* request_payload_; @@ -176,9 +218,9 @@ static grpc_server* CreateServer( args[1].value.integer = compression_options.enabled_algorithms_bitset; grpc_channel_args channel_args = {2, args}; - return grpc_server_create(&channel_args); + return grpc_server_create(&channel_args, NULL); } else { - return grpc_server_create(nullptr); + return grpc_server_create(nullptr, nullptr); } } @@ -190,10 +232,11 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, shutdown_(false), num_running_cb_(0), sync_methods_(new std::list<SyncRequest>), + has_generic_service_(false), server_(CreateServer(max_message_size, compression_options)), thread_pool_(thread_pool), thread_pool_owned_(thread_pool_owned) { - grpc_server_register_completion_queue(server_, cq_.cq()); + grpc_server_register_completion_queue(server_, cq_.cq(), nullptr); } Server::~Server() { @@ -214,23 +257,23 @@ Server::~Server() { delete sync_methods_; } -bool Server::RegisterService(const grpc::string *host, RpcService* service) { +bool Server::RegisterService(const grpc::string* host, RpcService* service) { for (int i = 0; i < service->GetMethodCount(); ++i) { RpcServiceMethod* method = service->GetMethod(i); - void* tag = grpc_server_register_method( - server_, method->name(), host ? host->c_str() : nullptr); + void* tag = grpc_server_register_method(server_, method->name(), + host ? host->c_str() : nullptr); if (!tag) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); return false; } - SyncRequest request(method, tag); - sync_methods_->emplace_back(request); + sync_methods_->emplace_back(method, tag); } return true; } -bool Server::RegisterAsyncService(const grpc::string *host, AsynchronousService* service) { +bool Server::RegisterAsyncService(const grpc::string* host, + AsynchronousService* service) { GPR_ASSERT(service->server_ == nullptr && "Can only register an asynchronous service against one server."); service->server_ = this; @@ -252,6 +295,7 @@ void Server::RegisterAsyncGenericService(AsyncGenericService* service) { GPR_ASSERT(service->server_ == nullptr && "Can only register an async generic service against one server."); service->server_ = this; + has_generic_service_ = true; } int Server::AddListeningPort(const grpc::string& addr, @@ -265,6 +309,14 @@ bool Server::Start() { started_ = true; grpc_server_start(server_); + if (!has_generic_service_) { + unknown_method_.reset(new RpcServiceMethod( + "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); + // Use of emplace_back with just constructor arguments is not accepted here + // by gcc-4.4 because it can't match the anonymous nullptr with a proper + // constructor implicitly. Construct the object and use push_back. + sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr)); + } // Start processing rpcs. if (!sync_methods_->empty()) { for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { @@ -278,12 +330,27 @@ bool Server::Start() { return true; } -void Server::Shutdown() { +void Server::ShutdownInternal(gpr_timespec deadline) { grpc::unique_lock<grpc::mutex> lock(mu_); if (started_ && !shutdown_) { shutdown_ = true; grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest()); cq_.Shutdown(); + // Spin, eating requests until the completion queue is completely shutdown. + // If the deadline expires then cancel anything that's pending and keep + // spinning forever until the work is actually drained. + // Since nothing else needs to touch state guarded by mu_, holding it + // through this loop is fine. + SyncRequest* request; + bool ok; + while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) { + if (request == NULL) { // deadline expired + grpc_server_cancel_all_calls(server_); + deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + } else if (ok) { + SyncRequest::CallData call_data(this, request); + } + } // Wait for running callbacks to finish. while (num_running_cb_ != 0) { @@ -304,8 +371,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { size_t nops = 0; grpc_op cops[MAX_OPS]; ops->FillOps(cops, &nops); - GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call->call(), cops, nops, ops)); + auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr); + GPR_ASSERT(GRPC_CALL_OK == result); } Server::BaseAsyncRequest::BaseAsyncRequest( diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 425b052128..57a734c334 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -38,6 +38,7 @@ #include <grpc++/impl/service_type.h> #include <grpc++/server.h> #include <grpc++/thread_pool_interface.h> +#include <grpc++/fixed_size_thread_pool.h> namespace grpc { @@ -63,9 +64,10 @@ ServerBuilder& ServerBuilder::RegisterAsyncService( return *this; } -ServerBuilder& ServerBuilder::RegisterService( - const grpc::string& addr, SynchronousService* service) { - services_.emplace_back(new NamedService<RpcService>(addr, service->service())); +ServerBuilder& ServerBuilder::RegisterService(const grpc::string& addr, + SynchronousService* service) { + services_.emplace_back( + new NamedService<RpcService>(addr, service->service())); return *this; } @@ -123,11 +125,18 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { thread_pool_ = CreateDefaultThreadPool(); thread_pool_owned = true; } + // Async services only, create a thread pool to handle requests to unknown + // services. + if (!thread_pool_ && !generic_service_ && !async_services_.empty()) { + thread_pool_ = new FixedSizeThreadPool(1); + thread_pool_owned = true; + } std::unique_ptr<Server> server(new Server(thread_pool_, thread_pool_owned, max_message_size_, compression_options_)); for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { - grpc_server_register_completion_queue(server->server_, (*cq)->cq()); + grpc_server_register_completion_queue(server->server_, (*cq)->cq(), + nullptr); } for (auto service = services_.begin(); service != services_.end(); service++) { @@ -135,8 +144,8 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { return nullptr; } } - for (auto service = async_services_.begin(); - service != async_services_.end(); service++) { + for (auto service = async_services_.begin(); service != async_services_.end(); + service++) { if (!server->RegisterAsyncService((*service)->host.get(), (*service)->service)) { return nullptr; diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 04373397f9..03461ddda5 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -50,7 +50,12 @@ namespace grpc { class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { public: // initial refs: one in the server context, one in the cq - CompletionOp() : has_tag_(false), tag_(nullptr), refs_(2), finalized_(false), cancelled_(0) {} + CompletionOp() + : has_tag_(false), + tag_(nullptr), + refs_(2), + finalized_(false), + cancelled_(0) {} void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE; bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; @@ -91,6 +96,7 @@ void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) { ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER; ops->data.recv_close_on_server.cancelled = &cancelled_; ops->flags = 0; + ops->reserved = NULL; *nops = 1; } |