aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server')
-rw-r--r--src/cpp/server/secure_server_credentials.cc9
-rw-r--r--src/cpp/server/server.cc16
-rw-r--r--src/cpp/server/server_builder.cc18
-rw-r--r--src/cpp/server/server_context.cc11
-rw-r--r--src/cpp/server/thread_pool.cc52
-rw-r--r--src/cpp/server/thread_pool.h14
6 files changed, 66 insertions, 54 deletions
diff --git a/src/cpp/server/secure_server_credentials.cc b/src/cpp/server/secure_server_credentials.cc
index 88f7a9b1a9..49d69a3fb9 100644
--- a/src/cpp/server/secure_server_credentials.cc
+++ b/src/cpp/server/secure_server_credentials.cc
@@ -59,9 +59,12 @@ class SecureServerCredentials GRPC_FINAL : public ServerCredentials {
std::shared_ptr<ServerCredentials> SslServerCredentials(
const SslServerCredentialsOptions& options) {
std::vector<grpc_ssl_pem_key_cert_pair> pem_key_cert_pairs;
- for (const auto& key_cert_pair : options.pem_key_cert_pairs) {
- pem_key_cert_pairs.push_back(
- {key_cert_pair.private_key.c_str(), key_cert_pair.cert_chain.c_str()});
+ for (auto key_cert_pair = options.pem_key_cert_pairs.begin();
+ key_cert_pair != options.pem_key_cert_pairs.end();
+ key_cert_pair++) {
+ grpc_ssl_pem_key_cert_pair p = {key_cert_pair->private_key.c_str(),
+ key_cert_pair->cert_chain.c_str()};
+ pem_key_cert_pairs.push_back(p);
}
grpc_server_credentials* c_creds = grpc_ssl_server_credentials_create(
options.pem_root_certs.empty() ? nullptr : options.pem_root_certs.c_str(),
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 5a4ca6915a..046133c5eb 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -107,6 +107,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
request_payload_(mrd->request_payload_),
method_(mrd->method_) {
ctx_.call_ = mrd->call_;
+ ctx_.cq_ = &cq_;
GPR_ASSERT(mrd->in_flight_);
mrd->in_flight_ = false;
mrd->request_metadata_.count = 0;
@@ -182,7 +183,7 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned)
Server::~Server() {
{
- std::unique_lock<std::mutex> lock(mu_);
+ grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
lock.unlock();
Shutdown();
@@ -247,8 +248,8 @@ bool Server::Start() {
// Start processing rpcs.
if (!sync_methods_.empty()) {
- for (auto& m : sync_methods_) {
- m.Request(server_);
+ for (auto m = sync_methods_.begin(); m != sync_methods_.end(); m++) {
+ m->Request(server_);
}
ScheduleCallback();
@@ -258,7 +259,7 @@ bool Server::Start() {
}
void Server::Shutdown() {
- std::unique_lock<std::mutex> lock(mu_);
+ grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
shutdown_ = true;
grpc_server_shutdown(server_);
@@ -272,7 +273,7 @@ void Server::Shutdown() {
}
void Server::Wait() {
- std::unique_lock<std::mutex> lock(mu_);
+ grpc::unique_lock<grpc::mutex> lock(mu_);
while (num_running_cb_ != 0) {
callback_cv_.wait(lock);
}
@@ -364,6 +365,7 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
}
}
ctx->call_ = call_;
+ ctx->cq_ = cq_;
Call call(call_, server_, cq_);
if (orig_status && call_) {
ctx->BeginCompletionOp(&call);
@@ -403,7 +405,7 @@ void Server::RequestAsyncGenericCall(GenericServerContext* context,
void Server::ScheduleCallback() {
{
- std::unique_lock<std::mutex> lock(mu_);
+ grpc::unique_lock<grpc::mutex> lock(mu_);
num_running_cb_++;
}
thread_pool_->ScheduleCallback(std::bind(&Server::RunRpc, this));
@@ -424,7 +426,7 @@ void Server::RunRpc() {
}
{
- std::unique_lock<std::mutex> lock(mu_);
+ grpc::unique_lock<grpc::mutex> lock(mu_);
num_running_cb_--;
if (shutdown_) {
callback_cv_.notify_all();
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 58bf9d937f..c5e115f396 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -86,24 +86,26 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
thread_pool_owned = true;
}
std::unique_ptr<Server> server(new Server(thread_pool_, thread_pool_owned));
- for (auto* service : services_) {
- if (!server->RegisterService(service)) {
+ for (auto service = services_.begin(); service != services_.end();
+ service++) {
+ if (!server->RegisterService(*service)) {
return nullptr;
}
}
- for (auto* service : async_services_) {
- if (!server->RegisterAsyncService(service)) {
+ for (auto service = async_services_.begin();
+ service != async_services_.end(); service++) {
+ if (!server->RegisterAsyncService(*service)) {
return nullptr;
}
}
if (generic_service_) {
server->RegisterAsyncGenericService(generic_service_);
}
- for (auto& port : ports_) {
- int r = server->AddListeningPort(port.addr, port.creds.get());
+ for (auto port = ports_.begin(); port != ports_.end(); port++) {
+ int r = server->AddListeningPort(port->addr, port->creds.get());
if (!r) return nullptr;
- if (port.selected_port != nullptr) {
- *port.selected_port = r;
+ if (port->selected_port != nullptr) {
+ *port->selected_port = r;
}
}
if (!server->Start()) {
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index bb3c2d1405..ffd6d30d5d 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -33,9 +33,8 @@
#include <grpc++/server_context.h>
-#include <mutex>
-
#include <grpc++/impl/call.h>
+#include <grpc++/impl/sync.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include "src/cpp/util/time.h"
@@ -57,14 +56,14 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer {
void Unref();
private:
- std::mutex mu_;
+ grpc::mutex mu_;
int refs_;
bool finalized_;
bool cancelled_;
};
void ServerContext::CompletionOp::Unref() {
- std::unique_lock<std::mutex> lock(mu_);
+ grpc::unique_lock<grpc::mutex> lock(mu_);
if (--refs_ == 0) {
lock.unlock();
delete this;
@@ -73,13 +72,13 @@ void ServerContext::CompletionOp::Unref() {
bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) {
cq->TryPluck(this);
- std::lock_guard<std::mutex> g(mu_);
+ grpc::lock_guard<grpc::mutex> g(mu_);
return finalized_ ? cancelled_ : false;
}
bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
GPR_ASSERT(CallOpBuffer::FinalizeResult(tag, status));
- std::unique_lock<std::mutex> lock(mu_);
+ grpc::unique_lock<grpc::mutex> lock(mu_);
finalized_ = true;
if (!*status) cancelled_ = true;
if (--refs_ == 0) {
diff --git a/src/cpp/server/thread_pool.cc b/src/cpp/server/thread_pool.cc
index d3013b806c..e8d0e89ed2 100644
--- a/src/cpp/server/thread_pool.cc
+++ b/src/cpp/server/thread_pool.cc
@@ -31,48 +31,52 @@
*
*/
+#include <grpc++/impl/sync.h>
+#include <grpc++/impl/thd.h>
+
#include "src/cpp/server/thread_pool.h"
namespace grpc {
+void ThreadPool::ThreadFunc() {
+ for (;;) {
+ // Wait until work is available or we are shutting down.
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ if (!shutdown_ && callbacks_.empty()) {
+ cv_.wait(lock);
+ }
+ // Drain callbacks before considering shutdown to ensure all work
+ // gets completed.
+ if (!callbacks_.empty()) {
+ auto cb = callbacks_.front();
+ callbacks_.pop();
+ lock.unlock();
+ cb();
+ } else if (shutdown_) {
+ return;
+ }
+ }
+}
+
ThreadPool::ThreadPool(int num_threads) : shutdown_(false) {
for (int i = 0; i < num_threads; i++) {
- threads_.push_back(std::thread([this]() {
- for (;;) {
- // Wait until work is available or we are shutting down.
- auto have_work = [this]() { return shutdown_ || !callbacks_.empty(); };
- std::unique_lock<std::mutex> lock(mu_);
- if (!have_work()) {
- cv_.wait(lock, have_work);
- }
- // Drain callbacks before considering shutdown to ensure all work
- // gets completed.
- if (!callbacks_.empty()) {
- auto cb = callbacks_.front();
- callbacks_.pop();
- lock.unlock();
- cb();
- } else if (shutdown_) {
- return;
- }
- }
- }));
+ threads_.push_back(grpc::thread(&ThreadPool::ThreadFunc, this));
}
}
ThreadPool::~ThreadPool() {
{
- std::lock_guard<std::mutex> lock(mu_);
+ grpc::lock_guard<grpc::mutex> lock(mu_);
shutdown_ = true;
cv_.notify_all();
}
- for (auto& t : threads_) {
- t.join();
+ for (auto t = threads_.begin(); t != threads_.end(); t++) {
+ t->join();
}
}
void ThreadPool::ScheduleCallback(const std::function<void()>& callback) {
- std::lock_guard<std::mutex> lock(mu_);
+ grpc::lock_guard<grpc::mutex> lock(mu_);
callbacks_.push(callback);
cv_.notify_one();
}
diff --git a/src/cpp/server/thread_pool.h b/src/cpp/server/thread_pool.h
index 6225d82a0b..0f24d6e9b3 100644
--- a/src/cpp/server/thread_pool.h
+++ b/src/cpp/server/thread_pool.h
@@ -35,11 +35,11 @@
#define GRPC_INTERNAL_CPP_SERVER_THREAD_POOL_H
#include <grpc++/config.h>
+
+#include <grpc++/impl/sync.h>
+#include <grpc++/impl/thd.h>
#include <grpc++/thread_pool_interface.h>
-#include <condition_variable>
-#include <thread>
-#include <mutex>
#include <queue>
#include <vector>
@@ -53,11 +53,13 @@ class ThreadPool GRPC_FINAL : public ThreadPoolInterface {
void ScheduleCallback(const std::function<void()>& callback) GRPC_OVERRIDE;
private:
- std::mutex mu_;
- std::condition_variable cv_;
+ grpc::mutex mu_;
+ grpc::condition_variable cv_;
bool shutdown_;
std::queue<std::function<void()>> callbacks_;
- std::vector<std::thread> threads_;
+ std::vector<grpc::thread> threads_;
+
+ void ThreadFunc();
};
} // namespace grpc