aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/server.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server/server.cc')
-rw-r--r--src/cpp/server/server.cc190
1 files changed, 167 insertions, 23 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index a436ee43e9..28b874d9fb 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -275,15 +275,99 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_completion_queue* cq_;
};
+class Server::SyncRequestManager : public GrpcRpcManager {
+ public:
+ SyncRequestManager(Server* server, CompletionQueue* server_cq,
+ std::shared_ptr<GlobalCallbacks> global_callbacks,
+ int min_pollers, int max_pollers)
+ : GrpcRpcManager(min_pollers, max_pollers),
+ server_(server),
+ server_cq_(server_cq),
+ global_callbacks_(global_callbacks) {}
+
+ static const int kRpcPollingTimeoutMsec = 500;
+
+ WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE {
+ *tag = nullptr;
+ gpr_timespec deadline =
+ gpr_time_from_millis(kRpcPollingTimeoutMsec, GPR_TIMESPAN);
+
+ switch (server_cq_->AsyncNext(tag, ok, deadline)) {
+ case CompletionQueue::TIMEOUT:
+ return TIMEOUT;
+ case CompletionQueue::SHUTDOWN:
+ return SHUTDOWN;
+ case CompletionQueue::GOT_EVENT:
+ return WORK_FOUND;
+ }
+
+ GPR_UNREACHABLE_CODE(return TIMEOUT);
+ }
+
+ void DoWork(void* tag, bool ok) GRPC_OVERRIDE {
+ SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
+ if (ok && sync_req) {
+ SyncRequest::CallData cd(server_, sync_req);
+ {
+ sync_req->SetupRequest();
+ if (!IsShutdown()) {
+ sync_req->Request(server_->c_server(), server_cq_->cq());
+ } else {
+ sync_req->TeardownRequest();
+ }
+ }
+ GPR_TIMER_SCOPE("cd.Run()", 0);
+ cd.Run(global_callbacks_);
+ }
+
+ // TODO (sreek): If ok == false, log an error
+ }
+
+ void AddSyncMethod(RpcServiceMethod* method, void* tag) {
+ sync_methods_.emplace_back(method, tag);
+ }
+
+ void AddUnknownSyncMethod() {
+ // TODO (sreek) - Check if !sync_methods_.empty() is really needed here
+ if (!sync_methods_.empty()) {
+ unknown_method_.reset(new RpcServiceMethod(
+ "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
+ // Use of emplace_back with just constructor arguments is not accepted
+ // here by gcc-4.4 because it can't match the anonymous nullptr with a
+ // proper constructor implicitly. Construct the object and use push_back.
+ sync_methods_.push_back(SyncRequest(unknown_method_.get(), nullptr));
+ }
+ }
+
+ void Start() {
+ if (!sync_methods_.empty()) {
+ for (auto m = sync_methods_.begin(); m != sync_methods_.end(); m++) {
+ m->SetupRequest();
+ m->Request(server_->c_server(), server_cq_->cq());
+ }
+
+ GrpcRpcManager::Initialize();
+ }
+ }
+
+ private:
+ Server* server_;
+ CompletionQueue* server_cq_;
+ std::vector<SyncRequest> sync_methods_;
+ std::unique_ptr<RpcServiceMethod> unknown_method_;
+ std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
+};
+
static internal::GrpcLibraryInitializer g_gli_initializer;
-Server::Server(bool has_sync_methods, int max_message_size,
- ChannelArguments* args)
- : GrpcRpcManager(3, 5, 8),
- max_message_size_(max_message_size),
+Server::Server(
+ std::shared_ptr<std::vector<ServerCompletionQueue>> sync_server_cqs,
+ int max_message_size, ChannelArguments* args, int min_pollers,
+ int max_pollers)
+ : max_message_size_(max_message_size),
+ sync_server_cqs_(sync_server_cqs),
started_(false),
shutdown_(false),
shutdown_notified_(false),
- sync_methods_(new std::list<SyncRequest>),
has_generic_service_(false),
server_(nullptr),
server_initializer_(new ServerInitializer(this)) {
@@ -291,16 +375,17 @@ Server::Server(bool has_sync_methods, int max_message_size,
gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
global_callbacks_ = g_callbacks;
global_callbacks_->UpdateArguments(args);
+
+ for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
+ it++) {
+ sync_req_mgrs_.emplace_back(new SyncRequestManager(
+ this, &(*it), global_callbacks_, min_pollers, max_pollers));
+ }
+
grpc_channel_args channel_args;
args->SetChannelArgs(&channel_args);
- server_ = grpc_server_create(&channel_args, nullptr);
- if (!has_sync_methods) {
- grpc_server_register_non_listening_completion_queue(server_, cq_.cq(),
- nullptr);
- } else {
- grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
- }
+ server_ = grpc_server_create(&channel_args, nullptr);
}
Server::~Server() {
@@ -310,15 +395,20 @@ Server::~Server() {
lock.unlock();
Shutdown();
} else if (!started_) {
+ // TODO (sreek): Shutdown all cqs
+ /*
cq_.Shutdown();
+ */
}
}
+ // TODO(sreek) Do thisfor all cqs ?
+ /*
void* got_tag;
bool ok;
GPR_ASSERT(!cq_.Next(&got_tag, &ok));
+ */
grpc_server_destroy(server_);
- delete sync_methods_;
}
void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
@@ -329,8 +419,6 @@ void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
grpc_server* Server::c_server() { return server_; }
-CompletionQueue* Server::completion_queue() { return &cq_; }
-
static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
RpcServiceMethod* method) {
switch (method->method_type()) {
@@ -351,6 +439,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
"Can only register an asynchronous service against one server.");
service->server_ = this;
}
+
const char* method_name = nullptr;
for (auto it = service->methods_.begin(); it != service->methods_.end();
++it) {
@@ -369,7 +458,9 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
if (method->handler() == nullptr) {
method->set_server_tag(tag);
} else {
- sync_methods_->emplace_back(method, tag);
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->AddSyncMethod(method, tag);
+ }
}
method_name = method->name();
}
@@ -405,13 +496,8 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
grpc_server_start(server_);
if (!has_generic_service_) {
- if (!sync_methods_->empty()) {
- unknown_method_.reset(new RpcServiceMethod(
- "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
- // Use of emplace_back with just constructor arguments is not accepted
- // here by gcc-4.4 because it can't match the anonymous nullptr with a
- // proper constructor implicitly. Construct the object and use push_back.
- sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->AddUnknownSyncMethod();
}
for (size_t i = 0; i < num_cqs; i++) {
@@ -421,6 +507,12 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
}
}
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->Start();
+ }
+
+ /* TODO (Sreek) - Do this for all cqs */
+ /*
// Start processing rpcs.
if (!sync_methods_->empty()) {
for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
@@ -430,26 +522,73 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
GrpcRpcManager::Initialize();
}
+ */
return true;
}
+// TODO (sreek) - Reimplement this
void Server::ShutdownInternal(gpr_timespec deadline) {
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
shutdown_ = true;
+
+ int shutdown_tag = 0; // Dummy shutdown tag
+ grpc_server_shutdown_and_notify(server_, shutdown_cq_.cq(), &shutdown_tag);
+
+ // Shutdown all RpcManagers. This will try to gracefully stop all the
+ // threads in the RpcManagers (once they process any inflight requests)
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->ShutdownRpcManager();
+ }
+
+ shutdown_cq_.Shutdown();
+
+ void* tag;
+ bool ok;
+ CompletionQueue::NextStatus status =
+ shutdown_cq_.AsyncNext(&tag, &ok, deadline);
+
+ // If this timed out, it means we are done with the grace-period for
+ // a clean shutdown. We should force a shutdown now by cancelling all
+ // inflight calls
+ if (status == CompletionQueue::NextStatus::TIMEOUT) {
+ grpc_server_cancel_all_calls(server_);
+ }
+ // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
+ // successfully shutdown
+
+ // Wait for threads in all RpcManagers to terminate
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->Wait();
+ }
+
+ // Shutdown the completion queues
+ // TODO (sreek) Move this into SyncRequestManager
+ for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
+ it++) {
+ (*it).Shutdown();
+ }
+
+ /*
grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
cq_.Shutdown();
lock.unlock();
+ */
+ // TODO (sreek) Delete this
+ /*
GrpcRpcManager::ShutdownRpcManager();
GrpcRpcManager::Wait();
+ */
// Spin, eating requests until the completion queue is completely shutdown.
// If the deadline expires then cancel anything that's pending and keep
// spinning forever until the work is actually drained.
// Since nothing else needs to touch state guarded by mu_, holding it
// through this loop is fine.
+ //
+ /*
SyncRequest* request;
bool ok;
while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
@@ -461,6 +600,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
}
}
lock.lock();
+ */
/* TODO (sreek) - Remove this block */
// Wait for running callbacks to finish.
@@ -642,6 +782,8 @@ void Server::RunRpc() {
*/
}
+/* TODO (sreek) Move this to SyncRequestManager */
+/*
void Server::PollForWork(bool& is_work_found, void** tag) {
is_work_found = true;
*tag = nullptr;
@@ -651,6 +793,7 @@ void Server::PollForWork(bool& is_work_found, void** tag) {
}
}
+
void Server::DoWork(void* tag) {
auto* mrd = static_cast<SyncRequest*>(tag);
if (mrd) {
@@ -669,6 +812,7 @@ void Server::DoWork(void* tag) {
cd.Run(global_callbacks_);
}
}
+*/
ServerInitializer* Server::initializer() { return server_initializer_.get(); }