Diffstat (limited to 'src/cpp')
-rw-r--r--  src/cpp/rpcmanager/grpc_rpc_manager.cc  188
-rw-r--r--  src/cpp/rpcmanager/grpc_rpc_manager.h   157
-rw-r--r--  src/cpp/server/server_builder.cc        130
-rw-r--r--  src/cpp/server/server_cc.cc             333
4 files changed, 725 insertions, 83 deletions
diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.cc b/src/cpp/rpcmanager/grpc_rpc_manager.cc
new file mode 100644
index 0000000000..2299dbdcd3
--- /dev/null
+++ b/src/cpp/rpcmanager/grpc_rpc_manager.cc
@@ -0,0 +1,188 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc++/impl/sync.h>
+#include <grpc++/impl/thd.h>
+#include <grpc/support/log.h>
+#include <climits>
+
+#include "src/cpp/rpcmanager/grpc_rpc_manager.h"
+
+namespace grpc {
+
+GrpcRpcManager::GrpcRpcManagerThread::GrpcRpcManagerThread(
+ GrpcRpcManager* rpc_mgr)
+ : rpc_mgr_(rpc_mgr),
+ thd_(new std::thread(&GrpcRpcManager::GrpcRpcManagerThread::Run, this)) {}
+
+void GrpcRpcManager::GrpcRpcManagerThread::Run() {
+ rpc_mgr_->MainWorkLoop();
+ rpc_mgr_->MarkAsCompleted(this);
+}
+
+GrpcRpcManager::GrpcRpcManagerThread::~GrpcRpcManagerThread() {
+ thd_->join();
+ thd_.reset();
+}
+
+GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers)
+ : shutdown_(false),
+ num_pollers_(0),
+ min_pollers_(min_pollers),
+ max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers),
+ num_threads_(0) {}
+
+GrpcRpcManager::~GrpcRpcManager() {
+ {
+ std::unique_lock<grpc::mutex> lock(mu_);
+ GPR_ASSERT(num_threads_ == 0);
+ }
+
+ CleanupCompletedThreads();
+}
+
+void GrpcRpcManager::Wait() {
+ std::unique_lock<grpc::mutex> lock(mu_);
+ while (num_threads_ != 0) {
+ shutdown_cv_.wait(lock);
+ }
+}
+
+void GrpcRpcManager::ShutdownRpcManager() {
+ std::unique_lock<grpc::mutex> lock(mu_);
+ shutdown_ = true;
+}
+
+bool GrpcRpcManager::IsShutdown() {
+ std::unique_lock<grpc::mutex> lock(mu_);
+ return shutdown_;
+}
+
+void GrpcRpcManager::MarkAsCompleted(GrpcRpcManagerThread* thd) {
+ {
+ std::unique_lock<grpc::mutex> list_lock(list_mu_);
+ completed_threads_.push_back(thd);
+ }
+
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ num_threads_--;
+ if (num_threads_ == 0) {
+ shutdown_cv_.notify_one();
+ }
+}
+
+void GrpcRpcManager::CleanupCompletedThreads() {
+ std::unique_lock<grpc::mutex> lock(list_mu_);
+ for (auto thd = completed_threads_.begin(); thd != completed_threads_.end();
+ thd = completed_threads_.erase(thd)) {
+ delete *thd;
+ }
+}
+
+void GrpcRpcManager::Initialize() {
+ for (int i = 0; i < min_pollers_; i++) {
+ MaybeCreatePoller();
+ }
+}
+
+// The current thread may continue as a poller as long as the GrpcRpcManager
+// is not shut down and the number of pollers (i.e. threads currently blocked
+// in PollForWork()) has not exceeded the max threshold (i.e. max_pollers_)
+bool GrpcRpcManager::MaybeContinueAsPoller() {
+ std::unique_lock<grpc::mutex> lock(mu_);
+
+ if (shutdown_ || num_pollers_ > max_pollers_) {
+ return false;
+ }
+
+ num_pollers_++;
+ return true;
+}
+
+// Create a new poller thread if the GrpcRpcManager is not shut down and the
+// current number of pollers (i.e. num_pollers_, the threads currently blocked
+// in PollForWork()) is below the min threshold (i.e. min_pollers_)
+void GrpcRpcManager::MaybeCreatePoller() {
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ if (!shutdown_ && num_pollers_ < min_pollers_) {
+ num_pollers_++;
+ num_threads_++;
+
+    // Create a new thread, which ends up calling the MainWorkLoop() function
+ new GrpcRpcManagerThread(this);
+ }
+}
+
+void GrpcRpcManager::MainWorkLoop() {
+ void* tag;
+ bool ok;
+
+ /*
+  1. Poll for work (i.e. PollForWork())
+  2. After returning from PollForWork(), reduce the number of pollers by 1. If
+     PollForWork() returned a TIMEOUT, then it may indicate that we have more
+     polling threads than needed. Check if the number of pollers is greater
+     than min_pollers_ and if so, terminate the thread.
+  3. Since we are short of one poller now, see if a new poller has to be
+     created (i.e. see MaybeCreatePoller() for more details)
+  4. Do the actual work (DoWork())
+  5. After doing the work, see if this thread can resume polling work (i.e.
+     see MaybeContinueAsPoller() for more details) */
+ do {
+ WorkStatus work_status = PollForWork(&tag, &ok);
+
+ {
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ num_pollers_--;
+
+ if (work_status == TIMEOUT && num_pollers_ > min_pollers_) {
+ break;
+ }
+ }
+
+ // TODO (sreek) See if we need to check for shutdown here and quit
+ // Note that MaybeCreatePoller does check for shutdown and creates a new
+    // thread only if GrpcRpcManager is not shut down
+ if (work_status == WORK_FOUND) {
+ MaybeCreatePoller();
+ DoWork(tag, ok);
+ }
+ } while (MaybeContinueAsPoller());
+
+ CleanupCompletedThreads();
+
+ // If we are here, either GrpcRpcManager is shutting down or it already has
+ // enough threads.
+}
+
+} // namespace grpc
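
To make the work loop above concrete, here is a minimal sketch of a
GrpcRpcManager subclass that drains a simple in-memory queue. This is an
illustration only, not part of the change: EchoWorkManager, queue_mu_ and
work_items_ are hypothetical names, and a realistic subclass would block
inside PollForWork() with a deadline (as the server's completion queues do)
rather than returning TIMEOUT immediately.

    #include <queue>

    #include "src/cpp/rpcmanager/grpc_rpc_manager.h"

    namespace grpc {

    class EchoWorkManager GRPC_FINAL : public GrpcRpcManager {
     public:
      // Keep at least one thread polling; allow at most four
      EchoWorkManager() : GrpcRpcManager(1, 4) {}

      WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE {
        grpc::unique_lock<grpc::mutex> lock(queue_mu_);
        if (IsShutdown()) {
          return SHUTDOWN;
        }
        if (work_items_.empty()) {
          // A TIMEOUT lets MainWorkLoop() retire this thread if there are
          // more than min_pollers threads currently polling
          return TIMEOUT;
        }
        *tag = work_items_.front();
        work_items_.pop();
        *ok = true;
        return WORK_FOUND;
      }

      void DoWork(void* tag, bool ok) GRPC_OVERRIDE {
        if (ok) {
          // Application-specific handling of the dequeued item goes here
        }
      }

      void Add(void* item) {
        grpc::unique_lock<grpc::mutex> lock(queue_mu_);
        work_items_.push(item);
      }

     private:
      grpc::mutex queue_mu_;
      std::queue<void*> work_items_;
    };

    }  // namespace grpc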
diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.h b/src/cpp/rpcmanager/grpc_rpc_manager.h
new file mode 100644
index 0000000000..d00771b9a1
--- /dev/null
+++ b/src/cpp/rpcmanager/grpc_rpc_manager.h
@@ -0,0 +1,157 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H
+#define GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H
+
+#include <list>
+#include <memory>
+
+#include <grpc++/impl/sync.h>
+#include <grpc++/impl/thd.h>
+
+namespace grpc {
+
+class GrpcRpcManager {
+ public:
+ explicit GrpcRpcManager(int min_pollers, int max_pollers);
+ virtual ~GrpcRpcManager();
+
+ // This function MUST be called before using the object
+ void Initialize();
+
+ enum WorkStatus { WORK_FOUND, SHUTDOWN, TIMEOUT };
+
+ // "Polls" for new work.
+ // If the return value is WORK_FOUND:
+  //  - The implementation of PollForWork() MAY set an opaque identifier
+  //    (to identify the work item found) via the '*tag' parameter
+  //  - The implementation MUST set the value of 'ok' to 'true' or 'false'. A
+  //    value of 'false' indicates some implementation specific error (that is
+  //    neither SHUTDOWN nor TIMEOUT)
+  //  - GrpcRpcManager does not interpret the values of 'tag' and 'ok'
+  //  - GrpcRpcManager WILL call DoWork() and pass '*tag' and 'ok' as input to
+  //    DoWork()
+  //
+  // If the return value is SHUTDOWN:
+  //  - GrpcRpcManager WILL NOT call DoWork() and terminates the thread
+  //
+  // If the return value is TIMEOUT:
+  //  - GrpcRpcManager WILL NOT call DoWork()
+  //  - GrpcRpcManager MAY terminate the thread depending on the current
+  //    number of active poller threads and min_pollers/max_pollers settings
+  //  - Also, the value of the timeout is specific to the derived class
+  //    implementation
+ virtual WorkStatus PollForWork(void** tag, bool* ok) = 0;
+
+ // The implementation of DoWork() is supposed to perform the work found by
+ // PollForWork(). The tag and ok parameters are the same as returned by
+ // PollForWork()
+ //
+ // The implementation of DoWork() should also do any setup needed to ensure
+ // that the next call to PollForWork() (not necessarily by the current thread)
+ // actually finds some work
+ virtual void DoWork(void* tag, bool ok) = 0;
+
+ // Mark the GrpcRpcManager as shutdown and begin draining the work.
+ // This is a non-blocking call and the caller should call Wait(), a blocking
+ // call which returns only once the shutdown is complete
+ void ShutdownRpcManager();
+
+  // Returns true if ShutdownRpcManager() has been called
+ bool IsShutdown();
+
+  // A blocking call that returns only after the GrpcRpcManager has shut down
+  // and all the threads have drained all the outstanding work
+ void Wait();
+
+ private:
+  // Helper wrapper class around std::thread. This takes a GrpcRpcManager
+  // object and starts a new std::thread to call the Run() function.
+  //
+  // The Run() function calls the GrpcRpcManager::MainWorkLoop() function and,
+  // once that completes, marks the GrpcRpcManagerThread completed by calling
+  // GrpcRpcManager::MarkAsCompleted()
+ class GrpcRpcManagerThread {
+ public:
+ GrpcRpcManagerThread(GrpcRpcManager* rpc_mgr);
+ ~GrpcRpcManagerThread();
+
+ private:
+ // Calls rpc_mgr_->MainWorkLoop() and once that completes, calls
+    // rpc_mgr_->MarkAsCompleted(this) to mark the thread as completed
+ void Run();
+
+ GrpcRpcManager* rpc_mgr_;
+ std::unique_ptr<grpc::thread> thd_;
+ };
+
+  // The main function in GrpcRpcManager
+ void MainWorkLoop();
+
+  // Create a new poller if the current number of pollers is less than the
+  // minimum number of pollers needed (i.e. min_pollers_).
+  void MaybeCreatePoller();
+
+  // Returns true if the current thread can resume as a poller, i.e. if the
+  // current number of pollers is less than max_pollers_.
+  bool MaybeContinueAsPoller();
+
+ void MarkAsCompleted(GrpcRpcManagerThread* thd);
+ void CleanupCompletedThreads();
+
+  // Protects shutdown_, num_pollers_ and num_threads_
+  // TODO: sreek - Change num_pollers_ and num_threads_ to atomics
+ grpc::mutex mu_;
+
+ bool shutdown_;
+ grpc::condition_variable shutdown_cv_;
+
+ // Number of threads doing polling
+ int num_pollers_;
+
+ // The minimum and maximum number of threads that should be doing polling
+ int min_pollers_;
+ int max_pollers_;
+
+  // The total number of threads (this includes the threads that are currently
+  // polling, i.e. num_pollers_)
+ int num_threads_;
+
+ grpc::mutex list_mu_;
+ std::list<GrpcRpcManagerThread*> completed_threads_;
+};
+
+} // namespace grpc
+
+#endif // GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H
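
The intended lifecycle of the class, as a short sketch (MyRpcManager stands
for any subclass implementing the two pure-virtual hooks above):

    MyRpcManager mgr(2 /* min_pollers */, 10 /* max_pollers */);
    mgr.Initialize();          // spawns min_pollers threads running MainWorkLoop()
    // ... the manager grows and shrinks the thread pool between the bounds ...
    mgr.ShutdownRpcManager();  // non-blocking; marks the manager as shutdown
    mgr.Wait();                // blocks until all worker threads have finished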
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 2980b16c56..59c40dedaf 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -35,6 +35,7 @@
#include <grpc++/impl/service_type.h>
#include <grpc++/server.h>
+#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
+#include <climits>
@@ -61,6 +62,7 @@ ServerBuilder::ServerBuilder()
auto& factory = *it;
plugins_.emplace_back(factory());
}
+
// all compression algorithms enabled by default.
enabled_compression_algorithms_bitset_ =
(1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
@@ -68,6 +70,16 @@ ServerBuilder::ServerBuilder()
sizeof(maybe_default_compression_level_));
memset(&maybe_default_compression_algorithm_, 0,
sizeof(maybe_default_compression_algorithm_));
+
+ // Sync server setting defaults
+ sync_server_settings_.min_pollers = 1;
+ sync_server_settings_.max_pollers = INT_MAX;
+
+ int num_cpus = gpr_cpu_num_cores();
+ num_cpus = GPR_MAX(num_cpus, 4);
+ sync_server_settings_.num_cqs = num_cpus;
+
+ sync_server_settings_.cq_timeout_msec = 1000;
}
std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
@@ -94,7 +106,7 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService(
gpr_log(GPR_ERROR,
"Adding multiple AsyncGenericService is unsupported for now. "
"Dropping the service %p",
- service);
+ (void*)service);
} else {
generic_service_ = service;
}
@@ -130,6 +142,10 @@ ServerBuilder& ServerBuilder::SetDefaultCompressionAlgorithm(
return *this;
}
+void ServerBuilder::SetSyncServerSettings(SyncServerSettings settings) {
+ sync_server_settings_ = settings; // copy the settings
+}
+
ServerBuilder& ServerBuilder::AddListeningPort(
const grpc::string& addr, std::shared_ptr<ServerCredentials> creds,
int* selected_port) {
@@ -139,35 +155,24 @@ ServerBuilder& ServerBuilder::AddListeningPort(
}
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
- std::unique_ptr<ThreadPoolInterface> thread_pool;
- bool has_sync_methods = false;
- for (auto it = services_.begin(); it != services_.end(); ++it) {
- if ((*it)->service->has_synchronous_methods()) {
- if (!thread_pool) {
- thread_pool.reset(CreateDefaultThreadPool());
- has_sync_methods = true;
- break;
- }
- }
- }
ChannelArguments args;
for (auto option = options_.begin(); option != options_.end(); ++option) {
(*option)->UpdateArguments(&args);
(*option)->UpdatePlugins(&plugins_);
}
+
for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
- if (!thread_pool && (*plugin)->has_sync_methods()) {
- thread_pool.reset(CreateDefaultThreadPool());
- has_sync_methods = true;
- }
(*plugin)->UpdateChannelArguments(&args);
}
+
if (max_receive_message_size_ >= 0) {
args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_);
}
+
if (max_send_message_size_ >= 0) {
args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, max_send_message_size_);
}
+
args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
enabled_compression_algorithms_bitset_);
if (maybe_default_compression_level_.is_set) {
@@ -178,27 +183,84 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
maybe_default_compression_algorithm_.algorithm);
}
- std::unique_ptr<Server> server(new Server(thread_pool.release(), true,
- max_receive_message_size_, &args));
+
+  // == Determine if the server has any synchronous methods ==
+ bool has_sync_methods = false;
+ for (auto it = services_.begin(); it != services_.end(); ++it) {
+ if ((*it)->service->has_synchronous_methods()) {
+ has_sync_methods = true;
+ break;
+ }
+ }
+
+ if (!has_sync_methods) {
+ for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
+ if ((*plugin)->has_sync_methods()) {
+ has_sync_methods = true;
+ break;
+ }
+ }
+ }
+
+  // If this is a Sync server, i.e. a server exposing a synchronous API, then
+  // the server needs to create some completion queues to listen for incoming
+  // requests. 'sync_server_cqs' are those internal completion queues.
+ //
+ // This is different from the completion queues added to the server via
+ // ServerBuilder's AddCompletionQueue() method (those completion queues
+ // are in 'cqs_' member variable of ServerBuilder object)
+ std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs(
+ new std::vector<std::unique_ptr<ServerCompletionQueue>>());
+
+ if (has_sync_methods) {
+ // This is a Sync server
+ gpr_log(GPR_INFO,
+ "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: "
+ "%d, CQ timeout (msec): %d",
+ sync_server_settings_.num_cqs, sync_server_settings_.min_pollers,
+ sync_server_settings_.max_pollers,
+ sync_server_settings_.cq_timeout_msec);
+
+ // Create completion queues to listen to incoming rpc requests
+ for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
+ sync_server_cqs->emplace_back(new ServerCompletionQueue());
+ }
+ }
+
+ std::unique_ptr<Server> server(new Server(
+ sync_server_cqs, max_receive_message_size_, &args,
+ sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
+ sync_server_settings_.cq_timeout_msec));
+
ServerInitializer* initializer = server->initializer();
- // If the server has atleast one sync methods, we know that this is a Sync
- // server or a Hybrid server and the completion queue (server->cq_) would be
- // frequently polled.
- int num_frequently_polled_cqs = has_sync_methods ? 1 : 0;
-
- for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
- // A completion queue that is not polled frequently (by calling Next() or
- // AsyncNext()) is not safe to use for listening to incoming channels.
- // Register all such completion queues as non-listening completion queues
- // with the GRPC core library.
- if ((*cq)->IsFrequentlyPolled()) {
- grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
+  // Register all the completion queues with the server, i.e.:
+  // 1. sync_server_cqs: internal completion queues created if this is a sync
+  //    server
+  // 2. cqs_: completion queues added via the AddCompletionQueue() call
+
+ // All sync cqs (if any) are frequently polled by the GrpcRpcManager
+ int num_frequently_polled_cqs = sync_server_cqs->size();
+
+ for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) {
+ grpc_server_register_completion_queue(server->server_, (*it)->cq(),
+ nullptr);
+ }
+
+  // cqs_ contains the completion queues added by calling the ServerBuilder's
+  // AddCompletionQueue() API. Some of them may not be frequently polled (i.e.
+  // by calling Next() or AsyncNext()) and hence are not safe to be used for
+  // listening to incoming channels. Such completion queues must be registered
+  // as non-listening queues
+ for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
+ if ((*it)->IsFrequentlyPolled()) {
+ grpc_server_register_completion_queue(server->server_, (*it)->cq(),
nullptr);
num_frequently_polled_cqs++;
} else {
grpc_server_register_non_listening_completion_queue(server->server_,
- (*cq)->cq(), nullptr);
+ (*it)->cq(), nullptr);
}
}
@@ -214,9 +276,11 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
return nullptr;
}
}
+
for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
(*plugin)->InitServer(initializer);
}
+
if (generic_service_) {
server->RegisterAsyncGenericService(generic_service_);
} else {
@@ -229,6 +293,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
}
}
}
+
for (auto port = ports_.begin(); port != ports_.end(); port++) {
int r = server->AddListeningPort(port->addr, port->creds.get());
if (!r) return nullptr;
@@ -236,13 +301,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
*port->selected_port = r;
}
}
+
auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0];
if (!server->Start(cqs_data, cqs_.size())) {
return nullptr;
}
+
for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
(*plugin)->Finish(initializer);
}
+
return server;
}
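
For reference, a caller could tune the new sync-server knobs through
SetSyncServerSettings(). A hedged sketch: it assumes SyncServerSettings is
nested in ServerBuilder (as the member definitions above suggest) with the
four fields defaulted in the constructor, and my_sync_service is a
placeholder for any service with synchronous methods.

    ServerBuilder builder;
    builder.AddListeningPort("0.0.0.0:50051", grpc::InsecureServerCredentials());
    builder.RegisterService(&my_sync_service);

    ServerBuilder::SyncServerSettings settings;
    settings.num_cqs = 2;            // server completion queues to poll for RPCs
    settings.min_pollers = 1;        // per-CQ lower bound on polling threads
    settings.max_pollers = 8;        // per-CQ upper bound on polling threads
    settings.cq_timeout_msec = 500;  // poll timeout before a thread may retire
    builder.SetSyncServerSettings(settings);

    std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
    server->Wait();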
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 3f89275370..36bc61fdf1 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -118,6 +118,7 @@ class Server::UnimplementedAsyncResponse GRPC_FINAL
UnimplementedAsyncRequest* const request_;
};
+// TODO (sreek) - This might no longer be needed
class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) {
@@ -126,6 +127,11 @@ class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
}
};
+class ShutdownTag : public CompletionQueueTag {
+ public:
+ bool FinalizeResult(void** tag, bool* status) { return false; }
+};
+
class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
public:
SyncRequest(RpcServiceMethod* method, void* tag)
@@ -147,6 +153,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_metadata_array_destroy(&request_metadata_);
}
+ // TODO (Sreek) This function is probably no longer needed
static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
void* tag = nullptr;
*ok = false;
@@ -158,6 +165,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
+ // TODO (sreek) - This function is probably no longer needed
static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
gpr_timespec deadline) {
void* tag = nullptr;
@@ -177,6 +185,8 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
GPR_UNREACHABLE_CODE(return false);
}
+  // TODO (sreek) - Refactor these SetupRequest/TeardownRequest and
+  // ResetRequest functions
void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
void TeardownRequest() {
@@ -184,6 +194,8 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
cq_ = nullptr;
}
+ void ResetRequest() { in_flight_ = false; }
+
void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
@@ -275,53 +287,168 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_completion_queue* cq_;
};
+class Server::SyncRequestManager : public GrpcRpcManager {
+ public:
+ SyncRequestManager(Server* server, CompletionQueue* server_cq,
+ std::shared_ptr<GlobalCallbacks> global_callbacks,
+ int min_pollers, int max_pollers, int cq_timeout_msec)
+ : GrpcRpcManager(min_pollers, max_pollers),
+ server_(server),
+ server_cq_(server_cq),
+ cq_timeout_msec_(cq_timeout_msec),
+ global_callbacks_(global_callbacks) {}
+
+ WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE {
+ *tag = nullptr;
+ gpr_timespec deadline =
+ gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN);
+
+ switch (server_cq_->AsyncNext(tag, ok, deadline)) {
+ case CompletionQueue::TIMEOUT:
+ return TIMEOUT;
+ case CompletionQueue::SHUTDOWN:
+ return SHUTDOWN;
+ case CompletionQueue::GOT_EVENT:
+ return WORK_FOUND;
+ }
+
+ GPR_UNREACHABLE_CODE(return TIMEOUT);
+ }
+
+ void DoWork(void* tag, bool ok) GRPC_OVERRIDE {
+ SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
+
+ if (!sync_req) {
+ // No tag. Nothing to work on
+ // TODO (sreek) - Log a warning here since this is an unlikely case
+ return;
+ }
+
+ if (ok) {
+ SyncRequest::CallData cd(server_, sync_req);
+ {
+ sync_req->SetupRequest();
+ if (!IsShutdown()) {
+ sync_req->Request(server_->c_server(), server_cq_->cq());
+ } else {
+ sync_req->TeardownRequest();
+ }
+ }
+ GPR_TIMER_SCOPE("cd.Run()", 0);
+ cd.Run(global_callbacks_);
+ } else {
+ sync_req->ResetRequest();
+      // ok is false. For some reason, the tag was returned but the event was
+      // not successful. In this case, request again unless we are shutting down
+ if (!IsShutdown()) {
+ // TODO (sreek) Remove this
+ // sync_req->Request(server_->c_server(), server_cq_->cq());
+ }
+ }
+ }
+
+ void AddSyncMethod(RpcServiceMethod* method, void* tag) {
+ sync_methods_.emplace_back(method, tag);
+ }
+
+ void AddUnknownSyncMethod() {
+ // TODO (sreek) - Check if !sync_methods_.empty() is really needed here
+ if (!sync_methods_.empty()) {
+ unknown_method_.reset(new RpcServiceMethod(
+ "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
+ // Use of emplace_back with just constructor arguments is not accepted
+ // here by gcc-4.4 because it can't match the anonymous nullptr with a
+ // proper constructor implicitly. Construct the object and use push_back.
+ sync_methods_.push_back(SyncRequest(unknown_method_.get(), nullptr));
+ }
+ }
+
+ void ShutdownAndDrainCompletionQueue() {
+ server_cq_->Shutdown();
+
+ // Drain any pending items from the queue
+ void* tag;
+ bool ok;
+ while (server_cq_->Next(&tag, &ok)) {
+ // Nothing to be done here
+ }
+ }
+
+ void Start() {
+ if (!sync_methods_.empty()) {
+ for (auto m = sync_methods_.begin(); m != sync_methods_.end(); m++) {
+ m->SetupRequest();
+ m->Request(server_->c_server(), server_cq_->cq());
+ }
+
+ GrpcRpcManager::Initialize();
+ }
+ }
+
+ private:
+ Server* server_;
+ CompletionQueue* server_cq_;
+ int cq_timeout_msec_;
+ std::vector<SyncRequest> sync_methods_;
+ std::unique_ptr<RpcServiceMethod> unknown_method_;
+ std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
+};
+
static internal::GrpcLibraryInitializer g_gli_initializer;
-Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
- int max_receive_message_size, ChannelArguments* args)
+Server::Server(
+ std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs,
+ int max_receive_message_size, ChannelArguments* args, int min_pollers,
+ int max_pollers, int sync_cq_timeout_msec)
: max_receive_message_size_(max_receive_message_size),
+ sync_server_cqs_(sync_server_cqs),
started_(false),
shutdown_(false),
shutdown_notified_(false),
- num_running_cb_(0),
- sync_methods_(new std::list<SyncRequest>),
has_generic_service_(false),
server_(nullptr),
- thread_pool_(thread_pool),
- thread_pool_owned_(thread_pool_owned),
server_initializer_(new ServerInitializer(this)) {
g_gli_initializer.summon();
gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
global_callbacks_ = g_callbacks;
global_callbacks_->UpdateArguments(args);
+
+ for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
+ it++) {
+ sync_req_mgrs_.emplace_back(
+ new SyncRequestManager(this, (*it).get(), global_callbacks_,
+ min_pollers, max_pollers, sync_cq_timeout_msec));
+ }
+
grpc_channel_args channel_args;
args->SetChannelArgs(&channel_args);
+
server_ = grpc_server_create(&channel_args, nullptr);
- if (thread_pool_ == nullptr) {
- grpc_server_register_non_listening_completion_queue(server_, cq_.cq(),
- nullptr);
- } else {
- grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
- }
}
Server::~Server() {
{
+    // TODO (sreek) Check if we can just call Shutdown() even in the case
+    // where started_ == false. This will make things much simpler
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
lock.unlock();
Shutdown();
} else if (!started_) {
- cq_.Shutdown();
+ // Shutdown the completion queues
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->ShutdownAndDrainCompletionQueue();
+ }
}
}
+
+  // TODO(sreek) Do this for all cqs?
+ /*
void* got_tag;
bool ok;
GPR_ASSERT(!cq_.Next(&got_tag, &ok));
+ */
grpc_server_destroy(server_);
- if (thread_pool_owned_) {
- delete thread_pool_;
- }
- delete sync_methods_;
}
void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
@@ -352,6 +479,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
"Can only register an asynchronous service against one server.");
service->server_ = this;
}
+
const char* method_name = nullptr;
for (auto it = service->methods_.begin(); it != service->methods_.end();
++it) {
@@ -370,7 +498,9 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
if (method->handler() == nullptr) {
method->set_server_tag(tag);
} else {
- sync_methods_->emplace_back(method, tag);
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->AddSyncMethod(method, tag);
+ }
}
method_name = method->name();
}
@@ -406,20 +536,23 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
grpc_server_start(server_);
if (!has_generic_service_) {
- if (!sync_methods_->empty()) {
- unknown_method_.reset(new RpcServiceMethod(
- "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
- // Use of emplace_back with just constructor arguments is not accepted
- // here by gcc-4.4 because it can't match the anonymous nullptr with a
- // proper constructor implicitly. Construct the object and use push_back.
- sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->AddUnknownSyncMethod();
}
+
for (size_t i = 0; i < num_cqs; i++) {
if (cqs[i]->IsFrequentlyPolled()) {
new UnimplementedAsyncRequest(this, cqs[i]);
}
}
}
+
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->Start();
+ }
+
+  /* TODO (Sreek) - No longer needed (being done in (*it)->Start() above) */
+ /*
// Start processing rpcs.
if (!sync_methods_->empty()) {
for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
@@ -427,24 +560,76 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
m->Request(server_, cq_.cq());
}
- ScheduleCallback();
+ GrpcRpcManager::Initialize();
}
+ */
return true;
}
+/* TODO (sreek) check if started_ and shutdown_ are needed anymore */
void Server::ShutdownInternal(gpr_timespec deadline) {
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
shutdown_ = true;
+
+    // The completion queue to use for server shutdown completion notification
+ CompletionQueue shutdown_cq;
+ ShutdownTag shutdown_tag; // Dummy shutdown tag
+ grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
+
+ // Shutdown all RpcManagers. This will try to gracefully stop all the
+ // threads in the RpcManagers (once they process any inflight requests)
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->ShutdownRpcManager();
+ }
+
+ shutdown_cq.Shutdown();
+
+ void* tag;
+ bool ok;
+ CompletionQueue::NextStatus status =
+ shutdown_cq.AsyncNext(&tag, &ok, deadline);
+
+ // If this timed out, it means we are done with the grace period for a clean
+ // shutdown. We should force a shutdown now by cancelling all inflight calls
+ if (status == CompletionQueue::NextStatus::TIMEOUT) {
+ grpc_server_cancel_all_calls(server_);
+ }
+ // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
+ // successfully shutdown
+
+ // Wait for threads in all RpcManagers to terminate
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->Wait();
+ (*it)->ShutdownAndDrainCompletionQueue();
+ }
+
+ // Drain the shutdown queue (if the previous call to AsyncNext() timed out
+ // and we didn't remove the tag from the queue yet)
+ while (shutdown_cq.Next(&tag, &ok)) {
+ // Nothing to be done here
+ }
+
+ /*
grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
cq_.Shutdown();
lock.unlock();
+ */
+
+ // TODO (sreek) Delete this
+ /*
+ GrpcRpcManager::ShutdownRpcManager();
+ GrpcRpcManager::Wait();
+ */
+
// Spin, eating requests until the completion queue is completely shutdown.
// If the deadline expires then cancel anything that's pending and keep
// spinning forever until the work is actually drained.
// Since nothing else needs to touch state guarded by mu_, holding it
// through this loop is fine.
+ //
+ /*
SyncRequest* request;
bool ok;
while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
@@ -456,11 +641,15 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
}
}
lock.lock();
+ */
+ /* TODO (sreek) - Remove this block */
// Wait for running callbacks to finish.
- while (num_running_cb_ != 0) {
- callback_cv_.wait(lock);
- }
+ /*
+ while (num_running_cb_ != 0) {
+ callback_cv_.wait(lock);
+ }
+ */
shutdown_notified_ = true;
shutdown_cv_.notify_all();
@@ -585,46 +774,86 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
request_->stream()->call_.PerformOps(this);
}
+// TODO: sreek - Remove this function
void Server::ScheduleCallback() {
+ GPR_ASSERT(false);
+ /*
{
grpc::unique_lock<grpc::mutex> lock(mu_);
num_running_cb_++;
}
thread_pool_->Add(std::bind(&Server::RunRpc, this));
+ */
}
+// TODO: sreek - Remove this function
void Server::RunRpc() {
- // Wait for one more incoming rpc.
- bool ok;
- GPR_TIMER_SCOPE("Server::RunRpc", 0);
- auto* mrd = SyncRequest::Wait(&cq_, &ok);
- if (mrd) {
- ScheduleCallback();
- if (ok) {
- SyncRequest::CallData cd(this, mrd);
- {
- mrd->SetupRequest();
- grpc::unique_lock<grpc::mutex> lock(mu_);
- if (!shutdown_) {
- mrd->Request(server_, cq_.cq());
- } else {
- // destroy the structure that was created
- mrd->TeardownRequest();
+ GPR_ASSERT(false);
+ /*
+ // Wait for one more incoming rpc.
+ bool ok;
+ GPR_TIMER_SCOPE("Server::RunRpc", 0);
+ auto* mrd = SyncRequest::Wait(&cq_, &ok);
+ if (mrd) {
+ ScheduleCallback();
+ if (ok) {
+ SyncRequest::CallData cd(this, mrd);
+ {
+ mrd->SetupRequest();
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ if (!shutdown_) {
+ mrd->Request(server_, cq_.cq());
+ } else {
+ // destroy the structure that was created
+ mrd->TeardownRequest();
+ }
}
+ GPR_TIMER_SCOPE("cd.Run()", 0);
+ cd.Run(global_callbacks_);
+ }
+ }
+
+ {
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ num_running_cb_--;
+ if (shutdown_) {
+ callback_cv_.notify_all();
}
- GPR_TIMER_SCOPE("cd.Run()", 0);
- cd.Run(global_callbacks_);
}
+ */
+}
+
+/* TODO (sreek) Move this to SyncRequestManager */
+/*
+void Server::PollForWork(bool& is_work_found, void** tag) {
+ is_work_found = true;
+ *tag = nullptr;
+ auto* mrd = SyncRequest::Wait(&cq_, &is_work_found);
+ if (is_work_found) {
+ *tag = mrd;
}
+}
- {
- grpc::unique_lock<grpc::mutex> lock(mu_);
- num_running_cb_--;
- if (shutdown_) {
- callback_cv_.notify_all();
+
+void Server::DoWork(void* tag) {
+ auto* mrd = static_cast<SyncRequest*>(tag);
+ if (mrd) {
+ SyncRequest::CallData cd(this, mrd);
+ {
+ mrd->SetupRequest();
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ if (!shutdown_) {
+ mrd->Request(server_, cq_.cq());
+ } else {
+ // destroy the structure that was created
+ mrd->TeardownRequest();
+ }
}
+ GPR_TIMER_SCOPE("cd.Run()", 0);
+ cd.Run(global_callbacks_);
}
}
+*/
ServerInitializer* Server::initializer() { return server_initializer_.get(); }
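
The reworked ShutdownInternal() above is what gives the deadline passed to
Server::Shutdown() its meaning: in-flight calls get until the deadline to
drain before grpc_server_cancel_all_calls() forces them. A usage sketch,
assuming the templated Shutdown(deadline) overload declared in server.h:

    // Allow up to 5 seconds for in-flight RPCs to drain; after that,
    // ShutdownInternal() cancels anything still pending
    auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(5);
    server->Shutdown(deadline);
    server->Wait();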