diff options
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/rpcmanager/grpc_rpc_manager.cc | 188 | ||||
-rw-r--r-- | src/cpp/rpcmanager/grpc_rpc_manager.h | 157 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 130 | ||||
-rw-r--r-- | src/cpp/server/server_cc.cc | 333 |
4 files changed, 725 insertions, 83 deletions
diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.cc b/src/cpp/rpcmanager/grpc_rpc_manager.cc new file mode 100644 index 0000000000..2299dbdcd3 --- /dev/null +++ b/src/cpp/rpcmanager/grpc_rpc_manager.cc @@ -0,0 +1,188 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/impl/sync.h> +#include <grpc++/impl/thd.h> +#include <grpc/support/log.h> +#include <climits> + +#include "src/cpp/rpcmanager/grpc_rpc_manager.h" + +namespace grpc { + +GrpcRpcManager::GrpcRpcManagerThread::GrpcRpcManagerThread( + GrpcRpcManager* rpc_mgr) + : rpc_mgr_(rpc_mgr), + thd_(new std::thread(&GrpcRpcManager::GrpcRpcManagerThread::Run, this)) {} + +void GrpcRpcManager::GrpcRpcManagerThread::Run() { + rpc_mgr_->MainWorkLoop(); + rpc_mgr_->MarkAsCompleted(this); +} + +GrpcRpcManager::GrpcRpcManagerThread::~GrpcRpcManagerThread() { + thd_->join(); + thd_.reset(); +} + +GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers) + : shutdown_(false), + num_pollers_(0), + min_pollers_(min_pollers), + max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers), + num_threads_(0) {} + +GrpcRpcManager::~GrpcRpcManager() { + { + std::unique_lock<grpc::mutex> lock(mu_); + GPR_ASSERT(num_threads_ == 0); + } + + CleanupCompletedThreads(); +} + +void GrpcRpcManager::Wait() { + std::unique_lock<grpc::mutex> lock(mu_); + while (num_threads_ != 0) { + shutdown_cv_.wait(lock); + } +} + +void GrpcRpcManager::ShutdownRpcManager() { + std::unique_lock<grpc::mutex> lock(mu_); + shutdown_ = true; +} + +bool GrpcRpcManager::IsShutdown() { + std::unique_lock<grpc::mutex> lock(mu_); + return shutdown_; +} + +void GrpcRpcManager::MarkAsCompleted(GrpcRpcManagerThread* thd) { + { + std::unique_lock<grpc::mutex> list_lock(list_mu_); + completed_threads_.push_back(thd); + } + + grpc::unique_lock<grpc::mutex> lock(mu_); + num_threads_--; + if (num_threads_ == 0) { + shutdown_cv_.notify_one(); + } +} + +void GrpcRpcManager::CleanupCompletedThreads() { + std::unique_lock<grpc::mutex> lock(list_mu_); + for (auto thd = completed_threads_.begin(); thd != completed_threads_.end(); + thd = completed_threads_.erase(thd)) { + delete *thd; + } +} + +void GrpcRpcManager::Initialize() { + for (int i = 0; i < min_pollers_; i++) { + MaybeCreatePoller(); + } +} + +// If the number of pollers (i.e threads currently blocked in PollForWork()) is +// less than max threshold (i.e max_pollers_) and the total number of threads is +// below the maximum threshold, we can let the current thread continue as poller +bool GrpcRpcManager::MaybeContinueAsPoller() { + std::unique_lock<grpc::mutex> lock(mu_); + + if (shutdown_ || num_pollers_ > max_pollers_) { + return false; + } + + num_pollers_++; + return true; +} + +// Create a new poller if the current number of pollers i.e num_pollers_ (i.e +// threads currently blocked in PollForWork()) is below the threshold (i.e +// min_pollers_) and the total number of threads is below the maximum threshold +void GrpcRpcManager::MaybeCreatePoller() { + grpc::unique_lock<grpc::mutex> lock(mu_); + if (!shutdown_ && num_pollers_ < min_pollers_) { + num_pollers_++; + num_threads_++; + + // Create a new thread (which ends up calling the MainWorkLoop() function + new GrpcRpcManagerThread(this); + } +} + +void GrpcRpcManager::MainWorkLoop() { + void* tag; + bool ok; + + /* + 1. Poll for work (i.e PollForWork()) + 2. After returning from PollForWork, reduce the number of pollers by 1. If + PollForWork() returned a TIMEOUT, then it may indicate that we have more + polling threads than needed. Check if the number of pollers is greater + than min_pollers and if so, terminate the thread. + 3. Since we are short of one poller now, see if a new poller has to be + created (i.e see MaybeCreatePoller() for more details) + 4. Do the actual work (DoWork()) + 5. After doing the work, see it this thread can resume polling work (i.e + see MaybeContinueAsPoller() for more details) */ + do { + WorkStatus work_status = PollForWork(&tag, &ok); + + { + grpc::unique_lock<grpc::mutex> lock(mu_); + num_pollers_--; + + if (work_status == TIMEOUT && num_pollers_ > min_pollers_) { + break; + } + } + + // TODO (sreek) See if we need to check for shutdown here and quit + // Note that MaybeCreatePoller does check for shutdown and creates a new + // thread only if GrpcRpcManager is not shutdown + if (work_status == WORK_FOUND) { + MaybeCreatePoller(); + DoWork(tag, ok); + } + } while (MaybeContinueAsPoller()); + + CleanupCompletedThreads(); + + // If we are here, either GrpcRpcManager is shutting down or it already has + // enough threads. +} + +} // namespace grpc diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.h b/src/cpp/rpcmanager/grpc_rpc_manager.h new file mode 100644 index 0000000000..d00771b9a1 --- /dev/null +++ b/src/cpp/rpcmanager/grpc_rpc_manager.h @@ -0,0 +1,157 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H +#define GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H + +#include <list> +#include <memory> + +#include <grpc++/impl/sync.h> +#include <grpc++/impl/thd.h> + +namespace grpc { + +class GrpcRpcManager { + public: + explicit GrpcRpcManager(int min_pollers, int max_pollers); + virtual ~GrpcRpcManager(); + + // This function MUST be called before using the object + void Initialize(); + + enum WorkStatus { WORK_FOUND, SHUTDOWN, TIMEOUT }; + + // "Polls" for new work. + // If the return value is WORK_FOUND: + // - The implementaion of PollForWork() MAY set some opaque identifier to + // (identify the work item found) via the '*tag' parameter + // - The implementaion MUST set the value of 'ok' to 'true' or 'false'. A + // value of 'false' indicates some implemenation specific error (that is + // neither SHUTDOWN nor TIMEOUT) + // - GrpcRpcManager does not interpret the values of 'tag' and 'ok' + // - GrpcRpcManager WILL call DoWork() and pass '*tag' and 'ok' as input to + // DoWork() + // + // If the return value is SHUTDOWN:, + // - GrpcManager WILL NOT call DoWork() and terminates the thead + // + // If the return value is TIMEOUT:, + // - GrpcManager WILL NOT call DoWork() + // - GrpcManager MAY terminate the thread depending on the current number of + // active poller threads and mix_pollers/max_pollers settings + // - Also, the value of timeout is specific to the derived class + // implementation + virtual WorkStatus PollForWork(void** tag, bool* ok) = 0; + + // The implementation of DoWork() is supposed to perform the work found by + // PollForWork(). The tag and ok parameters are the same as returned by + // PollForWork() + // + // The implementation of DoWork() should also do any setup needed to ensure + // that the next call to PollForWork() (not necessarily by the current thread) + // actually finds some work + virtual void DoWork(void* tag, bool ok) = 0; + + // Mark the GrpcRpcManager as shutdown and begin draining the work. + // This is a non-blocking call and the caller should call Wait(), a blocking + // call which returns only once the shutdown is complete + void ShutdownRpcManager(); + + // Has ShutdownRpcManager() been called + bool IsShutdown(); + + // A blocking call that returns only after the GrpcRpcManager has shutdown and + // all the threads have drained all the outstanding work + void Wait(); + + private: + // Helper wrapper class around std::thread. This takes a GrpcRpcManager object + // and starts a new std::thread to calls the Run() function. + // + // The Run() function calls GrpcManager::MainWorkLoop() function and once that + // completes, it marks the GrpcRpcManagerThread completed by calling + // GrpcRpcManager::MarkAsCompleted() + class GrpcRpcManagerThread { + public: + GrpcRpcManagerThread(GrpcRpcManager* rpc_mgr); + ~GrpcRpcManagerThread(); + + private: + // Calls rpc_mgr_->MainWorkLoop() and once that completes, calls + // rpc_mgr_>MarkAsCompleted(this) to mark the thread as completed + void Run(); + + GrpcRpcManager* rpc_mgr_; + std::unique_ptr<grpc::thread> thd_; + }; + + // The main funtion in GrpcRpcManager + void MainWorkLoop(); + + // Create a new poller if the number of current pollers is less than the + // minimum number of pollers needed (i.e min_pollers). + void MaybeCreatePoller(); + + // Returns true if the current thread can resume as a poller. i.e if the + // current number of pollers is less than the max_pollers. + bool MaybeContinueAsPoller(); + + void MarkAsCompleted(GrpcRpcManagerThread* thd); + void CleanupCompletedThreads(); + + // Protects shutdown_, num_pollers_ and num_threads_ + // TODO: sreek - Change num_pollers and num_threads_ to atomics + grpc::mutex mu_; + + bool shutdown_; + grpc::condition_variable shutdown_cv_; + + // Number of threads doing polling + int num_pollers_; + + // The minimum and maximum number of threads that should be doing polling + int min_pollers_; + int max_pollers_; + + // The total number of threads (includes threads includes the threads that are + // currently polling i.e num_pollers_) + int num_threads_; + + grpc::mutex list_mu_; + std::list<GrpcRpcManagerThread*> completed_threads_; +}; + +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 2980b16c56..59c40dedaf 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -35,6 +35,7 @@ #include <grpc++/impl/service_type.h> #include <grpc++/server.h> +#include <grpc/support/cpu.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> @@ -61,6 +62,7 @@ ServerBuilder::ServerBuilder() auto& factory = *it; plugins_.emplace_back(factory()); } + // all compression algorithms enabled by default. enabled_compression_algorithms_bitset_ = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; @@ -68,6 +70,16 @@ ServerBuilder::ServerBuilder() sizeof(maybe_default_compression_level_)); memset(&maybe_default_compression_algorithm_, 0, sizeof(maybe_default_compression_algorithm_)); + + // Sync server setting defaults + sync_server_settings_.min_pollers = 1; + sync_server_settings_.max_pollers = INT_MAX; + + int num_cpus = gpr_cpu_num_cores(); + num_cpus = GPR_MAX(num_cpus, 4); + sync_server_settings_.num_cqs = num_cpus; + + sync_server_settings_.cq_timeout_msec = 1000; } std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue( @@ -94,7 +106,7 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService( gpr_log(GPR_ERROR, "Adding multiple AsyncGenericService is unsupported for now. " "Dropping the service %p", - service); + (void*)service); } else { generic_service_ = service; } @@ -130,6 +142,10 @@ ServerBuilder& ServerBuilder::SetDefaultCompressionAlgorithm( return *this; } +void ServerBuilder::SetSyncServerSettings(SyncServerSettings settings) { + sync_server_settings_ = settings; // copy the settings +} + ServerBuilder& ServerBuilder::AddListeningPort( const grpc::string& addr, std::shared_ptr<ServerCredentials> creds, int* selected_port) { @@ -139,35 +155,24 @@ ServerBuilder& ServerBuilder::AddListeningPort( } std::unique_ptr<Server> ServerBuilder::BuildAndStart() { - std::unique_ptr<ThreadPoolInterface> thread_pool; - bool has_sync_methods = false; - for (auto it = services_.begin(); it != services_.end(); ++it) { - if ((*it)->service->has_synchronous_methods()) { - if (!thread_pool) { - thread_pool.reset(CreateDefaultThreadPool()); - has_sync_methods = true; - break; - } - } - } ChannelArguments args; for (auto option = options_.begin(); option != options_.end(); ++option) { (*option)->UpdateArguments(&args); (*option)->UpdatePlugins(&plugins_); } + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { - if (!thread_pool && (*plugin)->has_sync_methods()) { - thread_pool.reset(CreateDefaultThreadPool()); - has_sync_methods = true; - } (*plugin)->UpdateChannelArguments(&args); } + if (max_receive_message_size_ >= 0) { args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_); } + if (max_send_message_size_ >= 0) { args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, max_send_message_size_); } + args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET, enabled_compression_algorithms_bitset_); if (maybe_default_compression_level_.is_set) { @@ -178,27 +183,84 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, maybe_default_compression_algorithm_.algorithm); } - std::unique_ptr<Server> server(new Server(thread_pool.release(), true, - max_receive_message_size_, &args)); + + // == Determine if the server has any syncrhonous methods == + bool has_sync_methods = false; + for (auto it = services_.begin(); it != services_.end(); ++it) { + if ((*it)->service->has_synchronous_methods()) { + has_sync_methods = true; + break; + } + } + + if (!has_sync_methods) { + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { + if ((*plugin)->has_sync_methods()) { + has_sync_methods = true; + break; + } + } + } + + // If this is a Sync server, i.e a server expositing sync API, then the server + // needs to create some completion queues to listen for incoming requests. + // 'sync_server_cqs' are those internal completion queues. + // + // This is different from the completion queues added to the server via + // ServerBuilder's AddCompletionQueue() method (those completion queues + // are in 'cqs_' member variable of ServerBuilder object) + std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> + sync_server_cqs( + new std::vector<std::unique_ptr<ServerCompletionQueue>>()); + + if (has_sync_methods) { + // This is a Sync server + gpr_log(GPR_INFO, + "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: " + "%d, CQ timeout (msec): %d", + sync_server_settings_.num_cqs, sync_server_settings_.min_pollers, + sync_server_settings_.max_pollers, + sync_server_settings_.cq_timeout_msec); + + // Create completion queues to listen to incoming rpc requests + for (int i = 0; i < sync_server_settings_.num_cqs; i++) { + sync_server_cqs->emplace_back(new ServerCompletionQueue()); + } + } + + std::unique_ptr<Server> server(new Server( + sync_server_cqs, max_receive_message_size_, &args, + sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, + sync_server_settings_.cq_timeout_msec)); + ServerInitializer* initializer = server->initializer(); - // If the server has atleast one sync methods, we know that this is a Sync - // server or a Hybrid server and the completion queue (server->cq_) would be - // frequently polled. - int num_frequently_polled_cqs = has_sync_methods ? 1 : 0; - - for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { - // A completion queue that is not polled frequently (by calling Next() or - // AsyncNext()) is not safe to use for listening to incoming channels. - // Register all such completion queues as non-listening completion queues - // with the GRPC core library. - if ((*cq)->IsFrequentlyPolled()) { - grpc_server_register_completion_queue(server->server_, (*cq)->cq(), + // Register all the completion queues with the server. i.e + // 1. sync_server_cqs: internal completion queues created IF this is a sync + // server + // 2. cqs_: Completion queues added via AddCompletionQueue() call + + // All sync cqs (if any) are frequently polled by the GrpcRpcManager + int num_frequently_polled_cqs = sync_server_cqs->size(); + + for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) { + grpc_server_register_completion_queue(server->server_, (*it)->cq(), + nullptr); + } + + // cqs_ contains the completion queue added by calling the ServerBuilder's + // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by + // calling Next() or AsyncNext()) and hence are not safe to be used for + // listening to incoming channels. Such completion queues must be registered + // as non-listening queues + for (auto it = cqs_.begin(); it != cqs_.end(); ++it) { + if ((*it)->IsFrequentlyPolled()) { + grpc_server_register_completion_queue(server->server_, (*it)->cq(), nullptr); num_frequently_polled_cqs++; } else { grpc_server_register_non_listening_completion_queue(server->server_, - (*cq)->cq(), nullptr); + (*it)->cq(), nullptr); } } @@ -214,9 +276,11 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { return nullptr; } } + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { (*plugin)->InitServer(initializer); } + if (generic_service_) { server->RegisterAsyncGenericService(generic_service_); } else { @@ -229,6 +293,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { } } } + for (auto port = ports_.begin(); port != ports_.end(); port++) { int r = server->AddListeningPort(port->addr, port->creds.get()); if (!r) return nullptr; @@ -236,13 +301,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { *port->selected_port = r; } } + auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0]; if (!server->Start(cqs_data, cqs_.size())) { return nullptr; } + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { (*plugin)->Finish(initializer); } + return server; } diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 3f89275370..36bc61fdf1 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -118,6 +118,7 @@ class Server::UnimplementedAsyncResponse GRPC_FINAL UnimplementedAsyncRequest* const request_; }; +// TODO (sreek) - This might no longer be needed class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { @@ -126,6 +127,11 @@ class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag { } }; +class ShutdownTag : public CompletionQueueTag { + public: + bool FinalizeResult(void** tag, bool* status) { return false; } +}; + class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { public: SyncRequest(RpcServiceMethod* method, void* tag) @@ -147,6 +153,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_metadata_array_destroy(&request_metadata_); } + // TODO (Sreek) This function is probably no longer needed static SyncRequest* Wait(CompletionQueue* cq, bool* ok) { void* tag = nullptr; *ok = false; @@ -158,6 +165,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { return mrd; } + // TODO (sreek) - This function is probably no longer needed static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok, gpr_timespec deadline) { void* tag = nullptr; @@ -177,6 +185,8 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { GPR_UNREACHABLE_CODE(return false); } + // TODO (sreek) - Refactor this SetupRequest/TeardownRequest and ResetRequest + // functions void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); } void TeardownRequest() { @@ -184,6 +194,8 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { cq_ = nullptr; } + void ResetRequest() { in_flight_ = false; } + void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(cq_ && !in_flight_); in_flight_ = true; @@ -275,53 +287,168 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_completion_queue* cq_; }; +class Server::SyncRequestManager : public GrpcRpcManager { + public: + SyncRequestManager(Server* server, CompletionQueue* server_cq, + std::shared_ptr<GlobalCallbacks> global_callbacks, + int min_pollers, int max_pollers, int cq_timeout_msec) + : GrpcRpcManager(min_pollers, max_pollers), + server_(server), + server_cq_(server_cq), + cq_timeout_msec_(cq_timeout_msec), + global_callbacks_(global_callbacks) {} + + WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE { + *tag = nullptr; + gpr_timespec deadline = + gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN); + + switch (server_cq_->AsyncNext(tag, ok, deadline)) { + case CompletionQueue::TIMEOUT: + return TIMEOUT; + case CompletionQueue::SHUTDOWN: + return SHUTDOWN; + case CompletionQueue::GOT_EVENT: + return WORK_FOUND; + } + + GPR_UNREACHABLE_CODE(return TIMEOUT); + } + + void DoWork(void* tag, bool ok) GRPC_OVERRIDE { + SyncRequest* sync_req = static_cast<SyncRequest*>(tag); + + if (!sync_req) { + // No tag. Nothing to work on + // TODO (sreek) - Log a warning here since this is an unlikely case + return; + } + + if (ok) { + SyncRequest::CallData cd(server_, sync_req); + { + sync_req->SetupRequest(); + if (!IsShutdown()) { + sync_req->Request(server_->c_server(), server_cq_->cq()); + } else { + sync_req->TeardownRequest(); + } + } + GPR_TIMER_SCOPE("cd.Run()", 0); + cd.Run(global_callbacks_); + } else { + sync_req->ResetRequest(); + // ok is false. For some reason, the tag was returned but event was not + // successful. In this case, request again unless we are shutting down + if (!IsShutdown()) { + // TODO (sreek) Remove this + // sync_req->Request(server_->c_server(), server_cq_->cq()); + } + } + } + + void AddSyncMethod(RpcServiceMethod* method, void* tag) { + sync_methods_.emplace_back(method, tag); + } + + void AddUnknownSyncMethod() { + // TODO (sreek) - Check if !sync_methods_.empty() is really needed here + if (!sync_methods_.empty()) { + unknown_method_.reset(new RpcServiceMethod( + "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); + // Use of emplace_back with just constructor arguments is not accepted + // here by gcc-4.4 because it can't match the anonymous nullptr with a + // proper constructor implicitly. Construct the object and use push_back. + sync_methods_.push_back(SyncRequest(unknown_method_.get(), nullptr)); + } + } + + void ShutdownAndDrainCompletionQueue() { + server_cq_->Shutdown(); + + // Drain any pending items from the queue + void* tag; + bool ok; + while (server_cq_->Next(&tag, &ok)) { + // Nothing to be done here + } + } + + void Start() { + if (!sync_methods_.empty()) { + for (auto m = sync_methods_.begin(); m != sync_methods_.end(); m++) { + m->SetupRequest(); + m->Request(server_->c_server(), server_cq_->cq()); + } + + GrpcRpcManager::Initialize(); + } + } + + private: + Server* server_; + CompletionQueue* server_cq_; + int cq_timeout_msec_; + std::vector<SyncRequest> sync_methods_; + std::unique_ptr<RpcServiceMethod> unknown_method_; + std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; +}; + static internal::GrpcLibraryInitializer g_gli_initializer; -Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, - int max_receive_message_size, ChannelArguments* args) +Server::Server( + std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> + sync_server_cqs, + int max_receive_message_size, ChannelArguments* args, int min_pollers, + int max_pollers, int sync_cq_timeout_msec) : max_receive_message_size_(max_receive_message_size), + sync_server_cqs_(sync_server_cqs), started_(false), shutdown_(false), shutdown_notified_(false), - num_running_cb_(0), - sync_methods_(new std::list<SyncRequest>), has_generic_service_(false), server_(nullptr), - thread_pool_(thread_pool), - thread_pool_owned_(thread_pool_owned), server_initializer_(new ServerInitializer(this)) { g_gli_initializer.summon(); gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); global_callbacks_ = g_callbacks; global_callbacks_->UpdateArguments(args); + + for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end(); + it++) { + sync_req_mgrs_.emplace_back( + new SyncRequestManager(this, (*it).get(), global_callbacks_, + min_pollers, max_pollers, sync_cq_timeout_msec)); + } + grpc_channel_args channel_args; args->SetChannelArgs(&channel_args); + server_ = grpc_server_create(&channel_args, nullptr); - if (thread_pool_ == nullptr) { - grpc_server_register_non_listening_completion_queue(server_, cq_.cq(), - nullptr); - } else { - grpc_server_register_completion_queue(server_, cq_.cq(), nullptr); - } } Server::~Server() { { + // TODO (sreek) Check if we can just call Shutdown() even in case where + // started_ == false. This will make things much simpler grpc::unique_lock<grpc::mutex> lock(mu_); if (started_ && !shutdown_) { lock.unlock(); Shutdown(); } else if (!started_) { - cq_.Shutdown(); + // Shutdown the completion queues + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->ShutdownAndDrainCompletionQueue(); + } } } + + // TODO(sreek) Do thisfor all cqs ? + /* void* got_tag; bool ok; GPR_ASSERT(!cq_.Next(&got_tag, &ok)); + */ grpc_server_destroy(server_); - if (thread_pool_owned_) { - delete thread_pool_; - } - delete sync_methods_; } void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { @@ -352,6 +479,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { "Can only register an asynchronous service against one server."); service->server_ = this; } + const char* method_name = nullptr; for (auto it = service->methods_.begin(); it != service->methods_.end(); ++it) { @@ -370,7 +498,9 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { if (method->handler() == nullptr) { method->set_server_tag(tag); } else { - sync_methods_->emplace_back(method, tag); + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->AddSyncMethod(method, tag); + } } method_name = method->name(); } @@ -406,20 +536,23 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { grpc_server_start(server_); if (!has_generic_service_) { - if (!sync_methods_->empty()) { - unknown_method_.reset(new RpcServiceMethod( - "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); - // Use of emplace_back with just constructor arguments is not accepted - // here by gcc-4.4 because it can't match the anonymous nullptr with a - // proper constructor implicitly. Construct the object and use push_back. - sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr)); + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->AddUnknownSyncMethod(); } + for (size_t i = 0; i < num_cqs; i++) { if (cqs[i]->IsFrequentlyPolled()) { new UnimplementedAsyncRequest(this, cqs[i]); } } } + + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->Start(); + } + + /* TODO (Sreek) - No longer needed (being done in (*it)->Start above) */ + /* // Start processing rpcs. if (!sync_methods_->empty()) { for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { @@ -427,24 +560,76 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { m->Request(server_, cq_.cq()); } - ScheduleCallback(); + GrpcRpcManager::Initialize(); } + */ return true; } +/* TODO (sreek) check if started_ and shutdown_ are needed anymore */ void Server::ShutdownInternal(gpr_timespec deadline) { grpc::unique_lock<grpc::mutex> lock(mu_); if (started_ && !shutdown_) { shutdown_ = true; + + /// The completion queue to use for server shutdown completion notification + CompletionQueue shutdown_cq; + ShutdownTag shutdown_tag; // Dummy shutdown tag + grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); + + // Shutdown all RpcManagers. This will try to gracefully stop all the + // threads in the RpcManagers (once they process any inflight requests) + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->ShutdownRpcManager(); + } + + shutdown_cq.Shutdown(); + + void* tag; + bool ok; + CompletionQueue::NextStatus status = + shutdown_cq.AsyncNext(&tag, &ok, deadline); + + // If this timed out, it means we are done with the grace period for a clean + // shutdown. We should force a shutdown now by cancelling all inflight calls + if (status == CompletionQueue::NextStatus::TIMEOUT) { + grpc_server_cancel_all_calls(server_); + } + // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has + // successfully shutdown + + // Wait for threads in all RpcManagers to terminate + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->Wait(); + (*it)->ShutdownAndDrainCompletionQueue(); + } + + // Drain the shutdown queue (if the previous call to AsyncNext() timed out + // and we didn't remove the tag from the queue yet) + while (shutdown_cq.Next(&tag, &ok)) { + // Nothing to be done here + } + + /* grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest()); cq_.Shutdown(); lock.unlock(); + */ + + // TODO (sreek) Delete this + /* + GrpcRpcManager::ShutdownRpcManager(); + GrpcRpcManager::Wait(); + */ + // Spin, eating requests until the completion queue is completely shutdown. // If the deadline expires then cancel anything that's pending and keep // spinning forever until the work is actually drained. // Since nothing else needs to touch state guarded by mu_, holding it // through this loop is fine. + // + /* SyncRequest* request; bool ok; while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) { @@ -456,11 +641,15 @@ void Server::ShutdownInternal(gpr_timespec deadline) { } } lock.lock(); + */ + /* TODO (sreek) - Remove this block */ // Wait for running callbacks to finish. - while (num_running_cb_ != 0) { - callback_cv_.wait(lock); - } + /* + while (num_running_cb_ != 0) { + callback_cv_.wait(lock); + } + */ shutdown_notified_ = true; shutdown_cv_.notify_all(); @@ -585,46 +774,86 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( request_->stream()->call_.PerformOps(this); } +// TODO: sreek - Remove this function void Server::ScheduleCallback() { + GPR_ASSERT(false); + /* { grpc::unique_lock<grpc::mutex> lock(mu_); num_running_cb_++; } thread_pool_->Add(std::bind(&Server::RunRpc, this)); + */ } +// TODO: sreek - Remove this function void Server::RunRpc() { - // Wait for one more incoming rpc. - bool ok; - GPR_TIMER_SCOPE("Server::RunRpc", 0); - auto* mrd = SyncRequest::Wait(&cq_, &ok); - if (mrd) { - ScheduleCallback(); - if (ok) { - SyncRequest::CallData cd(this, mrd); - { - mrd->SetupRequest(); - grpc::unique_lock<grpc::mutex> lock(mu_); - if (!shutdown_) { - mrd->Request(server_, cq_.cq()); - } else { - // destroy the structure that was created - mrd->TeardownRequest(); + GPR_ASSERT(false); + /* + // Wait for one more incoming rpc. + bool ok; + GPR_TIMER_SCOPE("Server::RunRpc", 0); + auto* mrd = SyncRequest::Wait(&cq_, &ok); + if (mrd) { + ScheduleCallback(); + if (ok) { + SyncRequest::CallData cd(this, mrd); + { + mrd->SetupRequest(); + grpc::unique_lock<grpc::mutex> lock(mu_); + if (!shutdown_) { + mrd->Request(server_, cq_.cq()); + } else { + // destroy the structure that was created + mrd->TeardownRequest(); + } } + GPR_TIMER_SCOPE("cd.Run()", 0); + cd.Run(global_callbacks_); + } + } + + { + grpc::unique_lock<grpc::mutex> lock(mu_); + num_running_cb_--; + if (shutdown_) { + callback_cv_.notify_all(); } - GPR_TIMER_SCOPE("cd.Run()", 0); - cd.Run(global_callbacks_); } + */ +} + +/* TODO (sreek) Move this to SyncRequestManager */ +/* +void Server::PollForWork(bool& is_work_found, void** tag) { + is_work_found = true; + *tag = nullptr; + auto* mrd = SyncRequest::Wait(&cq_, &is_work_found); + if (is_work_found) { + *tag = mrd; } +} - { - grpc::unique_lock<grpc::mutex> lock(mu_); - num_running_cb_--; - if (shutdown_) { - callback_cv_.notify_all(); + +void Server::DoWork(void* tag) { + auto* mrd = static_cast<SyncRequest*>(tag); + if (mrd) { + SyncRequest::CallData cd(this, mrd); + { + mrd->SetupRequest(); + grpc::unique_lock<grpc::mutex> lock(mu_); + if (!shutdown_) { + mrd->Request(server_, cq_.cq()); + } else { + // destroy the structure that was created + mrd->TeardownRequest(); + } } + GPR_TIMER_SCOPE("cd.Run()", 0); + cd.Run(global_callbacks_); } } +*/ ServerInitializer* Server::initializer() { return server_initializer_.get(); } |