aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server')
-rw-r--r--src/cpp/server/create_default_thread_pool.cc6
-rw-r--r--src/cpp/server/dynamic_thread_pool.cc24
-rw-r--r--src/cpp/server/secure_server_credentials.cc4
-rw-r--r--src/cpp/server/server.cc105
-rw-r--r--src/cpp/server/server_builder.cc21
-rw-r--r--src/cpp/server/server_context.cc8
6 files changed, 126 insertions, 42 deletions
diff --git a/src/cpp/server/create_default_thread_pool.cc b/src/cpp/server/create_default_thread_pool.cc
index 81c84474d8..9f59d254f1 100644
--- a/src/cpp/server/create_default_thread_pool.cc
+++ b/src/cpp/server/create_default_thread_pool.cc
@@ -39,9 +39,9 @@
namespace grpc {
ThreadPoolInterface* CreateDefaultThreadPool() {
- int cores = gpr_cpu_num_cores();
- if (!cores) cores = 4;
- return new DynamicThreadPool(cores);
+ int cores = gpr_cpu_num_cores();
+ if (!cores) cores = 4;
+ return new DynamicThreadPool(cores);
}
} // namespace grpc
diff --git a/src/cpp/server/dynamic_thread_pool.cc b/src/cpp/server/dynamic_thread_pool.cc
index f58d0420df..b475f43b1d 100644
--- a/src/cpp/server/dynamic_thread_pool.cc
+++ b/src/cpp/server/dynamic_thread_pool.cc
@@ -36,10 +36,10 @@
#include <grpc++/dynamic_thread_pool.h>
namespace grpc {
-DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool *pool):
- pool_(pool),
- thd_(new grpc::thread(&DynamicThreadPool::DynamicThread::ThreadFunc, this)) {
-}
+DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool)
+ : pool_(pool),
+ thd_(new grpc::thread(&DynamicThreadPool::DynamicThread::ThreadFunc,
+ this)) {}
DynamicThreadPool::DynamicThread::~DynamicThread() {
thd_->join();
thd_.reset();
@@ -57,7 +57,7 @@ void DynamicThreadPool::DynamicThread::ThreadFunc() {
pool_->shutdown_cv_.notify_one();
}
}
-
+
void DynamicThreadPool::ThreadFunc() {
for (;;) {
// Wait until work is available or we are shutting down.
@@ -65,7 +65,7 @@ void DynamicThreadPool::ThreadFunc() {
if (!shutdown_ && callbacks_.empty()) {
// If there are too many threads waiting, then quit this thread
if (threads_waiting_ >= reserve_threads_) {
- break;
+ break;
}
threads_waiting_++;
cv_.wait(lock);
@@ -84,9 +84,11 @@ void DynamicThreadPool::ThreadFunc() {
}
}
-DynamicThreadPool::DynamicThreadPool(int reserve_threads) :
- shutdown_(false), reserve_threads_(reserve_threads), nthreads_(0),
- threads_waiting_(0) {
+DynamicThreadPool::DynamicThreadPool(int reserve_threads)
+ : shutdown_(false),
+ reserve_threads_(reserve_threads),
+ nthreads_(0),
+ threads_waiting_(0) {
for (int i = 0; i < reserve_threads_; i++) {
grpc::lock_guard<grpc::mutex> lock(mu_);
nthreads_++;
@@ -96,10 +98,10 @@ DynamicThreadPool::DynamicThreadPool(int reserve_threads) :
void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) {
for (auto t = tlist->begin(); t != tlist->end(); t = tlist->erase(t)) {
- delete *t;
+ delete *t;
}
}
-
+
DynamicThreadPool::~DynamicThreadPool() {
grpc::unique_lock<grpc::mutex> lock(mu_);
shutdown_ = true;
diff --git a/src/cpp/server/secure_server_credentials.cc b/src/cpp/server/secure_server_credentials.cc
index 32c45e2280..f203cf7f49 100644
--- a/src/cpp/server/secure_server_credentials.cc
+++ b/src/cpp/server/secure_server_credentials.cc
@@ -35,8 +35,8 @@
namespace grpc {
-int SecureServerCredentials::AddPortToServer(
- const grpc::string& addr, grpc_server* server) {
+int SecureServerCredentials::AddPortToServer(const grpc::string& addr,
+ grpc_server* server) {
return grpc_server_add_secure_http2_port(server, addr.c_str(), creds_);
}
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 6e576ab8b3..d8ea375636 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -67,11 +67,17 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
method->method_type() ==
RpcMethod::SERVER_STREAMING),
+ call_details_(nullptr),
cq_(nullptr) {
grpc_metadata_array_init(&request_metadata_);
}
- ~SyncRequest() { grpc_metadata_array_destroy(&request_metadata_); }
+ ~SyncRequest() {
+ if (call_details_) {
+ delete call_details_;
+ }
+ grpc_metadata_array_destroy(&request_metadata_);
+ }
static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
void* tag = nullptr;
@@ -84,7 +90,27 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
- void SetupRequest() { cq_ = grpc_completion_queue_create(); }
+ static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
+ gpr_timespec deadline) {
+ void* tag = nullptr;
+ *ok = false;
+ switch (cq->AsyncNext(&tag, ok, deadline)) {
+ case CompletionQueue::TIMEOUT:
+ *req = nullptr;
+ return true;
+ case CompletionQueue::SHUTDOWN:
+ *req = nullptr;
+ return false;
+ case CompletionQueue::GOT_EVENT:
+ *req = static_cast<SyncRequest*>(tag);
+ GPR_ASSERT((*req)->in_flight_);
+ return true;
+ }
+ gpr_log(GPR_ERROR, "Should never reach here");
+ abort();
+ }
+
+ void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
void TeardownRequest() {
grpc_completion_queue_destroy(cq_);
@@ -94,17 +120,32 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_server_request_registered_call(
- server, tag_, &call_, &deadline_, &request_metadata_,
- has_request_payload_ ? &request_payload_ : nullptr, cq_,
- notify_cq, this));
+ if (tag_) {
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_registered_call(
+ server, tag_, &call_, &deadline_, &request_metadata_,
+ has_request_payload_ ? &request_payload_ : nullptr, cq_,
+ notify_cq, this));
+ } else {
+ if (!call_details_) {
+ call_details_ = new grpc_call_details;
+ grpc_call_details_init(call_details_);
+ }
+ GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
+ server, &call_, call_details_,
+ &request_metadata_, cq_, notify_cq, this));
+ }
}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
if (!*status) {
grpc_completion_queue_destroy(cq_);
}
+ if (call_details_) {
+ deadline_ = call_details_->deadline;
+ grpc_call_details_destroy(call_details_);
+ grpc_call_details_init(call_details_);
+ }
return true;
}
@@ -157,6 +198,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
bool in_flight_;
const bool has_request_payload_;
grpc_call* call_;
+ grpc_call_details* call_details_;
gpr_timespec deadline_;
grpc_metadata_array request_metadata_;
grpc_byte_buffer* request_payload_;
@@ -176,9 +218,9 @@ static grpc_server* CreateServer(
args[1].value.integer = compression_options.enabled_algorithms_bitset;
grpc_channel_args channel_args = {2, args};
- return grpc_server_create(&channel_args);
+ return grpc_server_create(&channel_args, NULL);
} else {
- return grpc_server_create(nullptr);
+ return grpc_server_create(nullptr, nullptr);
}
}
@@ -190,10 +232,11 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
shutdown_(false),
num_running_cb_(0),
sync_methods_(new std::list<SyncRequest>),
+ has_generic_service_(false),
server_(CreateServer(max_message_size, compression_options)),
thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned) {
- grpc_server_register_completion_queue(server_, cq_.cq());
+ grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
}
Server::~Server() {
@@ -214,23 +257,23 @@ Server::~Server() {
delete sync_methods_;
}
-bool Server::RegisterService(const grpc::string *host, RpcService* service) {
+bool Server::RegisterService(const grpc::string* host, RpcService* service) {
for (int i = 0; i < service->GetMethodCount(); ++i) {
RpcServiceMethod* method = service->GetMethod(i);
- void* tag = grpc_server_register_method(
- server_, method->name(), host ? host->c_str() : nullptr);
+ void* tag = grpc_server_register_method(server_, method->name(),
+ host ? host->c_str() : nullptr);
if (!tag) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());
return false;
}
- SyncRequest request(method, tag);
- sync_methods_->emplace_back(request);
+ sync_methods_->emplace_back(method, tag);
}
return true;
}
-bool Server::RegisterAsyncService(const grpc::string *host, AsynchronousService* service) {
+bool Server::RegisterAsyncService(const grpc::string* host,
+ AsynchronousService* service) {
GPR_ASSERT(service->server_ == nullptr &&
"Can only register an asynchronous service against one server.");
service->server_ = this;
@@ -252,6 +295,7 @@ void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
GPR_ASSERT(service->server_ == nullptr &&
"Can only register an async generic service against one server.");
service->server_ = this;
+ has_generic_service_ = true;
}
int Server::AddListeningPort(const grpc::string& addr,
@@ -265,6 +309,14 @@ bool Server::Start() {
started_ = true;
grpc_server_start(server_);
+ if (!has_generic_service_) {
+ unknown_method_.reset(new RpcServiceMethod(
+ "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
+ // Use of emplace_back with just constructor arguments is not accepted here
+ // by gcc-4.4 because it can't match the anonymous nullptr with a proper
+ // constructor implicitly. Construct the object and use push_back.
+ sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
+ }
// Start processing rpcs.
if (!sync_methods_->empty()) {
for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
@@ -278,12 +330,27 @@ bool Server::Start() {
return true;
}
-void Server::Shutdown() {
+void Server::ShutdownInternal(gpr_timespec deadline) {
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
shutdown_ = true;
grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
cq_.Shutdown();
+ // Spin, eating requests until the completion queue is completely shutdown.
+ // If the deadline expires then cancel anything that's pending and keep
+ // spinning forever until the work is actually drained.
+ // Since nothing else needs to touch state guarded by mu_, holding it
+ // through this loop is fine.
+ SyncRequest* request;
+ bool ok;
+ while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
+ if (request == NULL) { // deadline expired
+ grpc_server_cancel_all_calls(server_);
+ deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ } else if (ok) {
+ SyncRequest::CallData call_data(this, request);
+ }
+ }
// Wait for running callbacks to finish.
while (num_running_cb_ != 0) {
@@ -304,8 +371,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
size_t nops = 0;
grpc_op cops[MAX_OPS];
ops->FillOps(cops, &nops);
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), cops, nops, ops));
+ auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
+ GPR_ASSERT(GRPC_CALL_OK == result);
}
Server::BaseAsyncRequest::BaseAsyncRequest(
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 425b052128..57a734c334 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -38,6 +38,7 @@
#include <grpc++/impl/service_type.h>
#include <grpc++/server.h>
#include <grpc++/thread_pool_interface.h>
+#include <grpc++/fixed_size_thread_pool.h>
namespace grpc {
@@ -63,9 +64,10 @@ ServerBuilder& ServerBuilder::RegisterAsyncService(
return *this;
}
-ServerBuilder& ServerBuilder::RegisterService(
- const grpc::string& addr, SynchronousService* service) {
- services_.emplace_back(new NamedService<RpcService>(addr, service->service()));
+ServerBuilder& ServerBuilder::RegisterService(const grpc::string& addr,
+ SynchronousService* service) {
+ services_.emplace_back(
+ new NamedService<RpcService>(addr, service->service()));
return *this;
}
@@ -123,11 +125,18 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
thread_pool_ = CreateDefaultThreadPool();
thread_pool_owned = true;
}
+ // Async services only, create a thread pool to handle requests to unknown
+ // services.
+ if (!thread_pool_ && !generic_service_ && !async_services_.empty()) {
+ thread_pool_ = new FixedSizeThreadPool(1);
+ thread_pool_owned = true;
+ }
std::unique_ptr<Server> server(new Server(thread_pool_, thread_pool_owned,
max_message_size_,
compression_options_));
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
- grpc_server_register_completion_queue(server->server_, (*cq)->cq());
+ grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
+ nullptr);
}
for (auto service = services_.begin(); service != services_.end();
service++) {
@@ -135,8 +144,8 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
return nullptr;
}
}
- for (auto service = async_services_.begin();
- service != async_services_.end(); service++) {
+ for (auto service = async_services_.begin(); service != async_services_.end();
+ service++) {
if (!server->RegisterAsyncService((*service)->host.get(),
(*service)->service)) {
return nullptr;
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 04373397f9..03461ddda5 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -50,7 +50,12 @@ namespace grpc {
class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
- CompletionOp() : has_tag_(false), tag_(nullptr), refs_(2), finalized_(false), cancelled_(0) {}
+ CompletionOp()
+ : has_tag_(false),
+ tag_(nullptr),
+ refs_(2),
+ finalized_(false),
+ cancelled_(0) {}
void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE;
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
@@ -91,6 +96,7 @@ void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) {
ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops->data.recv_close_on_server.cancelled = &cancelled_;
ops->flags = 0;
+ ops->reserved = NULL;
*nops = 1;
}