diff options
Diffstat (limited to 'src/cpp/server')
-rw-r--r-- | src/cpp/server/secure_server_credentials.cc | 9 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 16 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 18 | ||||
-rw-r--r-- | src/cpp/server/server_context.cc | 11 | ||||
-rw-r--r-- | src/cpp/server/thread_pool.cc | 52 | ||||
-rw-r--r-- | src/cpp/server/thread_pool.h | 14 |
6 files changed, 66 insertions, 54 deletions
diff --git a/src/cpp/server/secure_server_credentials.cc b/src/cpp/server/secure_server_credentials.cc index 88f7a9b1a9..49d69a3fb9 100644 --- a/src/cpp/server/secure_server_credentials.cc +++ b/src/cpp/server/secure_server_credentials.cc @@ -59,9 +59,12 @@ class SecureServerCredentials GRPC_FINAL : public ServerCredentials { std::shared_ptr<ServerCredentials> SslServerCredentials( const SslServerCredentialsOptions& options) { std::vector<grpc_ssl_pem_key_cert_pair> pem_key_cert_pairs; - for (const auto& key_cert_pair : options.pem_key_cert_pairs) { - pem_key_cert_pairs.push_back( - {key_cert_pair.private_key.c_str(), key_cert_pair.cert_chain.c_str()}); + for (auto key_cert_pair = options.pem_key_cert_pairs.begin(); + key_cert_pair != options.pem_key_cert_pairs.end(); + key_cert_pair++) { + grpc_ssl_pem_key_cert_pair p = {key_cert_pair->private_key.c_str(), + key_cert_pair->cert_chain.c_str()}; + pem_key_cert_pairs.push_back(p); } grpc_server_credentials* c_creds = grpc_ssl_server_credentials_create( options.pem_root_certs.empty() ? nullptr : options.pem_root_certs.c_str(), diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 5a4ca6915a..046133c5eb 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -107,6 +107,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { request_payload_(mrd->request_payload_), method_(mrd->method_) { ctx_.call_ = mrd->call_; + ctx_.cq_ = &cq_; GPR_ASSERT(mrd->in_flight_); mrd->in_flight_ = false; mrd->request_metadata_.count = 0; @@ -182,7 +183,7 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned) Server::~Server() { { - std::unique_lock<std::mutex> lock(mu_); + grpc::unique_lock<grpc::mutex> lock(mu_); if (started_ && !shutdown_) { lock.unlock(); Shutdown(); @@ -247,8 +248,8 @@ bool Server::Start() { // Start processing rpcs. if (!sync_methods_.empty()) { - for (auto& m : sync_methods_) { - m.Request(server_); + for (auto m = sync_methods_.begin(); m != sync_methods_.end(); m++) { + m->Request(server_); } ScheduleCallback(); @@ -258,7 +259,7 @@ bool Server::Start() { } void Server::Shutdown() { - std::unique_lock<std::mutex> lock(mu_); + grpc::unique_lock<grpc::mutex> lock(mu_); if (started_ && !shutdown_) { shutdown_ = true; grpc_server_shutdown(server_); @@ -272,7 +273,7 @@ void Server::Shutdown() { } void Server::Wait() { - std::unique_lock<std::mutex> lock(mu_); + grpc::unique_lock<grpc::mutex> lock(mu_); while (num_running_cb_ != 0) { callback_cv_.wait(lock); } @@ -364,6 +365,7 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { } } ctx->call_ = call_; + ctx->cq_ = cq_; Call call(call_, server_, cq_); if (orig_status && call_) { ctx->BeginCompletionOp(&call); @@ -403,7 +405,7 @@ void Server::RequestAsyncGenericCall(GenericServerContext* context, void Server::ScheduleCallback() { { - std::unique_lock<std::mutex> lock(mu_); + grpc::unique_lock<grpc::mutex> lock(mu_); num_running_cb_++; } thread_pool_->ScheduleCallback(std::bind(&Server::RunRpc, this)); @@ -424,7 +426,7 @@ void Server::RunRpc() { } { - std::unique_lock<std::mutex> lock(mu_); + grpc::unique_lock<grpc::mutex> lock(mu_); num_running_cb_--; if (shutdown_) { callback_cv_.notify_all(); diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 58bf9d937f..c5e115f396 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -86,24 +86,26 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { thread_pool_owned = true; } std::unique_ptr<Server> server(new Server(thread_pool_, thread_pool_owned)); - for (auto* service : services_) { - if (!server->RegisterService(service)) { + for (auto service = services_.begin(); service != services_.end(); + service++) { + if (!server->RegisterService(*service)) { return nullptr; } } - for (auto* service : async_services_) { - if (!server->RegisterAsyncService(service)) { + for (auto service = async_services_.begin(); + service != async_services_.end(); service++) { + if (!server->RegisterAsyncService(*service)) { return nullptr; } } if (generic_service_) { server->RegisterAsyncGenericService(generic_service_); } - for (auto& port : ports_) { - int r = server->AddListeningPort(port.addr, port.creds.get()); + for (auto port = ports_.begin(); port != ports_.end(); port++) { + int r = server->AddListeningPort(port->addr, port->creds.get()); if (!r) return nullptr; - if (port.selected_port != nullptr) { - *port.selected_port = r; + if (port->selected_port != nullptr) { + *port->selected_port = r; } } if (!server->Start()) { diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index bb3c2d1405..ffd6d30d5d 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -33,9 +33,8 @@ #include <grpc++/server_context.h> -#include <mutex> - #include <grpc++/impl/call.h> +#include <grpc++/impl/sync.h> #include <grpc/grpc.h> #include <grpc/support/log.h> #include "src/cpp/util/time.h" @@ -57,14 +56,14 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer { void Unref(); private: - std::mutex mu_; + grpc::mutex mu_; int refs_; bool finalized_; bool cancelled_; }; void ServerContext::CompletionOp::Unref() { - std::unique_lock<std::mutex> lock(mu_); + grpc::unique_lock<grpc::mutex> lock(mu_); if (--refs_ == 0) { lock.unlock(); delete this; @@ -73,13 +72,13 @@ void ServerContext::CompletionOp::Unref() { bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) { cq->TryPluck(this); - std::lock_guard<std::mutex> g(mu_); + grpc::lock_guard<grpc::mutex> g(mu_); return finalized_ ? cancelled_ : false; } bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { GPR_ASSERT(CallOpBuffer::FinalizeResult(tag, status)); - std::unique_lock<std::mutex> lock(mu_); + grpc::unique_lock<grpc::mutex> lock(mu_); finalized_ = true; if (!*status) cancelled_ = true; if (--refs_ == 0) { diff --git a/src/cpp/server/thread_pool.cc b/src/cpp/server/thread_pool.cc index d3013b806c..e8d0e89ed2 100644 --- a/src/cpp/server/thread_pool.cc +++ b/src/cpp/server/thread_pool.cc @@ -31,48 +31,52 @@ * */ +#include <grpc++/impl/sync.h> +#include <grpc++/impl/thd.h> + #include "src/cpp/server/thread_pool.h" namespace grpc { +void ThreadPool::ThreadFunc() { + for (;;) { + // Wait until work is available or we are shutting down. + grpc::unique_lock<grpc::mutex> lock(mu_); + if (!shutdown_ && callbacks_.empty()) { + cv_.wait(lock); + } + // Drain callbacks before considering shutdown to ensure all work + // gets completed. + if (!callbacks_.empty()) { + auto cb = callbacks_.front(); + callbacks_.pop(); + lock.unlock(); + cb(); + } else if (shutdown_) { + return; + } + } +} + ThreadPool::ThreadPool(int num_threads) : shutdown_(false) { for (int i = 0; i < num_threads; i++) { - threads_.push_back(std::thread([this]() { - for (;;) { - // Wait until work is available or we are shutting down. - auto have_work = [this]() { return shutdown_ || !callbacks_.empty(); }; - std::unique_lock<std::mutex> lock(mu_); - if (!have_work()) { - cv_.wait(lock, have_work); - } - // Drain callbacks before considering shutdown to ensure all work - // gets completed. - if (!callbacks_.empty()) { - auto cb = callbacks_.front(); - callbacks_.pop(); - lock.unlock(); - cb(); - } else if (shutdown_) { - return; - } - } - })); + threads_.push_back(grpc::thread(&ThreadPool::ThreadFunc, this)); } } ThreadPool::~ThreadPool() { { - std::lock_guard<std::mutex> lock(mu_); + grpc::lock_guard<grpc::mutex> lock(mu_); shutdown_ = true; cv_.notify_all(); } - for (auto& t : threads_) { - t.join(); + for (auto t = threads_.begin(); t != threads_.end(); t++) { + t->join(); } } void ThreadPool::ScheduleCallback(const std::function<void()>& callback) { - std::lock_guard<std::mutex> lock(mu_); + grpc::lock_guard<grpc::mutex> lock(mu_); callbacks_.push(callback); cv_.notify_one(); } diff --git a/src/cpp/server/thread_pool.h b/src/cpp/server/thread_pool.h index 6225d82a0b..0f24d6e9b3 100644 --- a/src/cpp/server/thread_pool.h +++ b/src/cpp/server/thread_pool.h @@ -35,11 +35,11 @@ #define GRPC_INTERNAL_CPP_SERVER_THREAD_POOL_H #include <grpc++/config.h> + +#include <grpc++/impl/sync.h> +#include <grpc++/impl/thd.h> #include <grpc++/thread_pool_interface.h> -#include <condition_variable> -#include <thread> -#include <mutex> #include <queue> #include <vector> @@ -53,11 +53,13 @@ class ThreadPool GRPC_FINAL : public ThreadPoolInterface { void ScheduleCallback(const std::function<void()>& callback) GRPC_OVERRIDE; private: - std::mutex mu_; - std::condition_variable cv_; + grpc::mutex mu_; + grpc::condition_variable cv_; bool shutdown_; std::queue<std::function<void()>> callbacks_; - std::vector<std::thread> threads_; + std::vector<grpc::thread> threads_; + + void ThreadFunc(); }; } // namespace grpc |